Posted to commits@hudi.apache.org by xu...@apache.org on 2022/07/04 18:09:21 UTC
[hudi] branch release-feature-rfc46 updated: [HUDI-3350][HUDI-3351] Support HoodieMerge API and Spark engine-specific HoodieRecord (#5627)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
new 276787aa2f [HUDI-3350][HUDI-3351] Support HoodieMerge API and Spark engine-specific HoodieRecord (#5627)
276787aa2f is described below
commit 276787aa2f8c7d23e3ec138c297faac121f16a1b
Author: wulei <wu...@bytedance.com>
AuthorDate: Tue Jul 5 02:09:12 2022 +0800
[HUDI-3350][HUDI-3351] Support HoodieMerge API and Spark engine-specific HoodieRecord (#5627)
Co-authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
---
.../apache/hudi/config/HoodieCompactionConfig.java | 12 +
.../org/apache/hudi/config/HoodieWriteConfig.java | 11 +
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 5 +-
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 4 +
.../hudi/table/action/commit/BaseWriteHelper.java | 7 +-
.../table/action/commit/HoodieWriteHelper.java | 11 +-
.../hudi/testutils/HoodieWriteableTestTable.java | 13 +-
.../hudi/table/action/commit/FlinkWriteHelper.java | 6 +-
.../hudi/table/action/commit/JavaWriteHelper.java | 7 +-
.../spark/sql/HoodieCatalystExpressionUtils.scala | 21 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 7 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 19 +-
.../hudi/common/model/HoodieAvroIndexedRecord.java | 17 +-
.../apache/hudi/common/model/HoodieAvroRecord.java | 47 +---
.../hudi/common/model/HoodieAvroRecordMerge.java | 57 +++++
.../org/apache/hudi/common/model/HoodieMerge.java | 38 +++
.../org/apache/hudi/common/model/HoodieRecord.java | 36 +--
.../hudi/common/table/HoodieTableConfig.java | 15 ++
.../hudi/common/table/HoodieTableMetaClient.java | 10 +
.../table/log/AbstractHoodieLogRecordReader.java | 7 +
.../table/log/HoodieMergedLogRecordScanner.java | 7 +-
.../table/log/block/HoodieParquetDataBlock.java | 2 +-
.../apache/hudi/common/util/HoodieRecordUtils.java | 68 +++++
.../apache/hudi/common/util/ReflectionUtils.java | 13 -
.../apache/hudi/common/util/SpillableMapUtils.java | 4 +-
.../hudi/common/util/HoodieRecordUtilsTest.java | 47 ++++
.../apache/hudi/configuration/FlinkOptions.java | 8 +
.../org/apache/hudi/sink/StreamWriteFunction.java | 15 +-
.../apache/hudi/streamer/FlinkStreamerConfig.java | 6 +
.../org/apache/hudi/table/HoodieTableSource.java | 3 +-
.../table/format/mor/MergeOnReadInputFormat.java | 16 +-
.../table/format/mor/MergeOnReadTableState.java | 9 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 2 +
.../apache/hudi/source/TestStreamReadOperator.java | 3 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 6 +
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 6 +-
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 11 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +
.../spark/sql/hudi/HoodieInternalRowUtils.scala | 282 +++++++++++++++++++++
.../apache/spark/sql/hudi/HoodieSparkRecord.java | 190 ++++++++++++++
.../spark/sql/hudi/HoodieSparkRecordMerge.java | 48 ++++
.../apache/hudi/TestHoodieInternalRowUtils.scala | 114 +++++++++
.../hudi/TestStructTypeSchemaEvolutionUtils.scala | 222 ++++++++++++++++
.../hudi/utilities/deltastreamer/DeltaSync.java | 2 +
.../deltastreamer/HoodieDeltaStreamer.java | 5 +
45 files changed, 1296 insertions(+), 145 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 4003a07de7..11cf59a74c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -234,6 +235,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "the record payload class to merge records in the log against each other, merge again with the base file and "
+ "produce the final record to be written after compaction.");
+ public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+ .key("hoodie.compaction.merge.class")
+ .defaultValue(HoodieAvroRecordMerge.class.getName())
+ .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+ + "types, such as Spark records or Flink records.");
+
public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
.key("hoodie.compaction.lazy.block.read")
.defaultValue("true")
@@ -691,6 +698,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
+ public Builder withMergeClass(String mergeClass) {
+ compactionConfig.setValue(MERGE_CLASS_NAME, mergeClass);
+ return this;
+ }
+
public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB));
return this;
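
For reference, a minimal sketch of wiring a custom merge class through the new builder hook (basePath and the choice of HoodieSparkRecordMerge are assumptions for this example; the builder methods are the ones added in this commit):

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath) // hypothetical table base path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            // Defaults to HoodieAvroRecordMerge when unset.
            .withMergeClass("org.apache.spark.sql.hudi.HoodieSparkRecordMerge")
            .build())
        .build();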
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d18238fa4b..cbf75b3efb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FileSystemRetryConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -123,6 +124,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
+ "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
+ public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+ .key("hoodie.datasource.write.merge.class")
+ .defaultValue(HoodieAvroRecordMerge.class.getName())
+ .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+ + "types, such as Spark records or Flink records.");
+
public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
.key("hoodie.datasource.write.keygenerator.class")
.noDefaultValue()
@@ -1324,6 +1331,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(HoodieCompactionConfig.PAYLOAD_CLASS_NAME);
}
+ public String getMergeClass() {
+ return getString(HoodieCompactionConfig.MERGE_CLASS_NAME);
+ }
+
public int getTargetPartitionsPerDayBasedCompaction() {
return getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 3b6bda7877..fff4aa6d05 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -338,10 +338,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
// writing the first record. So make a copy of the record to be merged
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
try {
- Option<HoodieRecord> combinedRecord =
- hoodieRecord.combineAndGetUpdateValue(oldRecord,
- schema,
- props);
+ Option<HoodieRecord> combinedRecord = merge.combineAndGetUpdateValue(oldRecord, hoodieRecord, schema, props);
if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 09f8831f8b..9ee6e0884d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -27,7 +27,9 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -59,6 +61,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
*/
protected final Schema tableSchema;
protected final Schema tableSchemaWithMetaFields;
+ protected final HoodieMerge merge;
/**
* The write schema. In most case the write schema is the same to the
@@ -103,6 +106,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+ this.merge = HoodieRecordUtils.loadMerge(config.getMergeClass());
}
/**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index d8682152fa..1efe3d9641 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -19,7 +19,9 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
@@ -81,9 +83,10 @@ public abstract class BaseWriteHelper<T, I, K, O, R> {
*/
public I deduplicateRecords(
I records, HoodieTable<T, I, K, O> table, int parallelism) {
- return deduplicateRecords(records, table.getIndex(), parallelism);
+ HoodieMerge merge = HoodieRecordUtils.loadMerge(table.getConfig().getMergeClass());
+ return deduplicateRecords(records, table.getIndex(), parallelism, merge);
}
public abstract I deduplicateRecords(
- I records, HoodieIndex<?, ?> index, int parallelism);
+ I records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index e24cd71ab6..57bb511c63 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -23,13 +23,13 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
-
private HoodieWriteHelper() {
}
@@ -49,7 +49,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
@Override
public HoodieData<HoodieRecord<T>> deduplicateRecords(
- HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+ HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) {
boolean isIndexingGlobal = index.isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
@@ -58,10 +58,9 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
- HoodieRecord reducedRec = rec2.preCombine(rec1);
- HoodieKey reducedKey = rec1.getData().equals(reducedRec) ? rec1.getKey() : rec2.getKey();
-
- return (HoodieRecord<T>) reducedRec.newInstance(reducedKey);
+ HoodieRecord<T> reducedRecord = merge.preCombine(rec1, rec2);
+ HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? rec1.getKey() : rec2.getKey();
+ return reducedRecord.newInstance(reducedKey);
}, parallelism).map(Pair::getRight);
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index d2ecd09a23..a952576026 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -19,6 +19,12 @@
package org.apache.hudi.testutils;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -44,13 +50,6 @@ import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.CompressionKind;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index cc639a4c16..ef1047564a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
@@ -87,13 +88,14 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
@Override
public List<HoodieRecord<T>> deduplicateRecords(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+ List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) {
// If index used is global, then records are expected to differ in their partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
- @SuppressWarnings("unchecked") final HoodieRecord reducedRec = rec2.preCombine(rec1);
+ @SuppressWarnings("unchecked")
+ final HoodieRecord reducedRec = merge.preCombine(rec1, rec2);
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 091c2287f1..3a7513a75a 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
@@ -54,7 +55,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
@Override
public List<HoodieRecord<T>> deduplicateRecords(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+ List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) {
boolean isIndexingGlobal = index.isGlobal();
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
HoodieKey hoodieKey = record.getKey();
@@ -65,11 +66,11 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
@SuppressWarnings("unchecked")
- HoodieRecord reducedRec = rec2.preCombine(rec1);
+ HoodieRecord<T> reducedRecord = merge.preCombine(rec1, rec2);
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
- return (HoodieRecord<T>) reducedRec.newInstance(rec1.getKey());
+ return reducedRecord.newInstance(rec1.getKey());
}).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
index a3b9c210b9..636dd299fe 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
@@ -18,8 +18,8 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableProjection, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
import org.apache.spark.sql.types.StructType
@@ -42,6 +42,23 @@ trait HoodieCatalystExpressionUtils {
GenerateUnsafeProjection.generate(targetExprs, attrs)
}
+ /**
+ * Generates instance of [[MutableProjection]] projecting row of one [[StructType]] into another [[StructType]]
+ *
+ * NOTE: No safety checks are executed to validate that this projection is actually feasible,
+ * it's up to the caller to make sure that such projection is possible.
+ *
+ * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if
+ * B is a subset of A
+ */
+ def generateMutableProjection(from: StructType, to: StructType): MutableProjection = {
+ val attrs = from.toAttributes
+ val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
+ val targetExprs = to.fields.map(f => attrsMap(f.name))
+
+ GenerateMutableProjection.generate(targetExprs, attrs)
+ }
+
/**
* Parses and resolves expression against the attributes of the given table schema.
*
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index cebf3145bf..e8a4a82026 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -38,12 +38,14 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
@@ -461,7 +463,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
- List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
+ HoodieMerge merge = new HoodieAvroRecordMerge();
+ List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, merge).collectAsList();
assertEquals(1, dedupedRecs.size());
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
assertNodupesWithinPartition(dedupedRecs);
@@ -469,7 +472,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
- dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
+ dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, merge).collectAsList();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 470bc723f6..cfe8f2dafe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -765,9 +766,7 @@ public class HoodieAvroUtils {
}
switch (newSchema.getType()) {
case RECORD:
- if (!(oldRecord instanceof IndexedRecord)) {
- throw new IllegalArgumentException("cannot rewrite record with different type");
- }
+ ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type");
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
List<Schema.Field> fields = newSchema.getFields();
Map<Integer, Object> helper = new HashMap<>();
@@ -806,9 +805,7 @@ public class HoodieAvroUtils {
}
return newRecord;
case ARRAY:
- if (!(oldRecord instanceof Collection)) {
- throw new IllegalArgumentException("cannot rewrite record with different type");
- }
+ ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type");
Collection array = (Collection)oldRecord;
List<Object> newArray = new ArrayList();
fieldNames.push("element");
@@ -818,9 +815,7 @@ public class HoodieAvroUtils {
fieldNames.pop();
return newArray;
case MAP:
- if (!(oldRecord instanceof Map)) {
- throw new IllegalArgumentException("cannot rewrite record with different type");
- }
+ ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type");
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
Map<Object, Object> newMap = new HashMap<>();
fieldNames.push("value");
@@ -836,7 +831,7 @@ public class HoodieAvroUtils {
}
}
- private static String createFullName(Deque<String> fieldNames) {
+ public static String createFullName(Deque<String> fieldNames) {
String result = "";
if (!fieldNames.isEmpty()) {
List<String> parentNames = new ArrayList<>();
@@ -971,7 +966,7 @@ public class HoodieAvroUtils {
}
// convert days to Date
- private static java.sql.Date toJavaDate(int days) {
+ public static java.sql.Date toJavaDate(int days) {
long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY);
int timeZoneOffset;
TimeZone defaultTimeZone = TimeZone.getDefault();
@@ -984,7 +979,7 @@ public class HoodieAvroUtils {
}
// convert Date to days
- private static int fromJavaDate(Date date) {
+ public static int fromJavaDate(Date date) {
long millisUtc = date.getTime();
long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc);
int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, MILLIS_PER_DAY));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index ac2df00151..daec2fee03 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -48,7 +48,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data, HoodieOperation operation) {
- super(key, data, operation);
+ super(key, data, operation, null);
}
public HoodieAvroIndexedRecord(HoodieRecord<IndexedRecord> record) {
@@ -67,11 +67,6 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
return Option.of(data);
}
- @Override
- public Comparable<?> getOrderingValue() {
- throw new UnsupportedOperationException();
- }
-
@Override
public HoodieRecord newInstance() {
throw new UnsupportedOperationException();
@@ -99,16 +94,6 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
.map(Object::toString).orElse(null);
}
- @Override
- public HoodieRecord preCombine(HoodieRecord<IndexedRecord> previousRecord) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException {
- return Option.empty();
- }
-
@Override
public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
ValidationUtils.checkState(other instanceof HoodieAvroIndexedRecord);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 9a9011da37..65866238e9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -21,11 +21,10 @@ package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -47,7 +46,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
}
public HoodieAvroRecord(HoodieKey key, T data, HoodieOperation operation) {
- super(key, data, operation);
+ super(key, data, operation, null);
}
public HoodieAvroRecord(HoodieRecord<T> record) {
@@ -106,34 +105,6 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
// NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here
// for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload`
// is complete
- //
- // TODO cleanup
-
- // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other
- // object, and may not create a new one
- @Override
- public HoodieRecord<T> preCombine(HoodieRecord<T> previousRecord) {
- T picked = unsafeCast(getData().preCombine(previousRecord.getData()));
- if (picked instanceof HoodieMetadataPayload) {
- // NOTE: HoodieMetadataPayload return a new payload
- return new HoodieAvroRecord<>(getKey(), picked, getOperation());
- }
- return picked.equals(getData()) ? this : previousRecord;
- }
-
- // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could
- // be combined
- @Override
- public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException {
- Option<IndexedRecord> previousRecordAvroPayload = previousRecord.toIndexedRecord(schema, props);
- if (!previousRecordAvroPayload.isPresent()) {
- return Option.empty();
- }
-
- return getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), schema, props)
- .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
- }
-
@Override
public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
ValidationUtils.checkState(other instanceof HoodieAvroRecord);
@@ -141,7 +112,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
(GenericRecord) toIndexedRecord(readerSchema, new Properties()).get(),
(GenericRecord) other.toIndexedRecord(readerSchema, new Properties()).get(),
writerSchema);
- return new HoodieAvroRecord(getKey(), instantiateRecordPayloadWrapper(mergedPayload, getPrecombineValue(getData())), getOperation());
+ return new HoodieAvroRecord(getKey(), instantiateRecordPayloadWrapper(mergedPayload, getOrderingValue()), getOperation());
}
@Override
@@ -234,20 +205,10 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
@Nonnull
private T instantiateRecordPayloadWrapper(Object combinedAvroPayload, Comparable newPreCombineVal) {
return unsafeCast(
- ReflectionUtils.loadPayload(
+ HoodieRecordUtils.loadPayload(
getData().getClass().getCanonicalName(),
new Object[]{combinedAvroPayload, newPreCombineVal},
GenericRecord.class,
Comparable.class));
}
-
- private static <T extends HoodieRecordPayload> Comparable getPrecombineValue(T data) {
- if (data instanceof BaseAvroPayload) {
- return ((BaseAvroPayload) data).orderingVal;
- }
-
- return -1;
- }
-
- //////////////////////////////////////////////////////////////////////////////
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java
new file mode 100644
index 0000000000..c115b5b94d
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.TypeUtils.unsafeCast;
+
+public class HoodieAvroRecordMerge implements HoodieMerge {
+ @Override
+ public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
+ HoodieRecordPayload picked = unsafeCast(((HoodieAvroRecord) newer).getData().preCombine(((HoodieAvroRecord) older).getData()));
+ if (picked instanceof HoodieMetadataPayload) {
+ // NOTE: HoodieMetadataPayload return a new payload
+ return new HoodieAvroRecord(newer.getKey(), picked, newer.getOperation());
+ }
+ return picked.equals(((HoodieAvroRecord) newer).getData()) ? newer : older;
+ }
+
+ @Override
+ public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+ Option<IndexedRecord> previousRecordAvroPayload;
+ if (older instanceof HoodieAvroIndexedRecord) {
+ previousRecordAvroPayload = Option.ofNullable(((HoodieAvroIndexedRecord) older).getData());
+ } else {
+ previousRecordAvroPayload = ((HoodieRecordPayload)older.getData()).getInsertValue(schema, props);
+ }
+ if (!previousRecordAvroPayload.isPresent()) {
+ return Option.empty();
+ }
+
+ return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), schema, props)
+ .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java
new file mode 100644
index 0000000000..6becf35591
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * HoodieMerge defines how to merge two records. It is a stateless component.
+ * It can implement engine-specific merging logic for HoodieRecord, avoiding
+ * the overhead of serializing/deserializing records into an Avro payload.
+ */
+public interface HoodieMerge extends Serializable {
+
+ HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+
+ Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+}
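
To illustrate the contract, a hypothetical implementation (not part of this commit) that always resolves a merge in favor of the newer record needs only the two stateless methods:

    package org.apache.hudi.common.model;

    import org.apache.avro.Schema;
    import org.apache.hudi.common.util.Option;

    import java.io.IOException;
    import java.util.Properties;

    // Hypothetical example: always keep the newer record.
    public class KeepNewestMerge implements HoodieMerge {
      @Override
      public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
        return newer;
      }

      @Override
      public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer,
          Schema schema, Properties props) throws IOException {
        return Option.of(newer);
      }
    }

Passing such a class's fully-qualified name via hoodie.datasource.write.merge.class (or the compaction/Flink equivalents above) would route all precombine and update merging through it.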
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 1b6e79f097..8450aad5e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -131,25 +131,35 @@ public abstract class HoodieRecord<T> implements Serializable {
*/
private HoodieOperation operation;
+ /**
+ * For purposes of preCombining.
+ */
+ private Comparable<?> orderingVal;
+
public HoodieRecord(HoodieKey key, T data) {
- this(key, data, null);
+ this(key, data, null, null);
}
- public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) {
+ public HoodieRecord(HoodieKey key, T data, Comparable<?> orderingVal) {
+ this(key, data, null, orderingVal);
+ }
+
+ public HoodieRecord(HoodieKey key, T data, HoodieOperation operation, Comparable<?> orderingVal) {
this.key = key;
this.data = data;
this.currentLocation = null;
this.newLocation = null;
this.sealed = false;
this.operation = operation;
+ // default natural order is 0
+ this.orderingVal = orderingVal == null ? 0 : orderingVal;
}
public HoodieRecord(HoodieRecord<T> record) {
- this(record.key, record.data);
+ this(record.key, record.data, record.operation, record.orderingVal);
this.currentLocation = record.currentLocation;
this.newLocation = record.newLocation;
this.sealed = record.sealed;
- this.operation = record.operation;
}
public HoodieRecord() {
@@ -169,15 +179,17 @@ public abstract class HoodieRecord<T> implements Serializable {
return operation;
}
+ public Comparable<?> getOrderingValue() {
+ return orderingVal;
+ }
+
public T getData() {
if (data == null) {
- throw new IllegalStateException("Payload already deflated for record.");
+ throw new IllegalStateException("HoodieRecord already deflated for record.");
}
return data;
}
- public abstract Comparable<?> getOrderingValue();
-
/**
* Release the actual payload, to ease memory pressure. To be called after the record has been written to storage.
* Once deflated, cannot be inflated.
@@ -281,16 +293,6 @@ public abstract class HoodieRecord<T> implements Serializable {
// for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload`
// is complete
//
- // TODO cleanup
-
- // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other
- // object, and may not create a new one
- public abstract HoodieRecord<T> preCombine(HoodieRecord<T> previousRecord);
-
- // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could
- // be combined
- public abstract Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException;
-
public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException;
public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException;
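
A quick sketch of the new ordering-value default (key and payload values are hypothetical): a record constructed without an explicit ordering value reports the natural order 0, per the constructor above.

    HoodieRecord<OverwriteWithLatestAvroPayload> rec =
        new HoodieAvroRecord<>(new HoodieKey("uuid-1", "2022/07/04"), payload);
    // No ordering value supplied, so the natural order 0 is used.
    assert rec.getOrderingValue().equals(0);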
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 886911466b..bb477f9de7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.OrderedProperties;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -154,6 +155,12 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
+ " produce a new base file.");
+ public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+ .key("hoodie.compaction.merge.class")
+ .defaultValue(HoodieAvroRecordMerge.class.getName())
+ .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+ + "types, such as Spark records or Flink records.");
+
public static final ConfigProperty<String> ARCHIVELOG_FOLDER = ConfigProperty
.key("hoodie.archivelog.folder")
.defaultValue("archived")
@@ -480,6 +487,14 @@ public class HoodieTableConfig extends HoodieConfig {
"org.apache.hudi");
}
+ /**
+ * Read the hoodie merge class for HoodieRecords from the table properties.
+ */
+ public String getMergeClass() {
+ return getStringOrDefault(MERGE_CLASS_NAME).replace("com.uber.hoodie",
+ "org.apache.hudi");
+ }
+
public String getPreCombineField() {
return getString(PRECOMBINE_FIELD);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 9945eb0650..c6fd18dc41 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -725,6 +725,7 @@ public class HoodieTableMetaClient implements Serializable {
private String recordKeyFields;
private String archiveLogFolder;
private String payloadClassName;
+ private String mergeClassName;
private Integer timelineLayoutVersion;
private String baseFileFormat;
private String preCombineField;
@@ -791,6 +792,11 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
+ public PropertyBuilder setMergeClassName(String mergeClassName) {
+ this.mergeClassName = mergeClassName;
+ return this;
+ }
+
public PropertyBuilder setPayloadClass(Class<? extends HoodieRecordPayload> payloadClass) {
return setPayloadClassName(payloadClass.getName());
}
@@ -1004,6 +1010,10 @@ public class HoodieTableMetaClient implements Serializable {
tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, payloadClassName);
}
+ if (mergeClassName != null) {
+ tableConfig.setValue(HoodieTableConfig.MERGE_CLASS_NAME, mergeClassName);
+ }
+
if (null != tableCreateSchema) {
tableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA, tableCreateSchema);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 84abfec3de..209a358abe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -96,6 +96,8 @@ public abstract class AbstractHoodieLogRecordReader {
private final String payloadClassFQN;
// preCombine field
private final String preCombineField;
+ // Stateless component for merging records
+ private final String mergeClassFQN;
// simple key gen fields
private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
// Log File Paths
@@ -160,6 +162,7 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
this.payloadClassFQN = tableConfig.getPayloadClass();
this.preCombineField = tableConfig.getPreCombineField();
+ this.mergeClassFQN = tableConfig.getMergeClass();
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
this.reverseReader = reverseReader;
@@ -525,6 +528,10 @@ public abstract class AbstractHoodieLogRecordReader {
return payloadClassFQN;
}
+ protected String getMergeClassFQN() {
+ return mergeClassFQN;
+ }
+
public Option<String> getPartitionName() {
return partitionName;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 6d313b64f9..dfc3c14b5b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -24,10 +24,12 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -78,6 +80,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
// Stores the total time taken to perform reading and merging of log blocks
private long totalTimeTakenToReadAndMergeBlocks;
+ private final HoodieMerge merge;
+
@SuppressWarnings("unchecked")
protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
@@ -95,6 +99,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+ this.merge = HoodieRecordUtils.loadMerge(getMergeClassFQN());
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
}
@@ -150,7 +155,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
HoodieRecordPayload oldValue = oldRecord.getData();
- HoodieRecordPayload combinedValue = (HoodieRecordPayload)hoodieRecord.preCombine(oldRecord).getData();
+ HoodieRecordPayload combinedValue = (HoodieRecordPayload) merge.preCombine(oldRecord, hoodieRecord).getData();
// If combinedValue is oldValue, no need rePut oldRecord
if (combinedValue != oldValue) {
HoodieOperation operation = hoodieRecord.getOperation();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index ebe53fe471..98f5dcf3a0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -31,8 +31,8 @@ import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieAvroFileReader.HoodieRecordTransformIterator;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
new file mode 100644
index 0000000000..075d117fe2
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for HoodieRecord.
+ */
+public class HoodieRecordUtils {
+
+ private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>();
+
+ /**
+ * Instantiate the configured HoodieMerge implementation, caching one instance per class name.
+ */
+ public static HoodieMerge loadMerge(String mergeClass) {
+ try {
+ HoodieMerge merge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass);
+ if (null == merge) {
+ synchronized (HoodieMerge.class) {
+ merge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass);
+ if (null == merge) {
+ merge = (HoodieMerge) ReflectionUtils.loadClass(mergeClass, new Object[]{});
+ INSTANCE_CACHE.put(mergeClass, merge);
+ }
+ }
+ }
+ return merge;
+ } catch (HoodieException e) {
+ throw new HoodieException("Unable to instantiate hoodie merge class ", e);
+ }
+ }
+
+ /**
+ * Instantiate a given class with an avro record payload.
+ */
+ public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass, Object[] payloadArgs,
+ Class<?>... constructorArgTypes) {
+ try {
+ return (T) ReflectionUtils.getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs);
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ throw new HoodieException("Unable to instantiate payload class ", e);
+ }
+ }
+}
\ No newline at end of file
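
Call sites resolve the merge strategy once per handle or scanner and then reuse it; a sketch of the lookup (the record variables are hypothetical):

    HoodieMerge merge = HoodieRecordUtils.loadMerge(tableConfig.getMergeClass());
    // loadMerge caches one instance per class name, so repeated lookups
    // return the same stateless object (see HoodieRecordUtilsTest below).
    HoodieRecord reduced = merge.preCombine(olderRecord, newerRecord);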
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index a4ef09641d..3cd2396491 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.util;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
@@ -69,18 +68,6 @@ public class ReflectionUtils {
}
}
- /**
- * Instantiate a given class with a generic record payload.
- */
- public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass, Object[] payloadArgs,
- Class<?>... constructorArgTypes) {
- try {
- return (T) getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs);
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new HoodieException("Unable to instantiate payload class ", e);
- }
- }
-
/**
* Creates an instance of the given class. Use this version when dealing with interface types as constructor args.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
index d4bafd9c9f..d2d91bbfb6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
@@ -137,7 +137,7 @@ public class SpillableMapUtils {
HoodieOperation operation = withOperationField
? HoodieOperation.fromName(getNullableValAsString(record, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath),
- ReflectionUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class,
+ HoodieRecordUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class,
Comparable.class), operation);
return (R) hoodieRecord;
@@ -163,7 +163,7 @@ public class SpillableMapUtils {
*/
public static <R> R generateEmptyPayload(String recKey, String partitionPath, Comparable orderingVal, String payloadClazz) {
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath),
- ReflectionUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class));
+ HoodieRecordUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class));
return (R) hoodieRecord;
}
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
new file mode 100644
index 0000000000..0c51571c9e
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class HoodieRecordUtilsTest {
+
+ @Test
+ void loadHoodieMerge() {
+ String mergeClassName = HoodieAvroRecordMerge.class.getName();
+ HoodieMerge merge1 = HoodieRecordUtils.loadMerge(mergeClassName);
+ HoodieMerge merge2 = HoodieRecordUtils.loadMerge(mergeClassName);
+ assertEquals(merge1.getClass().getName(), mergeClassName);
+ assertEquals(merge1, merge2);
+ }
+
+ @Test
+ void loadPayload() {
+ String payloadClassName = DefaultHoodieRecordPayload.class.getName();
+ HoodieRecordPayload payload = HoodieRecordUtils.loadPayload(payloadClassName, new Object[]{null, 0}, GenericRecord.class, Comparable.class);
+ assertEquals(payload.getClass().getName(), payloadClassName);
+ }
+}
\ No newline at end of file
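
The test above pins down two properties of the new loader: classes are resolved by name, and loadMerge returns a cached singleton (merge1 and merge2 are the same instance). A minimal sketch of such a caching loader, assuming a ConcurrentHashMap keyed by class name and Hudi's existing ReflectionUtils.loadClass; the class and method names below are illustrative, not the committed implementation:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hudi.common.model.HoodieMerge;
    import org.apache.hudi.common.util.ReflectionUtils;

    public class CachingMergeLoaderSketch {
      // one instance per merge class, so repeated lookups return the same object
      private static final Map<String, HoodieMerge> MERGE_CACHE = new ConcurrentHashMap<>();

      public static HoodieMerge loadMerge(String mergeClassName) {
        // instantiate via the no-arg constructor on first use, then reuse
        return MERGE_CACHE.computeIfAbsent(mergeClassName,
            clazz -> (HoodieMerge) ReflectionUtils.loadClass(clazz));
      }
    }
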
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a436963bfc..0dc0cd1b88 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringP
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -283,6 +284,13 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");
+ public static final ConfigOption<String> MERGE_CLASS_NAME = ConfigOptions
+ .key("write.merge.class")
+ .stringType()
+ .defaultValue(HoodieAvroRecordMerge.class.getName())
+ .withDescription("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+ + "types, such as Spark records or Flink records.");
+
/**
* Flag to indicate whether to drop duplicates before insert/upsert.
* By default false to gain extra performance.
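
For reference, the new option is set like any other FlinkOptions entry. A hedged sketch (table path and merge class are illustrative), assuming Flink's Configuration#setString(ConfigOption, String):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class MergeOptionExample {
      public static Configuration hudiSinkConf() {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "file:///tmp/hudi_table");          // illustrative table path
        conf.setString(FlinkOptions.MERGE_CLASS_NAME, "org.example.MyMerge"); // defaults to HoodieAvroRecordMerge
        return conf;
      }
    }
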
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index c2f54dd8aa..4d4af6e218 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -23,9 +23,11 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
@@ -102,6 +104,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
+ private transient HoodieMerge merge;
+
/**
* Total size tracer.
*/
@@ -121,6 +125,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
this.tracer = new TotalSizeTracer(this.config);
initBuffer();
initWriteFunction();
+ initMergeClass();
}
@Override
@@ -194,6 +199,12 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
}
}
+ private void initMergeClass() {
+ String mergeClassName = metaClient.getTableConfig().getMergeClass();
+ LOG.info("init hoodie merge with class [{}]", mergeClassName);
+ merge = HoodieRecordUtils.loadMerge(mergeClassName);
+ }
+
/**
* Represents a data item in the buffer; this is needed to reduce the
* memory footprint.
@@ -420,7 +431,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
- records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+ records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, merge);
}
bucket.preWrite(records);
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
@@ -455,7 +466,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
List<HoodieRecord> records = bucket.writeBuffer();
if (records.size() > 0) {
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
- records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+ records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, merge);
}
bucket.preWrite(records);
writeStatus.addAll(writeFunction.apply(records, currentInstant));
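
deduplicateRecords now delegates precombine semantics to the configured HoodieMerge, so a custom merge only has to implement the interface's two methods. A minimal sketch mirroring the default latest-ordering-value-wins behavior (package and class name are hypothetical):

    package org.example;

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieMerge;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;

    public class MyMerge implements HoodieMerge {
      @Override
      public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
        // keep the record with the higher ordering value; ties go to the newer record
        return older.getOrderingValue().compareTo(newer.getOrderingValue()) > 0 ? older : newer;
      }

      @Override
      public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer,
          Schema schema, Properties props) throws IOException {
        return Option.of(newer); // last write wins against the stored record
      }
    }
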
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index e9574dd52b..fef948b797 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -19,6 +19,7 @@
package org.apache.hudi.streamer;
import org.apache.hudi.client.utils.OperationConverter;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -117,6 +118,10 @@ public class FlinkStreamerConfig extends Configuration {
+ "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value.")
public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
+ @Parameter(names = {"--merge-class"}, description = "Implements of HoodieMerge, that defines how to merge two records."
+ + "Implement your own, if you want to implement specific record merge logic.")
+ public String mergeClassName = HoodieAvroRecordMerge.class.getName();
+
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed).", converter = OperationConverter.class)
public WriteOperationType operation = WriteOperationType.UPSERT;
@@ -356,6 +361,7 @@ public class FlinkStreamerConfig extends Configuration {
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);
+ conf.setString(FlinkOptions.MERGE_CLASS_NAME, config.mergeClassName);
conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine);
conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
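
With the flag wired through to FlinkOptions above, the streamer can be pointed at such a class from the command line; an illustrative invocation (bundle jar and class are placeholders):

    flink run hudi-flink-bundle.jar \
      --merge-class org.example.MyMerge \
      ...
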
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index e76bb29bdf..08d0cf66da 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -425,7 +425,8 @@ public class HoodieTableSource implements
tableAvroSchema.toString(),
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
inputSplits,
- conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+ conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","),
+ conf.getString(FlinkOptions.MERGE_CLASS_NAME));
return MergeOnReadInputFormat.builder()
.config(this.conf)
.tableState(hoodieTableState)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 8eaa9d0b88..5fd8965791 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,12 +18,15 @@
package org.apache.hudi.table.format.mor;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -62,6 +65,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.IntStream;
@@ -202,7 +206,8 @@ public class MergeOnReadInputFormat
this.requiredPos,
this.emitDelete,
this.tableState.getOperationPos(),
- getFullSchemaReader(split.getBasePath().get()));
+ getFullSchemaReader(split.getBasePath().get()),
+ tableState.getMergeClass());
} else {
throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
+ "file path: " + split.getBasePath()
@@ -629,6 +634,8 @@ public class MergeOnReadInputFormat
private final InstantRange instantRange;
+ private final HoodieMerge merge;
+
// add the flag because the flink ParquetColumnarRowSplitReader is buggy:
// method #reachedEnd() returns false after it returns true.
// refactor it out once FLINK-22370 is resolved.
@@ -649,7 +656,8 @@ public class MergeOnReadInputFormat
int[] requiredPos,
boolean emitDelete,
int operationPos,
- ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
+ ParquetColumnarRowSplitReader reader, // the reader should be with full schema
+ String mergeClass) {
this.tableSchema = tableSchema;
this.reader = reader;
this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf);
@@ -663,6 +671,7 @@ public class MergeOnReadInputFormat
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
this.instantRange = split.getInstantRange().orElse(null);
+ this.merge = HoodieRecordUtils.loadMerge(mergeClass);
}
@Override
@@ -753,7 +762,8 @@ public class MergeOnReadInputFormat
String curKey) throws IOException {
final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
- return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+ Option<HoodieRecord> resultRecord = merge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(historyAvroRecord), record, tableSchema, new Properties());
+ return ((HoodieAvroIndexedRecord) resultRecord.get()).toIndexedRecord();
}
}
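
The tail of mergeRowWithLog above shows the new calling convention: the row read from the base file is wrapped in a HoodieAvroIndexedRecord so that the configured HoodieMerge sees two HoodieRecords. In isolation (a sketch; historyAvroRecord and logRecord stand in for the method's locals, and the returned Option should be checked before get() when the merge can produce an empty result, as HoodieMergeOnReadRDD does further down):

    Option<HoodieRecord> result = merge.combineAndGetUpdateValue(
        new HoodieAvroIndexedRecord(historyAvroRecord), // row read from the base file
        logRecord,                                      // record scanned from the log
        tableSchema, new Properties());
    Option<IndexedRecord> merged = ((HoodieAvroIndexedRecord) result.get()).toIndexedRecord();
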
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 36dfecbb79..bbb21db7f8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -41,6 +41,7 @@ public class MergeOnReadTableState implements Serializable {
private final List<MergeOnReadInputSplit> inputSplits;
private final String[] pkFields;
private final int operationPos;
+ private final String mergeClass;
public MergeOnReadTableState(
RowType rowType,
@@ -48,7 +49,8 @@ public class MergeOnReadTableState implements Serializable {
String avroSchema,
String requiredAvroSchema,
List<MergeOnReadInputSplit> inputSplits,
- String[] pkFields) {
+ String[] pkFields,
+ String mergeClass) {
this.rowType = rowType;
this.requiredRowType = requiredRowType;
this.avroSchema = avroSchema;
@@ -56,6 +58,7 @@ public class MergeOnReadTableState implements Serializable {
this.inputSplits = inputSplits;
this.pkFields = pkFields;
this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);
+ this.mergeClass = mergeClass;
}
public RowType getRowType() {
@@ -82,6 +85,10 @@ public class MergeOnReadTableState implements Serializable {
return operationPos;
}
+ public String getMergeClass() {
+ return mergeClass;
+ }
+
public int[] getRequiredPositions() {
final List<String> fieldNames = rowType.getFieldNames();
return requiredRowType.getFieldNames().stream()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index d292b3832a..c38af7eff4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -181,6 +181,7 @@ public class StreamerUtil {
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+ .withMergeClass(conf.getString(FlinkOptions.MERGE_CLASS_NAME))
.withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
.withInlineCompactionTriggerStrategy(
CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
@@ -275,6 +276,7 @@ public class StreamerUtil {
.setTableName(conf.getString(FlinkOptions.TABLE_NAME))
.setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
.setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+ .setMergeClassName(conf.getString(FlinkOptions.MERGE_CLASS_NAME))
.setPreCombineField(OptionsResolver.getPreCombineField(conf))
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 9f2aba77c1..4dd5f0232b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -261,7 +261,8 @@ public class TestStreamReadOperator {
tableAvroSchema.toString(),
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
Collections.emptyList(),
- new String[0]);
+ new String[0],
+ metaClient.getTableConfig().getMergeClass());
MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
.config(conf)
.tableState(hoodieTableState)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 819c4b55a9..26755b3b85 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -301,6 +301,12 @@ object DataSourceWriteOptions {
*/
val PAYLOAD_CLASS_NAME = HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME
+ /**
+ * HoodieMerge replaces the payload for merging data and provides
+ * the same capabilities as the payload.
+ */
+ val MERGE_CLASS_NAME = HoodieWriteConfig.MERGE_CLASS_NAME
+
/**
* Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
* will be obtained by invoking .toString() on the field value. Nested fields can be specified using
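
On the Spark side this is a plain write option; a hypothetical Java write (df and basePath are assumed locals, and it presumes HoodieWriteConfig.MERGE_CLASS_NAME is a ConfigProperty exposing key(), like the neighboring write options):

    df.write().format("hudi")
        .option("hoodie.table.name", "h0")
        .option(HoodieWriteConfig.MERGE_CLASS_NAME.key(), HoodieSparkRecordMerge.class.getName())
        .mode(SaveMode.Append)
        .save(basePath);
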
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index ab492b05cc..da13142fe8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -67,7 +67,8 @@ case class HoodieTableState(tablePath: String,
preCombineFieldOpt: Option[String],
usesVirtualKeys: Boolean,
recordPayloadClassName: String,
- metadataConfig: HoodieMetadataConfig)
+ metadataConfig: HoodieMetadataConfig,
+ mergeClass: String)
/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -395,7 +396,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
preCombineFieldOpt = preCombineFieldOpt,
usesVirtualKeys = !tableConfig.populateMetaFields(),
recordPayloadClassName = tableConfig.getPayloadClass,
- metadataConfig = fileIndex.metadataConfig
+ metadataConfig = fileIndex.metadataConfig,
+ mergeClass = tableConfig.getMergeClass
)
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 37567e0bb2..3ff04212a5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -29,8 +29,9 @@ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
-import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.common.util.HoodieRecordUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodiePayloadConfig
import org.apache.hudi.exception.HoodieException
@@ -261,6 +262,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema))
private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+ private val recordMerge = HoodieRecordUtils.loadMerge(tableState.mergeClass)
override def hasNext: Boolean = hasNextInternal
@@ -303,7 +305,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
// NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
// on the record from the Delta Log
- toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps))
+ val combinedRecord = recordMerge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(curAvroRecord), newRecord, logFileReaderAvroSchema, payloadProps)
+ if (combinedRecord.isPresent) {
+ toScalaOption(combinedRecord.get.asInstanceOf[HoodieAvroIndexedRecord].toIndexedRecord)
+ } else {
+ Option.empty
+ }
}
}
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f9deda6779..1e0179868c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -150,6 +150,7 @@ object HoodieSparkSqlWriter {
.setBaseFileFormat(baseFileFormat)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+ .setMergeClassName(hoodieConfig.getString(MERGE_CLASS_NAME))
// we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
// but we are interested in what user has set, hence fetching from optParams.
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
@@ -457,6 +458,7 @@ object HoodieSparkSqlWriter {
.setRecordKeyFields(recordKeyFields)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
+ .setMergeClassName(hoodieConfig.getStringOrDefault(MERGE_CLASS_NAME))
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
.setBootstrapIndexClass(bootstrapIndexClass)
.setBaseFileFormat(baseFileFormat)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala
new file mode 100644
index 0000000000..4ff5cceef7
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import org.apache.avro.Schema
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, fromJavaDate, toJavaDate}
+import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
+import org.apache.hudi.common.util.ValidationUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, MutableProjection, Projection}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
+import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils.AllowedTransformationExpression.exprUtils.generateMutableProjection
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable
+
+/**
+ * Helper object for common operations on Spark [[InternalRow]], providing
+ * methods analogous to [[HoodieAvroUtils]].
+ */
+object HoodieInternalRowUtils {
+
+ private val projectionMap = new ConcurrentHashMap[(StructType, StructType), MutableProjection]
+ private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+ private val schemaPosMap = new ConcurrentHashMap[StructType, Map[String, (StructField, Int)]]
+
+ def stitchRecords(left: InternalRow, leftSchema: StructType, right: InternalRow, rightSchema: StructType, stitchedSchema: StructType): InternalRow = {
+ val mergeSchema = StructType(leftSchema.fields ++ rightSchema.fields)
+ val row = new JoinedRow(left, right)
+ val projection = getCachedProjection(mergeSchema, stitchedSchema)
+ projection(row)
+ }
+
+ def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
+ val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
+
+ val oldFieldMap = getCachedSchemaPosMap(oldSchema)
+ for ((field, pos) <- newSchema.fields.zipWithIndex) {
+ var oldValue: AnyRef = null
+ if (oldFieldMap.contains(field.name)) {
+ val (oldField, oldPos) = oldFieldMap(field.name)
+ oldValue = oldRecord.get(oldPos, oldField.dataType)
+ }
+ if (oldValue != null) {
+ field.dataType match {
+ case structType: StructType =>
+ val oldStructType = oldFieldMap(field.name)._1.dataType.asInstanceOf[StructType]
+ newRow.update(pos, rewriteRecord(oldValue.asInstanceOf[InternalRow], oldStructType, structType))
+ case decimalType: DecimalType =>
+ val oldDecimalType = oldFieldMap(field.name)._1.dataType.asInstanceOf[DecimalType]
+ if (decimalType.scale != oldDecimalType.scale || decimalType.precision != oldDecimalType.precision) {
+ newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(decimalType.scale)))
+ } else {
+ newRow.update(pos, oldValue)
+ }
+ case _ =>
+ newRow.update(pos, oldValue)
+ }
+ } else {
+ // TODO default value in newSchema
+ }
+ }
+
+ newRow
+ }
+
+ def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: util.Map[String, String]): InternalRow = {
+ rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new util.LinkedList[String]).asInstanceOf[InternalRow]
+ }
+
+ private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: util.Map[String, String], fieldNames: util.Deque[String]): Any = {
+ if (oldRecord == null) {
+ null
+ } else {
+ newSchema match {
+ case targetSchema: StructType =>
+ ValidationUtils.checkArgument(oldRecord.isInstanceOf[InternalRow], "cannot rewrite record with different type")
+ val oldRow = oldRecord.asInstanceOf[InternalRow]
+ val helper = mutable.Map[Integer, Any]()
+
+ val oldSchemaPos = getCachedSchemaPosMap(oldSchema.asInstanceOf[StructType])
+ targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
+ fieldNames.push(field.name)
+ if (oldSchemaPos.contains(field.name)) {
+ val (oldField, oldPos) = oldSchemaPos(field.name)
+ helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
+ } else {
+ val fieldFullName = createFullName(fieldNames)
+ val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
+ val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
+ // deal with rename
+ if (!oldSchemaPos.contains(field.name) && oldSchemaPos.contains(lastColNameFromOldSchema)) {
+ // find rename
+ val (oldField, oldPos) = oldSchemaPos(lastColNameFromOldSchema)
+ helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
+ }
+ }
+ fieldNames.pop()
+ }
+ val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
+ targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
+ if (helper.contains(i)) {
+ newRow.update(i, helper(i))
+ } else {
+ // TODO add default val
+ newRow.update(i, null)
+ }
+ }
+
+ newRow
+ case targetSchema: ArrayType =>
+ ValidationUtils.checkArgument(oldRecord.isInstanceOf[ArrayData], "cannot rewrite record with different type")
+ val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
+ val oldArray = oldRecord.asInstanceOf[ArrayData]
+ val newElementType = targetSchema.elementType
+ val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
+ fieldNames.push("element")
+ oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) }
+ fieldNames.pop()
+
+ newArray
+ case targetSchema: MapType =>
+ ValidationUtils.checkArgument(oldRecord.isInstanceOf[MapData], "cannot rewrite record with different type")
+ val oldValueType = oldSchema.asInstanceOf[MapType].valueType
+ val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
+ val oldMap = oldRecord.asInstanceOf[MapData]
+ val newValueType = targetSchema.valueType
+ val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
+ val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
+ val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
+ fieldNames.push("value")
+ oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, value) }
+ oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) }
+ fieldNames.pop()
+
+ newMap
+ case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
+ }
+ }
+ }
+
+ def rewriteRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = {
+ val newRecord = rewriteRecord(record, oldSchema, newSchema)
+ newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, UTF8String.fromString(fileName))
+
+ newRecord
+ }
+
+ def rewriteEvolutionRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = {
+ val newRecord = rewriteRecordWithNewSchema(record, oldSchema, newSchema, new util.HashMap[String, String]())
+ newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, UTF8String.fromString(fileName))
+
+ newRecord
+ }
+
+ def getCachedSchema(schema: Schema): StructType = {
+ if (!schemaMap.containsKey(schema)) {
+ schemaMap.synchronized {
+ if (!schemaMap.containsKey(schema)) {
+ val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ schemaMap.put(schema, structType)
+ }
+ }
+ }
+ schemaMap.get(schema)
+ }
+
+ private def getCachedProjection(from: StructType, to: StructType): Projection = {
+ val schemaPair = (from, to)
+ if (!projectionMap.containsKey(schemaPair)) {
+ projectionMap.synchronized {
+ if (!projectionMap.containsKey(schemaPair)) {
+ val projection = generateMutableProjection(from, to)
+ projectionMap.put(schemaPair, projection)
+ }
+ }
+ }
+ projectionMap.get(schemaPair)
+ }
+
+ def getCachedSchemaPosMap(schema: StructType): Map[String, (StructField, Int)] = {
+ if (!schemaPosMap.containsKey(schema)) {
+ schemaPosMap.synchronized {
+ if (!schemaPosMap.containsKey(schema)) {
+ val fieldMap = schema.fields.zipWithIndex.map { case (field, i) => (field.name, (field, i)) }.toMap
+ schemaPosMap.put(schema, fieldMap)
+ }
+ }
+ }
+ schemaPosMap.get(schema)
+ }
+
+ private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
+ if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
+ oldSchema match {
+ case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | StringType | DateType | TimestampType | BinaryType =>
+ oldValue
+ case DecimalType() =>
+ Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
+ case _ =>
+ throw new HoodieException("Unknown schema type: " + newSchema)
+ }
+ } else {
+ rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
+ }
+ }
+
+ private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
+ val value = newSchema match {
+ case NullType | BooleanType =>
+ case DateType if oldSchema.equals(StringType) =>
+ fromJavaDate(java.sql.Date.valueOf(oldValue.toString))
+ case LongType =>
+ oldSchema match {
+ case IntegerType => oldValue.asInstanceOf[Int].longValue()
+ case _ =>
+ }
+ case FloatType =>
+ oldSchema match {
+ case IntegerType => oldValue.asInstanceOf[Int].floatValue()
+ case LongType => oldValue.asInstanceOf[Long].floatValue()
+ case _ =>
+ }
+ case DoubleType =>
+ oldSchema match {
+ case IntegerType => oldValue.asInstanceOf[Int].doubleValue()
+ case LongType => oldValue.asInstanceOf[Long].doubleValue()
+ case FloatType => java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + "")
+ case _ =>
+ }
+ case BinaryType =>
+ oldSchema match {
+ // string values inside an InternalRow are UTF8String, not java.lang.String
+ case StringType => oldValue.asInstanceOf[UTF8String].getBytes
+ case _ =>
+ }
+ case StringType =>
+ oldSchema match {
+ case BinaryType => UTF8String.fromBytes(oldValue.asInstanceOf[Array[Byte]])
+ case DateType => UTF8String.fromString(toJavaDate(oldValue.asInstanceOf[Integer]).toString)
+ case IntegerType | LongType | FloatType | DoubleType | DecimalType() => UTF8String.fromString(oldValue.toString)
+ case _ =>
+ }
+ case DecimalType() =>
+ oldSchema match {
+ case IntegerType | LongType | FloatType | DoubleType | StringType =>
+ val scale = newSchema.asInstanceOf[DecimalType].scale
+
+ Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale))
+ case _ =>
+ }
+ case _ =>
+ }
+ // the empty match arms above evaluate to Unit, which signals an unsupported conversion
+ if (value.isInstanceOf[Unit]) {
+ throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
+ } else {
+ value
+ }
+ }
+}
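
A design note on the three caches above: the containsKey/synchronized/put sequence is classic double-checked initialization. ConcurrentHashMap#computeIfAbsent expresses the same once-per-key guarantee more compactly, at the cost of holding the bin lock while the value is computed; a sketch of the alternative in Java (names illustrative):

    private final ConcurrentHashMap<Schema, StructType> schemaCache = new ConcurrentHashMap<>();

    public StructType getCachedSchema(Schema schema) {
      // convert at most once per distinct Avro schema
      return schemaCache.computeIfAbsent(schema,
          s -> AvroConversionUtils.convertAvroSchemaToStructType(s));
    }
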
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java
new file mode 100644
index 0000000000..a22e78af21
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+ public HoodieSparkRecord(HoodieKey key, InternalRow data, Comparable orderingVal) {
+ super(key, data, orderingVal);
+ }
+
+ public HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, Comparable orderingVal) {
+ super(key, data, operation, orderingVal);
+ }
+
+ public HoodieSparkRecord(HoodieRecord<InternalRow> record) {
+ super(record);
+ }
+
+ public HoodieSparkRecord() {
+ }
+
+ @Override
+ public HoodieRecord<InternalRow> newInstance() {
+ return new HoodieSparkRecord(this);
+ }
+
+ @Override
+ public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
+ return new HoodieSparkRecord(key, data, op, getOrderingValue());
+ }
+
+ @Override
+ public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
+ return new HoodieSparkRecord(key, data, getOrderingValue());
+ }
+
+ @Override
+ public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+ return getRecordKey();
+ }
+
+ @Override
+ public String getRecordKey(String keyFieldName) {
+ return getRecordKey();
+ }
+
+ @Override
+ public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
+ StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
+ StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema);
+ InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, readerStructType, (InternalRow) other.getData(), readerStructType, writerStructType);
+ return new HoodieSparkRecord(getKey(), mergeRow, getOperation(), getOrderingValue());
+ }
+
+ @Override
+ public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException {
+ StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, targetStructType);
+ return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
+ }
+
+ @Override
+ public HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+ StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+ InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, writeSchemaWithMetaFieldsStructType, new HashMap<>())
+ : HoodieInternalRowUtils.rewriteRecord(data, readerStructType, writeSchemaWithMetaFieldsStructType);
+ return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
+ }
+
+ @Override
+ public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+ StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+ InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteEvolutionRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName)
+ : HoodieInternalRowUtils.rewriteRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName);
+ return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
+ }
+
+ @Override
+ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+ StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+ InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols);
+ return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
+ }
+
+ @Override
+ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols, Mapper mapper) throws IOException {
+ StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+ InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols);
+ // TODO change mapper type
+ return mapper.apply((IndexedRecord) rewriteRow);
+ }
+
+ @Override
+ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+ StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+ InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, newStructType);
+ return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
+ }
+
+ @Override
+ public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+ data.update(pos, UTF8String.fromString(newValue));
+ return this;
+ }
+
+ @Override
+ public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
+ Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
+ String value = metadataValues.get(metadataField);
+ if (value != null) {
+ data.update(recordSchema.getField(metadataField.getFieldName()).pos(), UTF8String.fromString(value));
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public Option<Map<String, String>> getMetadata() {
+ return Option.empty();
+ }
+
+ @Override
+ public boolean isPresent(Schema schema, Properties prop) throws IOException {
+ if (null == data) {
+ return false;
+ }
+ Object deleteMarker = data.get(schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
+ return !(deleteMarker instanceof Boolean && (boolean) deleteMarker);
+ }
+
+ @Override
+ public boolean shouldIgnore(Schema schema, Properties prop) throws IOException {
+ // TODO: refactor SENTINEL so it does not rely on Avro (GenericRecord)
+ return data != null && data.equals(SENTINEL);
+ }
+
+ @Override
+ public Option<IndexedRecord> toIndexedRecord(Schema schema, Properties prop) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
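
Constructing the engine-native record is direct. A small sketch using the (key, data, orderingVal) constructor above; the field values and ordering value are illustrative:

    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.unsafe.types.UTF8String;

    // string columns of an InternalRow hold UTF8String, not java.lang.String
    InternalRow row = new GenericInternalRow(new Object[] {UTF8String.fromString("alice"), 18});
    HoodieSparkRecord record =
        new HoodieSparkRecord(new HoodieKey("key-1", "partition-1"), row, 0L);
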
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java
new file mode 100644
index 0000000000..88ae7c13df
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class HoodieSparkRecordMerge implements HoodieMerge {
+
+ @Override
+ public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
+ if (older.getData() == null) {
+ // use natural order for delete record
+ return older;
+ }
+ if (older.getOrderingValue().compareTo(newer.getOrderingValue()) > 0) {
+ return older;
+ } else {
+ return newer;
+ }
+ }
+
+ @Override
+ public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+ return Option.of(newer);
+ }
+}
\ No newline at end of file
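
Tying the pieces together: an engine resolves the merge implementation by class name through HoodieRecordUtils, and, per HoodieRecordUtilsTest earlier in this patch, the instance is cached and reused. A sketch (older and newer are assumed HoodieSparkRecord instances):

    HoodieMerge merge = HoodieRecordUtils.loadMerge(HoodieSparkRecordMerge.class.getName());
    HoodieRecord winner = merge.preCombine(older, newer); // higher ordering value wins
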
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
new file mode 100644
index 0000000000..7a08ee64bf
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieInternalRowUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SparkSession}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll {
+
+ private var sparkSession: SparkSession = _
+
+ private val schema1 = StructType(
+ Array(
+ StructField("name", StringType),
+ StructField("age", IntegerType)
+ )
+ )
+ private val schema2 = StructType(
+ Array(
+ StructField("name1", StringType),
+ StructField("age1", IntegerType)
+ )
+ )
+ private val schemaMerge = StructType(schema1.fields ++ schema2.fields)
+ private val schema1WithMetaData = StructType(Array(
+ StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
+ StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
+ StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
+ StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
+ StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType),
+ StructField(HoodieRecord.OPERATION_METADATA_FIELD, StringType),
+ StructField(HoodieRecord.HOODIE_IS_DELETED_FIELD, BooleanType)
+ ) ++ schema1.fields)
+
+ override protected def beforeAll(): Unit = {
+ // Initialize a local spark env
+ val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName))
+ jsc.setLogLevel("ERROR")
+ sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate
+ }
+
+ override protected def afterAll(): Unit = {
+ sparkSession.close()
+ }
+
+ test("test merge") {
+ val data1 = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
+ val data2 = sparkSession.sparkContext.parallelize(Seq(Row("like1", 181)))
+ val row1 = sparkSession.createDataFrame(data1, schema1).queryExecution.toRdd.first()
+ val row2 = sparkSession.createDataFrame(data2, schema2).queryExecution.toRdd.first()
+ val rowMerge = HoodieInternalRowUtils.stitchRecords(row1, schema1, row2, schema2, schemaMerge)
+ assert(rowMerge.get(0, StringType).toString.equals("like"))
+ assert(rowMerge.get(1, IntegerType) == 18)
+ assert(rowMerge.get(2, StringType).toString.equals("like1"))
+ assert(rowMerge.get(3, IntegerType) == 181)
+ }
+
+ test("test rewrite") {
+ val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18, "like1", 181)))
+ val oldRow = sparkSession.createDataFrame(data, schemaMerge).queryExecution.toRdd.first()
+ val newRow1 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema1)
+ val newRow2 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema2)
+ assert(newRow1.get(0, StringType).toString.equals("like"))
+ assert(newRow1.get(1, IntegerType) == 18)
+ assert(newRow2.get(0, StringType).toString.equals("like1"))
+ assert(newRow2.get(1, IntegerType) == 181)
+ }
+
+ test("test rewrite with nullable value") {
+ val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
+ val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first()
+ val newRow = HoodieInternalRowUtils.rewriteRecord(oldRow, schema1, schemaMerge)
+ assert(newRow.get(0, StringType).toString.equals("like"))
+ assert(newRow.get(1, IntegerType) == 18)
+ assert(newRow.get(2, StringType) == null)
+ assert(newRow.get(3, IntegerType) == null)
+ }
+
+ test("test rewrite with metaDataFiled value") {
+ val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
+ val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first()
+ val newRow = HoodieInternalRowUtils.rewriteRecordWithMetadata(oldRow, schema1, schema1WithMetaData, "file1")
+ assert(newRow.get(0, StringType) == null)
+ assert(newRow.get(1, StringType) == null)
+ assert(newRow.get(2, StringType) == null)
+ assert(newRow.get(3, StringType) == null)
+ assert(newRow.get(4, StringType).toString.equals("file1"))
+ assert(newRow.get(5, StringType) == null)
+ assert(newRow.get(6, BooleanType) == null)
+ assert(newRow.get(7, StringType).toString.equals("like"))
+ assert(newRow.get(8, IntegerType) == 18)
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala
new file mode 100644
index 0000000000..cb5529721c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.nio.ByteBuffer
+import java.util.{ArrayList, HashMap, Objects}
+import org.apache.avro.generic.GenericData
+import org.apache.avro.{LogicalTypes, Schema}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.internal.schema.Types
+import org.apache.hudi.internal.schema.action.TableChanges
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.hudi.HoodieInternalRowUtils
+import org.apache.spark.sql.types._
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with BeforeAndAfterAll {
+ private var sparkSession: SparkSession = _
+
+ override protected def beforeAll(): Unit = {
+ // Initialize a local spark env
+ val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestStructTypeSchemaEvolutionUtils].getName))
+ jsc.setLogLevel("ERROR")
+ sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate
+ }
+
+ override protected def afterAll(): Unit = {
+ sparkSession.close()
+ }
+
+ /**
+ * Test record data type changes:
+ * int => long/float/double/string
+ * long => float/double/string
+ * float => double/string
+ * double => string/decimal
+ * decimal => decimal/string
+ * string => date/decimal
+ * date => string
+ */
+ test("test rewrite record with type changed") {
+ val avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
+ + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},"
+ + "{\"name\":\"comb\",\"type\":[\"null\",\"int\"],\"default\":null},"
+ + "{\"name\":\"com1\",\"type\":[\"null\",\"int\"],\"default\":null},"
+ + "{\"name\":\"col0\",\"type\":[\"null\",\"int\"],\"default\":null},"
+ + "{\"name\":\"col1\",\"type\":[\"null\",\"long\"],\"default\":null},"
+ + "{\"name\":\"col11\",\"type\":[\"null\",\"long\"],\"default\":null},"
+ + "{\"name\":\"col12\",\"type\":[\"null\",\"long\"],\"default\":null},"
+ + "{\"name\":\"col2\",\"type\":[\"null\",\"float\"],\"default\":null},"
+ + "{\"name\":\"col21\",\"type\":[\"null\",\"float\"],\"default\":null},"
+ + "{\"name\":\"col3\",\"type\":[\"null\",\"double\"],\"default\":null},"
+ + "{\"name\":\"col31\",\"type\":[\"null\",\"double\"],\"default\":null},"
+ + "{\"name\":\"col4\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col4\","
+ + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null},"
+ + "{\"name\":\"col41\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col41\","
+ + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null},"
+ + "{\"name\":\"col5\",\"type\":[\"null\",\"string\"],\"default\":null},"
+ + "{\"name\":\"col51\",\"type\":[\"null\",\"string\"],\"default\":null},"
+ + "{\"name\":\"col6\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null},"
+ + "{\"name\":\"col7\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null},"
+ + "{\"name\":\"col8\",\"type\":[\"null\",\"boolean\"],\"default\":null},"
+ + "{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"par\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}")
+ // create a test record with avroSchema
+ val avroRecord = new GenericData.Record(avroSchema)
+ avroRecord.put("id", 1)
+ avroRecord.put("comb", 100)
+ avroRecord.put("com1", -100)
+ avroRecord.put("col0", 256)
+ avroRecord.put("col1", 1000L)
+ avroRecord.put("col11", -100L)
+ avroRecord.put("col12", 2000L)
+ avroRecord.put("col2", -5.001f)
+ avroRecord.put("col21", 5.001f)
+ avroRecord.put("col3", 12.999d)
+ avroRecord.put("col31", 9999.999d)
+ val currentDecimalType = avroSchema.getField("col4").schema.getTypes.get(1)
+ val bd = new java.math.BigDecimal("123.456").setScale(currentDecimalType.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale)
+ avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType, currentDecimalType.getLogicalType))
+ val currentDecimalType1 = avroSchema.getField("col41").schema.getTypes.get(1)
+ val bd1 = new java.math.BigDecimal("7890.456").setScale(currentDecimalType1.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale)
+ avroRecord.put("col41", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd1, currentDecimalType1, currentDecimalType1.getLogicalType))
+ avroRecord.put("col5", "2011-01-01")
+ avroRecord.put("col51", "199.342")
+ avroRecord.put("col6", 18987)
+ avroRecord.put("col7", 1640491505000000L)
+ avroRecord.put("col8", false)
+ val bb = ByteBuffer.wrap(Array[Byte](97, 48, 53))
+ avroRecord.put("col9", bb)
+ assert(GenericData.get.validate(avroSchema, avroRecord))
+ val internalSchema = AvroInternalSchemaConverter.convert(avroSchema)
+ // apply the column type changes
+ val updateChange = TableChanges.ColumnUpdateChange.get(internalSchema)
+ updateChange.updateColumnType("id", Types.LongType.get).updateColumnType("comb", Types.FloatType.get).updateColumnType("com1", Types.DoubleType.get).updateColumnType("col0", Types.StringType.get).updateColumnType("col1", Types.FloatType.get).updateColumnType("col11", Types.DoubleType.get).updateColumnType("col12", Types.StringType.get).updateColumnType("col2", Types.DoubleType.get).updateColumnType("col21", Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateCo [...]
+ val newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange)
+ val newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName)
+ val newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String])
+ assert(GenericData.get.validate(newAvroSchema, newRecord))
+ // convert the Avro records to InternalRows
+ val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(avroSchema)
+ val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema)
+ val row = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, structTypeSchema).apply(avroRecord).get
+ val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema)
+ .apply(newRecord).get
+ val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String])
+ internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema)
+ }
+
+ test("test rewrite nest record") {
+ val record = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get()),
+ Types.Field.get(1, true, "data", Types.StringType.get()),
+ Types.Field.get(2, true, "preferences",
+ Types.RecordType.get(Types.Field.get(5, false, "feature1",
+ Types.BooleanType.get()), Types.Field.get(6, true, "feature2", Types.BooleanType.get()))),
+ Types.Field.get(3, false, "doubles", Types.ArrayType.get(7, false, Types.DoubleType.get())),
+ Types.Field.get(4, false, "locations", Types.MapType.get(8, 9, Types.StringType.get(),
+ Types.RecordType.get(Types.Field.get(10, false, "lat", Types.FloatType.get()), Types.Field.get(11, false, "long", Types.FloatType.get())), false))
+ )
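+ // the schema above nests a record, an array and a map, so the rewrite below is exercised recursively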
+ val schema = AvroInternalSchemaConverter.convert(record, "test1")
+ val avroRecord = new GenericData.Record(schema)
+ avroRecord.put("id", 2)
+ avroRecord.put("data", "xs")
+ // fill the nested record field
+ val preferencesRecord = new GenericData.Record(AvroInternalSchemaConverter.convert(record.fieldType("preferences"), "test1_preferences"))
+ preferencesRecord.put("feature1", false)
+ preferencesRecord.put("feature2", true)
+ assert(GenericData.get.validate(AvroInternalSchemaConverter.convert(record.fieldType("preferences"), "test1_preferences"), preferencesRecord))
+ avroRecord.put("preferences", preferencesRecord)
+ // fill the map field
+ val locations = new HashMap[String, GenericData.Record]
+ val mapSchema = AvroInternalSchemaConverter.convert(record.field("locations").`type`.asInstanceOf[Types.MapType].valueType, "test1_locations")
+ val locationsValue: GenericData.Record = new GenericData.Record(mapSchema)
+ locationsValue.put("lat", 1.2f)
+ locationsValue.put("long", 1.4f)
+ val locationsValue1: GenericData.Record = new GenericData.Record(mapSchema)
+ locationsValue1.put("lat", 2.2f)
+ locationsValue1.put("long", 2.4f)
+ locations.put("key1", locationsValue)
+ locations.put("key2", locationsValue1)
+ avroRecord.put("locations", locations)
+ val doubles = new ArrayList[Double]
+ doubles.add(2.0d)
+ doubles.add(3.0d)
+ avroRecord.put("doubles", doubles)
+ // validate the fully populated record
+ assert(GenericData.get.validate(schema, avroRecord))
+ // create the evolved schema (adds the nested "featurex" field)
+ val newRecord = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get), Types.Field.get(1, true, "data", Types.StringType.get), Types.Field.get(2, true, "preferences", Types.RecordType.get(Types.Field.get(5, false, "feature1", Types.BooleanType.get), Types.Field.get(5, true, "featurex", Types.BooleanType.get), Types.Field.get(6, true, "feature2", Types.BooleanType.get))), Types.Field.get(3, false, "doubles", Types.ArrayType.get(7, false, Types.DoubleType.get)), Types [...]
+ val newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName)
+ val newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String])
+ // verify the correctness of the rewrite
+ assert(GenericData.get.validate(newAvroSchema, newAvroRecord))
+ // convert the Avro records to InternalRows
+ val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(schema)
+ val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema)
+ val row = AvroConversionUtils.createAvroToInternalRowConverter(schema, structTypeSchema).apply(avroRecord).get
+ val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema).apply(newAvroRecord).get
+ val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String])
+ internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema)
+ }
+
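+ // Recursively compares two Catalyst values, descending into structs, arrays and maps per the given DataType.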
+ private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = {
+ schema match {
+ case StructType(fields) =>
+ val expectedRow = expected.asInstanceOf[InternalRow]
+ val actualRow = actual.asInstanceOf[InternalRow]
+ fields.zipWithIndex.foreach { case (field, i) => internalRowCompare(expectedRow.get(i, field.dataType), actualRow.get(i, field.dataType), field.dataType) }
+ case ArrayType(elementType, _) =>
+ val expectedArray = expected.asInstanceOf[ArrayData].toSeq[Any](elementType)
+ val actualArray = actual.asInstanceOf[ArrayData].toSeq[Any](elementType)
+ if (expectedArray.size != actualArray.size) {
+ throw new AssertionError(s"array size mismatch: ${expectedArray.size} vs ${actualArray.size}")
+ } else {
+ expectedArray.zip(actualArray).foreach { case (e1, e2) => internalRowCompare(e1, e2, elementType) }
+ }
+ case MapType(keyType, valueType, _) =>
+ val expectedKeyArray = expected.asInstanceOf[MapData].keyArray()
+ val expectedValueArray = expected.asInstanceOf[MapData].valueArray()
+ val actualKeyArray = actual.asInstanceOf[MapData].keyArray()
+ val actualValueArray = actual.asInstanceOf[MapData].valueArray()
+ internalRowCompare(expectedKeyArray, actualKeyArray, ArrayType(keyType))
+ internalRowCompare(expectedValueArray, actualValueArray, ArrayType(valueType))
+ case StringType => if (checkNull(expected, actual) || (expected != null && !expected.toString.equals(actual.toString))) {
+ throw new AssertionError(String.format("%s is not equal to %s", expected, actual))
+ }
+ // TODO Verify after 'https://github.com/apache/hudi/pull/5907' merge
+ case BinaryType => if (checkNull(expected, actual) || (expected != null && !expected.asInstanceOf[Array[Byte]].sameElements(actual.asInstanceOf[Array[Byte]]))) {
+ // throw new AssertionError(String.format("%s is not equal to %s", expected, actual))
+ }
+ case _ => if (!Objects.equals(expected, actual)) {
+ throw new AssertionError(String.format("%s is not equal to %s", expected, actual))
+ }
+ }
+ }
+
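+ // returns true iff exactly one of the two values is null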
+ private def checkNull(left: Any, right: Any): Boolean = {
+ (left == null && right != null) || (left != null && right == null)
+ }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 736e416162..2f38a22768 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -273,6 +273,7 @@ public class DeltaSync implements Serializable {
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setPayloadClassName(cfg.payloadClassName)
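+ // propagate the configured HoodieMerge implementation alongside the payload class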
+ .setMergeClassName(cfg.mergeClassName)
.setBaseFileFormat(cfg.baseFileFormat)
.setPartitionFields(partitionColumns)
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
@@ -370,6 +371,7 @@ public class DeltaSync implements Serializable {
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setPayloadClassName(cfg.payloadClassName)
+ .setMergeClassName(cfg.mergeClassName)
.setBaseFileFormat(cfg.baseFileFormat)
.setPartitionFields(partitionColumns)
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a22a3581ae..dcf1581216 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -270,6 +271,10 @@ public class HoodieDeltaStreamer implements Serializable {
+ "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
+ @Parameter(names = {"--merge-class"}, description = "Implementation of HoodieMerge, which defines how to merge two records. "
+ + "Implement your own, if you want custom record merge logic.")
+ public String mergeClassName = HoodieAvroRecordMerge.class.getName();
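+ // Hypothetical usage: --merge-class com.example.MyRecordMerge (defaults to HoodieAvroRecordMerge)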
+
@Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema"
+ ".SchemaProvider to attach schemas to input & target table data, built in options: "
+ "org.apache.hudi.utilities.schema.FilebasedSchemaProvider."