You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/02 09:23:47 UTC
[hudi] 05/08: [MINOR] Restoring existing behavior for `DeltaStreamer` Incremental Source (#7810)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6ef2135b2fa88d5f559d03b15660935cf7efd410
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Wed Feb 1 11:19:45 2023 -0800
[MINOR] Restoring existing behavior for `DeltaStreamer` Incremental Source (#7810)
This is restoring existing behavior for DeltaStreamer Incremental Source, as the change in #7769 removed _hoodie_partition_path field from the dataset, making it impossible to be accessed from the DS Transformers, for example.
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 2 +-
.../apache/hudi/common/config/HoodieConfig.java | 8 --------
.../org/apache/hudi/utilities/UtilHelpers.java | 13 ++++++++++++
.../hudi/utilities/deltastreamer/DeltaSync.java | 8 ++------
.../hudi/utilities/sources/HoodieIncrSource.java | 23 ++++++++++++++++++++--
5 files changed, 37 insertions(+), 17 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e6525a2b1dc..f56defe7eac 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1034,7 +1034,7 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public HoodieRecordMerger getRecordMerger() {
- List<String> mergers = getSplitStringsOrDefault(RECORD_MERGER_IMPLS).stream()
+ List<String> mergers = StringUtils.split(getStringOrDefault(RECORD_MERGER_IMPLS), ",").stream()
.map(String::trim)
.distinct()
.collect(Collectors.toList());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index a48e4202bf9..223b93e5744 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -142,14 +142,6 @@ public class HoodieConfig implements Serializable {
return StringUtils.split(getString(configProperty), delimiter);
}
- public <T> List<String> getSplitStringsOrDefault(ConfigProperty<T> configProperty) {
- return getSplitStringsOrDefault(configProperty, ",");
- }
-
- public <T> List<String> getSplitStringsOrDefault(ConfigProperty<T> configProperty, String delimiter) {
- return StringUtils.split(getStringOrDefault(configProperty), delimiter);
- }
-
public String getString(String key) {
return props.getProperty(key);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index d159fee0be4..45a9750c3b3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -29,12 +29,16 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -109,6 +113,15 @@ public class UtilHelpers {
private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
+ public static HoodieRecordMerger createRecordMerger(Properties props) {
+ List<String> recordMergerImplClasses = ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(),
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue()));
+ HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK, recordMergerImplClasses,
+ props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()));
+
+ return recordMerger;
+ }
+
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
SparkSession sparkSession, SchemaProvider schemaProvider,
HoodieDeltaStreamerMetrics metrics) throws IOException {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 63cf706c174..77079fe0f4f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -42,7 +42,6 @@ import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -60,8 +59,6 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.IdentityIterator;
import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.common.util.Option;
@@ -139,6 +136,7 @@ import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -473,9 +471,7 @@ public class DeltaSync implements Serializable, Closeable {
}
private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSource(Option<String> resumeCheckpointStr) {
- HoodieRecordType recordType = HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK,
- ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue())),
- props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue())).getRecordType();
+ HoodieRecordType recordType = createRecordMerger(props).getRecordType();
if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
&& HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "avro"))
!= HoodieLogBlockType.PARQUET_DATA_BLOCK) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 7134e8ff7cc..c484038b04b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -37,6 +37,8 @@ import org.apache.spark.sql.SparkSession;
import java.util.Collections;
+import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
+
public class HoodieIncrSource extends RowSource {
private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class);
@@ -89,6 +91,12 @@ public class HoodieIncrSource extends RowSource {
*/
static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format";
static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";
+
+ /**
+ * Drops all meta fields from the source hudi table while ingesting into sink hudi table.
+ */
+ static final String HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = "hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source";
+ public static final Boolean DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = false;
}
public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -153,8 +161,19 @@ public class HoodieIncrSource extends RowSource {
queryTypeAndInstantEndpts.getRight().getRight()));
}
- // Remove Hoodie meta columns
- final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));
+ HoodieRecord.HoodieRecordType recordType = createRecordMerger(props).getRecordType();
+
+ boolean shouldDropMetaFields = props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
+ Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE)
+ // NOTE: In case when Spark native [[RecordMerger]] is used, we have to make sure
+ // all meta-fields have been properly cleaned up from the incoming dataset
+ //
+ || recordType == HoodieRecord.HoodieRecordType.SPARK;
+
+ // Remove Hoodie meta columns except partition path from input source
+ String[] colsToDrop = shouldDropMetaFields ? HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
+ HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
+ final Dataset<Row> src = source.drop(colsToDrop);
return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight());
}
}