You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/08/12 12:02:48 UTC

[hudi] branch master updated: [HUDI-2294] Adding virtual keys support to deltastreamer (#3450)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b651336  [HUDI-2294] Adding virtual keys support to deltastreamer (#3450)
b651336 is described below

commit b651336454ec01c4331d2f46b0281a0065500c79
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Thu Aug 12 08:02:39 2021 -0400

    [HUDI-2294] Adding virtual keys support to deltastreamer (#3450)
---
 .../apache/hudi/utilities/deltastreamer/DeltaSync.java   | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a838af4..5272d20 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -54,6 +55,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.sync.common.AbstractSyncTool;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -213,13 +215,12 @@ public class DeltaSync implements Serializable {
     this.props = props;
     this.userProvidedSchemaProvider = schemaProvider;
     this.processedSchema = new SchemaSet();
-
+    this.keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
     refreshTimeline();
     // Register User Provided schema first
     registerAvroSchemas(schemaProvider);
 
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
-    this.keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
 
     this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
 
@@ -249,7 +250,6 @@ public class DeltaSync implements Serializable {
     } else {
       this.commitTimelineOpt = Option.empty();
       String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator);
-
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(cfg.tableType)
           .setTableName(cfg.targetTableName)
@@ -257,6 +257,11 @@ public class DeltaSync implements Serializable {
           .setPayloadClassName(cfg.payloadClassName)
           .setBaseFileFormat(cfg.baseFileFormat)
           .setPartitionFields(partitionColumns)
+          .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
+          .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
+              Boolean.parseBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue())))
+          .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(),
+              SimpleKeyGenerator.class.getName()))
           .setPreCombineField(cfg.sourceOrderingField)
           .initTable(new Configuration(jssc.hadoopConfiguration()),
             cfg.targetBasePath);
@@ -356,6 +361,11 @@ public class DeltaSync implements Serializable {
           .setPayloadClassName(cfg.payloadClassName)
           .setBaseFileFormat(cfg.baseFileFormat)
           .setPartitionFields(partitionColumns)
+          .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
+          .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
+              Boolean.parseBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue())))
+          .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(),
+              SimpleKeyGenerator.class.getName()))
           .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
     }