You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/02/25 12:08:27 UTC
[hudi] branch master updated: [HUDI-1367] Make DeltaStreamer
transition from DFS source to Kafka source (#2227)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 617cc24 [HUDI-1367] Make DeltaStreamer transition from DFS source to Kafka source (#2227)
617cc24 is described below
commit 617cc24ad1a28196b872df5663e9e0f48cd7f0fa
Author: liujinhui <96...@qq.com>
AuthorDate: Thu Feb 25 20:08:13 2021 +0800
[HUDI-1367] Make DeltaStreamer transition from DFS source to Kafka source (#2227)
Co-authored-by: Sivabalan Narayanan <si...@uber.com>
---
.../utilities/sources/helpers/KafkaOffsetGen.java | 52 ++++++-
.../functional/TestHoodieDeltaStreamer.java | 151 +++++++++++++++++++--
.../TestHoodieMultiTableDeltaStreamer.java | 21 +--
.../hudi/utilities/sources/TestKafkaSource.java | 2 +-
.../utilities/testutils/UtilitiesTestBase.java | 17 ++-
5 files changed, 213 insertions(+), 30 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index fc7ba79..e37ec0a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -40,6 +41,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@@ -49,6 +52,12 @@ public class KafkaOffsetGen {
private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
+ /**
+ * kafka checkpoint Pattern.
+ * Format: topic_name,partition_num:offset,partition_num:offset,....
+ */
+ private final Pattern pattern = Pattern.compile(".*,.*:.*");
+
public static class CheckpointUtils {
/**
@@ -148,7 +157,8 @@ public class KafkaOffsetGen {
private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
- private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
+ private static final String KAFKA_AUTO_RESET_OFFSETS = "hoodie.deltastreamer.source.kafka.auto.reset.offsets";
+ private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST;
public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}
@@ -156,15 +166,29 @@ public class KafkaOffsetGen {
private final HashMap<String, Object> kafkaParams;
private final TypedProperties props;
protected final String topicName;
+ private KafkaResetOffsetStrategies autoResetValue;
public KafkaOffsetGen(TypedProperties props) {
this.props = props;
+
kafkaParams = new HashMap<>();
for (Object prop : props.keySet()) {
kafkaParams.put(prop.toString(), props.get(prop.toString()));
}
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
+ String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_RESET_OFFSETS, Config.DEFAULT_KAFKA_AUTO_RESET_OFFSETS.name());
+ boolean found = false;
+ for (KafkaResetOffsetStrategies entry: KafkaResetOffsetStrategies.values()) {
+ if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) {
+ found = true;
+ autoResetValue = entry;
+ break;
+ }
+ }
+ if (!found) {
+ throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_RESET_OFFSETS + " config set to unknown value " + kafkaAutoResetOffsetsStr);
+ }
}
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
@@ -186,8 +210,6 @@ public class KafkaOffsetGen {
fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
} else {
- KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
- .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
switch (autoResetValue) {
case EARLIEST:
fromOffsets = consumer.beginningOffsets(topicPartitions);
@@ -227,12 +249,23 @@ public class KafkaOffsetGen {
// else return earliest offsets
private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer,
Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
- Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
+ if (checkTopicCheckpoint(lastCheckpointStr)) {
+ Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+ boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
+ .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
+ return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
+ }
+
+ switch (autoResetValue) {
+ case EARLIEST:
+ return earliestOffsets;
+ case LATEST:
+ return consumer.endOffsets(topicPartitions);
+ default:
+ throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
+ }
- boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
- .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
- return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}
private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
@@ -257,6 +290,11 @@ public class KafkaOffsetGen {
return result.containsKey(topicName);
}
+ private boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
+ Matcher matcher = pattern.matcher(lastCheckpointStr.get());
+ return matcher.matches();
+ }
+
public String getTopicName() {
return topicName;
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 616d039..7fb5b18 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -48,6 +48,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -56,10 +57,12 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -116,9 +119,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+ private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
private static String PARQUET_SOURCE_ROOT;
+ private static String JSON_KAFKA_SOURCE_ROOT;
private static final int PARQUET_NUM_RECORDS = 5;
private static final int CSV_NUM_RECORDS = 3;
+ private static final int JSON_KAFKA_NUM_RECORDS = 5;
// Required fields
private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
@@ -136,15 +142,18 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
public static KafkaTestUtils testUtils;
+ protected static String topicName;
- private static int testNum = 1;
+ protected static int testNum = 1;
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+ JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
+ topicName = "topic" + testNum;
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
@@ -236,7 +245,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
//Kafka source properties
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
- props.setProperty("auto.offset.reset", "earliest");
+ props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
@@ -966,27 +975,56 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
private static void prepareParquetDFSFiles(int numRecords) throws IOException {
- String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+ prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null);
+ }
+
+ private static void prepareParquetDFSFiles(int numRecords, String fileName, boolean useCustomSchema,
+ String schemaStr, Schema schema) throws IOException {
+ String path = PARQUET_SOURCE_ROOT + "/" + fileName;
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- Helpers.saveParquetToDFS(Helpers.toGenericRecords(
- dataGenerator.generateInserts("000", numRecords)), new Path(path));
+ if (useCustomSchema) {
+ Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+ dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
+ schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+ } else {
+ Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+ dataGenerator.generateInserts("000", numRecords)), new Path(path));
+ }
+ }
+
+ private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException {
+ if (createTopic) {
+ try {
+ testUtils.createTopic(topicName, 2);
+ } catch (TopicExistsException e) {
+ // no op
+ }
+ }
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
}
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+ prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
+ PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT);
+ }
+
+ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+ String propsFileName, String parquetSourceRoot) throws IOException {
// Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties();
parquetProps.setProperty("include", "base.properties");
+ parquetProps.setProperty("hoodie.embed.timeline.server","false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) {
- parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+ parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
if (hasTransformer) {
- parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+ parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + targetSchemaFile);
}
}
- parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
-
- UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+ parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
+ UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName);
}
private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
@@ -1001,6 +1039,99 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
testNum++;
}
+ private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
+ // Properties used for testing delta-streamer with JsonKafka source
+ TypedProperties props = new TypedProperties();
+ populateCommonProps(props);
+ props.setProperty("include", "base.properties");
+ props.setProperty("hoodie.embed.timeline.server","false");
+ props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+ props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+ props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
+ props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
+ props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
+ props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
+ props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", autoResetValue);
+
+ UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
+ }
+
+ /**
+ * Tests Deltastreamer with parquet dfs source and transitions to JsonKafkaSource.
+ * @param autoResetToLatest true if auto reset value to be set to LATEST. false to leave it as default(i.e. EARLIEST)
+ * @throws Exception
+ */
+ private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoResetToLatest) throws Exception {
+ // prep parquet source
+ PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
+ int parquetRecords = 10;
+ prepareParquetDFSFiles(parquetRecords,"1.parquet", true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+
+ prepareParquetDFSSource(true, false,"source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT);
+ // delta streamer w/ parquest source
+ String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+ Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
+ false, 100000, false, null, null, "timestamp"), jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
+ deltaStreamer.shutdownGracefully();
+
+ // prep json kafka source
+ topicName = "topic" + testNum;
+ prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
+ prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, autoResetToLatest ? "latest" : "earliest", topicName);
+ // delta streamer w/ json kafka source
+ deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+ Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+ true, 100000, false, null, null, "timestamp"), jsc);
+ deltaStreamer.sync();
+ // if auto reset value is set to LATEST, this all kafka records so far may not be synced.
+ int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
+ TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+
+ // verify 2nd batch to test LATEST auto reset value.
+ prepareJsonKafkaDFSFiles(20, false, topicName);
+ totalExpectedRecords += 20;
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+ testNum++;
+ }
+
+ @Test
+ public void testJsonKafkaDFSSource() throws Exception {
+ topicName = "topic" + testNum;
+ prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
+ prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",topicName);
+ String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+ Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+ true, 100000, false, null, null, "timestamp"), jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+
+ int totalRecords = JSON_KAFKA_NUM_RECORDS;
+ int records = 10;
+ totalRecords += records;
+ prepareJsonKafkaDFSFiles(records, false, topicName);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
+ }
+
+ @Test
+ public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
+ testDeltaStreamerTransitionFromParquetToKafkaSource(false);
+ }
+
+ @Test
+ public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
+ testDeltaStreamerTransitionFromParquetToKafkaSource(true);
+ }
+
@Test
public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
testParquetDFSSource(false, null);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index a6f4edf..ad1b753 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -119,12 +119,14 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
@Test //0 corresponds to fg
public void testMultiTableExecution() throws IOException {
//create topics for each table
- testUtils.createTopic("topic1", 2);
- testUtils.createTopic("topic2", 2);
+ String topicName1 = "topic" + testNum++;
+ String topicName2 = "topic" + testNum;
+ testUtils.createTopic(topicName1, 2);
+ testUtils.createTopic(topicName2, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
- testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+ testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
+ testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
@@ -132,21 +134,23 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
TypedProperties properties = executionContexts.get(1).getProperties();
properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
+ properties.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName2);
executionContexts.get(1).setProperties(properties);
TypedProperties properties1 = executionContexts.get(0).getProperties();
properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc");
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc");
+ properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName1);
executionContexts.get(0).setProperties(properties1);
- String targetBasePath1 = executionContexts.get(1).getConfig().targetBasePath;
- String targetBasePath2 = executionContexts.get(0).getConfig().targetBasePath;
+ String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath;
+ String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
streamer.sync();
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
//insert updates for already existing records in kafka topics
- testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
- testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+ testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
+ testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
streamer.sync();
assertEquals(2, streamer.getSuccessTables().size());
assertTrue(streamer.getFailedTables().isEmpty());
@@ -154,5 +158,6 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
//assert the record count matches now
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
+ testNum++;
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index e8cb2a6..9004c66 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -88,7 +88,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
- props.setProperty("auto.offset.reset", resetStrategy);
+ props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", resetStrategy);
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
String.valueOf(Config.maxEventsFromKafkaSource));
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 0bbdb23..b83fa78 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -46,6 +46,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
@@ -279,8 +280,12 @@ public class UtilitiesTestBase {
}
public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
+ saveParquetToDFS(records, targetFile, HoodieTestDataGenerator.AVRO_SCHEMA);
+ }
+
+ public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile, Schema schema) throws IOException {
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
- .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
+ .withSchema(schema)
.withConf(HoodieTestUtils.getDefaultHadoopConf())
.withWriteMode(Mode.OVERWRITE)
.build()) {
@@ -308,9 +313,9 @@ public class UtilitiesTestBase {
return props;
}
- public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
+ public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, Schema schema) {
try {
- Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+ Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(schema);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;
@@ -318,9 +323,13 @@ public class UtilitiesTestBase {
}
public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords) {
+ return toGenericRecords(hoodieRecords, HoodieTestDataGenerator.AVRO_SCHEMA);
+ }
+
+ public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
List<GenericRecord> records = new ArrayList<>();
for (HoodieRecord hoodieRecord : hoodieRecords) {
- records.add(toGenericRecord(hoodieRecord));
+ records.add(toGenericRecord(hoodieRecord, schema));
}
return records;
}