You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2019/11/26 10:34:13 UTC
[incubator-hudi] branch master updated: [HUDI-340]: made max events to read from kafka source configurable (#1039)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2a4cfb4 [HUDI-340]: made max events to read from kafka source configurable (#1039)
2a4cfb4 is described below
commit 2a4cfb47c76a0c8800d4998453ec356711807c83
Author: Pratyaksh Sharma <pr...@gmail.com>
AuthorDate: Tue Nov 26 16:04:02 2019 +0530
[HUDI-340]: made max events to read from kafka source configurable (#1039)
---
.../utilities/sources/helpers/KafkaOffsetGen.java | 13 ++-
.../hudi/utilities/sources/TestKafkaSource.java | 93 ++++++++++++++++++++--
2 files changed, 96 insertions(+), 10 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 4211af6..873e793 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -50,8 +50,6 @@ public class KafkaOffsetGen {
private static volatile Logger log = LogManager.getLogger(KafkaOffsetGen.class);
- private static long DEFAULT_MAX_EVENTS_TO_READ = 1000000; // 1M events max
-
public static class CheckpointUtils {
/**
@@ -170,10 +168,13 @@ public class KafkaOffsetGen {
/**
* Configs to be passed for this source. All standard Kafka consumer configs are also respected
*/
- static class Config {
+ public static class Config {
private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
+ private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
+ public static final long defaultMaxEventsFromKafkaSource = 5000000;
+ public static long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = defaultMaxEventsFromKafkaSource;
}
private final HashMap<String, String> kafkaParams;
@@ -229,7 +230,11 @@ public class KafkaOffsetGen {
new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
- long numEvents = Math.min(DEFAULT_MAX_EVENTS_TO_READ, sourceLimit);
+ long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
+ Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE);
+ maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
+ ? Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE : maxEventsToReadFromKafka;
+ long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
return offsetRanges;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index 241fae0..c1ca1f0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -32,6 +32,7 @@ import org.apache.hudi.utilities.UtilitiesTestBase;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -78,18 +79,26 @@ public class TestKafkaSource extends UtilitiesTestBase {
testUtils.teardown();
}
- @Test
- public void testJsonKafkaSource() throws IOException {
-
- // topic setup.
- testUtils.createTopic(TEST_TOPIC_NAME, 2);
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("metadata.broker.list", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "smallest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+ maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
+ String.valueOf(Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE));
+ return props;
+ }
+
+ @Test
+ public void testJsonKafkaSource() throws IOException {
+
+ // topic setup.
+ testUtils.createTopic(TEST_TOPIC_NAME, 2);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ TypedProperties props = createPropsForJsonSource(null);
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
@@ -131,6 +140,78 @@ public class TestKafkaSource extends UtilitiesTestBase {
assertEquals(Option.empty(), fetch4AsRows.getBatch());
}
+ @Test
+ public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException {
+ // topic setup.
+ testUtils.createTopic(TEST_TOPIC_NAME, 2);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE);
+
+ Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+ SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+ Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 500;
+
+ /*
+ 1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
+ maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
+ */
+ testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+ assertEquals(500, fetch1.getBatch().get().count());
+
+ // 2. Produce new data, extract new data based on sourceLimit
+ testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+ InputBatch<Dataset<Row>> fetch2 =
+ kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
+ assertEquals(1500, fetch2.getBatch().get().count());
+
+ //reset the value back since it is a static variable
+ Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = Config.defaultMaxEventsFromKafkaSource;
+ }
+
+ @Test
+ public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException {
+ // topic setup.
+ testUtils.createTopic(TEST_TOPIC_NAME, 2);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ TypedProperties props = createPropsForJsonSource(500L);
+
+ Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+ SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+ // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+ testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
+ assertEquals(900, fetch1.getBatch().get().count());
+
+ // 2. Produce new data, extract new data based on upper cap
+ testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+ InputBatch<Dataset<Row>> fetch2 =
+ kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+ assertEquals(500, fetch2.getBatch().get().count());
+
+ //fetch data respecting source limit where upper cap > sourceLimit
+ InputBatch<JavaRDD<GenericRecord>> fetch3 =
+ kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400);
+ assertEquals(400, fetch3.getBatch().get().count());
+
+ //fetch data respecting source limit where upper cap < sourceLimit
+ InputBatch<JavaRDD<GenericRecord>> fetch4 =
+ kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600);
+ assertEquals(600, fetch4.getBatch().get().count());
+
+ // 3. Extract with previous checkpoint => gives same data back (idempotent)
+ InputBatch<JavaRDD<GenericRecord>> fetch5 =
+ kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+ assertEquals(fetch2.getBatch().get().count(), fetch5.getBatch().get().count());
+ assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());
+
+ // 4. Extract with latest checkpoint => no new data returned
+ InputBatch<JavaRDD<GenericRecord>> fetch6 =
+ kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
+ assertEquals(Option.empty(), fetch6.getBatch());
+ }
+
private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
for (int i = 0; i < partitions.length; i++) {