Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/02 09:23:47 UTC

[hudi] 05/08: [MINOR] Restoring existing behavior for `DeltaStreamer` Incremental Source (#7810)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6ef2135b2fa88d5f559d03b15660935cf7efd410
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Wed Feb 1 11:19:45 2023 -0800

    [MINOR] Restoring existing behavior for `DeltaStreamer` Incremental Source (#7810)
    
    This restores the existing behavior of the DeltaStreamer Incremental Source: the change in #7769 removed the _hoodie_partition_path field from the dataset, making it impossible to access from the DS Transformers, for example.
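    
    For illustration, a hypothetical Transformer that depends on the partition-path
    meta field (the class name is made up, but the Transformer interface is Hudi's):
    
        import org.apache.hudi.common.config.TypedProperties;
        import org.apache.hudi.common.model.HoodieRecord;
        import org.apache.hudi.utilities.transform.Transformer;
    
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
    
        import static org.apache.spark.sql.functions.col;
    
        public class PartitionPathAwareTransformer implements Transformer {
          @Override
          public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                                    Dataset<Row> rowDataset, TypedProperties properties) {
            // Breaks with an AnalysisException if the source has already dropped
            // _hoodie_partition_path, which is what #7769 inadvertently caused.
            return rowDataset.withColumn("source_partition",
                col(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
          }
        }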
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 +-
 .../apache/hudi/common/config/HoodieConfig.java    |  8 --------
 .../org/apache/hudi/utilities/UtilHelpers.java     | 13 ++++++++++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  8 ++------
 .../hudi/utilities/sources/HoodieIncrSource.java   | 23 ++++++++++++++++++++--
 5 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e6525a2b1dc..f56defe7eac 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1034,7 +1034,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public HoodieRecordMerger getRecordMerger() {
-    List<String> mergers = getSplitStringsOrDefault(RECORD_MERGER_IMPLS).stream()
+    List<String> mergers = StringUtils.split(getStringOrDefault(RECORD_MERGER_IMPLS), ",").stream()
         .map(String::trim)
         .distinct()
         .collect(Collectors.toList());
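 
 The inlined split is behaviorally the same as the removed HoodieConfig helper
 (see the next hunk); a minimal sketch of what it produces, assuming Hudi's
 null-safe StringUtils.split (the merger class names below are made up):
 
     // split returns a List<String>; entries may carry whitespace from the
     // comma-separated config value, hence the trim() in the pipeline above.
     List<String> mergers = StringUtils.split("org.x.MergerA, org.x.MergerB", ",").stream()
         .map(String::trim)
         .distinct()
         .collect(Collectors.toList());
     // mergers -> ["org.x.MergerA", "org.x.MergerB"]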
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index a48e4202bf9..223b93e5744 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -142,14 +142,6 @@ public class HoodieConfig implements Serializable {
     return StringUtils.split(getString(configProperty), delimiter);
   }
 
-  public <T> List<String> getSplitStringsOrDefault(ConfigProperty<T> configProperty) {
-    return getSplitStringsOrDefault(configProperty, ",");
-  }
-
-  public <T> List<String> getSplitStringsOrDefault(ConfigProperty<T> configProperty, String delimiter) {
-    return StringUtils.split(getStringOrDefault(configProperty), delimiter);
-  }
-
   public String getString(String key) {
     return props.getProperty(key);
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index d159fee0be4..45a9750c3b3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -29,12 +29,16 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -109,6 +113,15 @@ public class UtilHelpers {
 
   private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
 
+  public static HoodieRecordMerger createRecordMerger(Properties props) {
+    List<String> recordMergerImplClasses = ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(),
+        HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue()));
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK, recordMergerImplClasses,
+        props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()));
+
+    return recordMerger;
+  }
+
   public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
       SparkSession sparkSession, SchemaProvider schemaProvider,
       HoodieDeltaStreamerMetrics metrics) throws IOException {
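 
 For reference, a minimal sketch of exercising the new helper in isolation
 (the probe class is hypothetical; with no overrides, the RECORD_MERGER_IMPLS /
 RECORD_MERGER_STRATEGY defaults should resolve to the Avro-based merger):
 
     import java.util.Properties;
 
     import org.apache.hudi.common.model.HoodieRecordMerger;
     import org.apache.hudi.utilities.UtilHelpers;
 
     public class RecordMergerProbe {
       public static void main(String[] args) {
         // Empty Properties: both merger configs fall back to their defaults.
         HoodieRecordMerger merger = UtilHelpers.createRecordMerger(new Properties());
         System.out.println(merger.getRecordType()); // expected: AVRO
       }
     }
 
 The DeltaSync change in the next hunk swaps its inline construction for exactly this call.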
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 63cf706c174..77079fe0f4f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -42,7 +42,6 @@ import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -60,8 +59,6 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.IdentityIterator;
 import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
@@ -139,6 +136,7 @@ import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -473,9 +471,7 @@ public class DeltaSync implements Serializable, Closeable {
   }
 
   private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSource(Option<String> resumeCheckpointStr) {
-    HoodieRecordType recordType = HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK,
-        ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue())),
-        props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue())).getRecordType();
+    HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
         && HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "avro"))
         != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 7134e8ff7cc..c484038b04b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -37,6 +37,8 @@ import org.apache.spark.sql.SparkSession;
 
 import java.util.Collections;
 
+import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
+
 public class HoodieIncrSource extends RowSource {
 
   private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class);
@@ -89,6 +91,12 @@ public class HoodieIncrSource extends RowSource {
      */
     static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format";
     static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";
+
+    /**
+     * Drops all meta fields from the source Hudi table while ingesting into the sink Hudi table.
+     */
+    static final String HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = "hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source";
+    public static final Boolean DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = false;
   }
 
   public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -153,8 +161,19 @@ public class HoodieIncrSource extends RowSource {
               queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
-    // Remove Hoodie meta columns
-    final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));
+    HoodieRecord.HoodieRecordType recordType = createRecordMerger(props).getRecordType();
+
+    boolean shouldDropMetaFields = props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
+        Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE)
+        // NOTE: In case when Spark native [[RecordMerger]] is used, we have to make sure
+        //       all meta-fields have been properly cleaned up from the incoming dataset
+        //
+        || recordType == HoodieRecord.HoodieRecordType.SPARK;
+
+    // Remove Hoodie meta columns from the input source, keeping _hoodie_partition_path unless all meta fields are dropped
+    String[] colsToDrop = shouldDropMetaFields ? HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
+        HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
+    final Dataset<Row> src = source.drop(colsToDrop);
     return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight());
   }
 }
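
Usage note: after this patch the incremental source retains _hoodie_partition_path by default (unless the Spark-native RecordMerger is in use, which forces a full meta-field cleanup); the previous drop-everything behavior can still be opted into via the new key, e.g. in the DeltaStreamer properties file:

    hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source=true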