Posted to commits@hudi.apache.org by ak...@apache.org on 2022/10/15 01:38:10 UTC
[hudi] branch release-feature-rfc46 updated: Fix comment in RFC46 (#6745)
This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
new 19ed31f9be Fix comment in RFC46 (#6745)
19ed31f9be is described below
commit 19ed31f9bec718548fdb2475c7d0a2df3d6539cd
Author: komao <ma...@gmail.com>
AuthorDate: Sat Oct 15 09:38:02 2022 +0800
Fix comment in RFC46 (#6745)
* rename
* add MetadataValues in updateMetadataValues
* remove singleton in fileFactory
* add truncateRecordKey
* remove hoodieRecord#setData
* rename HoodieAvroRecord
* fix code style
* fix HoodieSparkRecordSerializer
* fix benchmark
* fix SparkRecordUtils
* instantiate HoodieWriteConfig on the fly
* add test
* fix HoodieSparkRecordSerializer. Replace Java's object serialization with kryo
* add broadcast
* fix comment
* remove unnecessary broadcast
* add unsafe check in spark record
* fix getRecordColumnValues
* remove spark.sql.parquet.writeLegacyFormat
* fix unsafe projection
* fix
* pass external schema
* update doc
* rename back to HoodieAvroRecord
* fix
* remove comparable wrapper
* fix comment
* fix comment
* fix comment
* fix comment
* simplify row copy
* fix ParquetReaderIterator
Co-authored-by: Shawy Geng <ge...@gmail.com>
Co-authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 23 +--
.../org/apache/hudi/io/HoodieAppendHandle.java | 26 +--
.../org/apache/hudi/io/HoodieConcatHandle.java | 9 +-
.../org/apache/hudi/io/HoodieCreateHandle.java | 12 +-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 73 +++----
.../hudi/io/HoodieMergeHandleWithChangeLog.java | 42 ++--
.../apache/hudi/io/HoodieSortedMergeHandle.java | 11 +-
.../io/HoodieSortedMergeHandleWithChangeLog.java | 6 +-
.../strategy/ClusteringExecutionStrategy.java | 2 +-
.../hudi/table/action/commit/BaseMergeHelper.java | 2 +-
.../hudi/table/action/commit/BaseWriteHelper.java | 10 +-
.../table/action/commit/HoodieMergeHelper.java | 10 +-
.../table/action/commit/HoodieWriteHelper.java | 7 +-
.../hudi/io/FlinkConcatAndReplaceHandle.java | 7 +-
.../java/org/apache/hudi/io/FlinkConcatHandle.java | 7 +-
.../FlinkMergeAndReplaceHandleWithChangeLog.java | 39 +++-
.../hudi/io/FlinkMergeHandleWithChangeLog.java | 33 +++-
.../hudi/table/action/commit/FlinkMergeHelper.java | 10 +-
.../hudi/table/action/commit/FlinkWriteHelper.java | 6 +-
.../run/strategy/JavaExecutionStrategy.java | 2 +-
.../hudi/table/action/commit/JavaMergeHelper.java | 10 +-
.../hudi/table/action/commit/JavaWriteHelper.java | 6 +-
.../MultipleSparkJobExecutionStrategy.java | 12 +-
.../strategy/SingleSparkJobExecutionStrategy.java | 14 +-
.../hudi/commmon/model/HoodieSparkRecord.java | 219 +++++++++++++--------
.../bulkinsert/RDDConsistentBucketPartitioner.java | 8 +-
.../RDDCustomColumnsSortPartitioner.java | 2 +-
.../bulkinsert/RDDSpatialCurveSortPartitioner.java | 4 +-
.../io/storage/HoodieSparkFileReaderFactory.java | 11 --
.../io/storage/HoodieSparkFileWriterFactory.java | 9 -
.../hudi/io/storage/HoodieSparkParquetReader.java | 4 +-
.../bootstrap/ParquetBootstrapMetadataHandler.java | 14 +-
.../apache/hudi/util/HoodieSparkRecordUtils.java | 59 +-----
.../org/apache/hudi/HoodieInternalRowUtils.scala | 67 ++-----
.../spark/sql/hudi/SparkStructTypeSerializer.scala | 157 ---------------
.../TestHoodieClientOnCopyOnWriteStorage.java | 7 +-
.../hudi/execution/TestBoundedInMemoryQueue.java | 4 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 2 +-
.../hudi/common/model/HoodieAvroIndexedRecord.java | 38 ++--
.../apache/hudi/common/model/HoodieAvroRecord.java | 50 +++--
.../hudi/common/model/HoodieAvroRecordMerger.java | 36 ++--
.../hudi/common/model/HoodieEmptyRecord.java | 49 ++---
.../org/apache/hudi/common/model/HoodieRecord.java | 28 +--
.../model/HoodieRecordCompatibilityInterface.java | 11 +-
.../hudi/common/model/HoodieRecordMerger.java | 5 +-
.../apache/hudi/common/model/MetadataValues.java | 66 +++++++
.../hudi/common/table/HoodieTableConfig.java | 11 +-
.../table/log/AbstractHoodieLogRecordReader.java | 3 +
.../table/log/HoodieMergedLogRecordScanner.java | 20 +-
.../table/log/HoodieUnMergedLogRecordScanner.java | 2 +-
.../common/table/log/block/HoodieDataBlock.java | 2 +-
.../table/log/block/HoodieHFileDataBlock.java | 5 +-
.../org/apache/hudi/common/util/ConfigUtils.java | 12 ++
.../apache/hudi/common/util/HoodieRecordUtils.java | 2 +-
.../hudi/common/util/ParquetReaderIterator.java | 21 +-
.../hudi/common/util/SerializationUtils.java | 18 --
.../org/apache/hudi/common/util/StringUtils.java | 2 -
.../hudi/common/util/collection/FlatLists.java | 148 ++++++++++++++
.../io/storage/HoodieAvroFileReaderFactory.java | 11 --
.../hudi/io/storage/HoodieAvroFileWriter.java | 5 +-
.../io/storage/HoodieAvroFileWriterFactory.java | 11 --
.../hudi/io/storage/HoodieFileReaderFactory.java | 9 +-
.../hudi/io/storage/HoodieFileWriterFactory.java | 9 +-
.../apache/hudi/configuration/FlinkOptions.java | 6 +-
.../org/apache/hudi/sink/StreamWriteFunction.java | 6 +-
.../apache/hudi/streamer/FlinkStreamerConfig.java | 5 +-
.../org/apache/hudi/table/format/FormatUtils.java | 2 +-
.../table/format/mor/MergeOnReadInputFormat.java | 7 +-
.../apache/hudi/hadoop/TestInputPathHandler.java | 4 +-
.../org/apache/hudi/HoodieSparkRecordMerger.java | 14 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 12 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 18 +-
.../scala/org/apache/hudi/LogFileIterator.scala | 42 ++--
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 7 +-
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 12 +-
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 2 +
.../SparkFullBootstrapDataProviderBase.java | 9 +-
.../apache/hudi/TestHoodieInternalRowUtils.scala | 12 --
.../apache/hudi/functional/TestCOWDataSource.scala | 2 +-
.../apache/hudi/functional/TestMORDataSource.scala | 15 +-
.../ReadAndWriteWithoutAvroBenchmark.scala | 75 ++++---
.../spark/sql/hudi/TestHoodieOptionConfig.scala | 7 +-
.../deltastreamer/HoodieDeltaStreamer.java | 5 +-
rfc/rfc-46/rfc-46.md | 4 +-
84 files changed, 927 insertions(+), 859 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index a5931c2e0a..1d0a285912 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -139,8 +139,8 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
.key("hoodie.datasource.write.merger.strategy")
- .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
- .withDocumentation("Id of merger strategy. Hudi will pick RecordMergers in hoodie.datasource.write.merger.impls which has the same merger strategy id");
+ .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+ .withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls which has the same merger strategy id");
public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
.key("hoodie.datasource.write.keygenerator.class")
@@ -517,7 +517,6 @@ public class HoodieWriteConfig extends HoodieConfig {
private HoodieCommonConfig commonConfig;
private HoodieStorageConfig storageConfig;
private EngineType engineType;
- private HoodieRecordMerger recordMerger;
/**
* @deprecated Use {@link #TBL_NAME} and its methods instead
@@ -894,7 +893,6 @@ public class HoodieWriteConfig extends HoodieConfig {
super();
this.engineType = EngineType.SPARK;
this.clientSpecifiedViewStorageConfig = null;
- applyMergerClass();
}
protected HoodieWriteConfig(EngineType engineType, Properties props) {
@@ -902,7 +900,6 @@ public class HoodieWriteConfig extends HoodieConfig {
Properties newProps = new Properties();
newProps.putAll(props);
this.engineType = engineType;
- applyMergerClass();
this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
this.fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().fromProperties(newProps).build();
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
@@ -914,15 +911,6 @@ public class HoodieWriteConfig extends HoodieConfig {
this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build();
}
- private void applyMergerClass() {
- List<String> mergers = getSplitStringsOrDefault(MERGER_IMPLS).stream()
- .map(String::trim)
- .distinct()
- .collect(Collectors.toList());
- String mergerStrategy = getString(MERGER_STRATEGY);
- this.recordMerger = HoodieRecordUtils.generateRecordMerger(getString(BASE_PATH), engineType, mergers, mergerStrategy);
- }
-
public static HoodieWriteConfig.Builder newBuilder() {
return new Builder();
}
@@ -935,7 +923,12 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public HoodieRecordMerger getRecordMerger() {
- return recordMerger;
+ List<String> mergers = getSplitStringsOrDefault(MERGER_IMPLS).stream()
+ .map(String::trim)
+ .distinct()
+ .collect(Collectors.toList());
+ String mergerStrategy = getString(MERGER_STRATEGY);
+ return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), engineType, mergers, mergerStrategy);
}
public String getSchema() {
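The merger is no longer cached in a field: per the "instantiate HoodieWriteConfig on the fly" bullet, getRecordMerger() now resolves it from MERGER_IMPLS and MERGER_STRATEGY on every call. A minimal caller-side sketch, assuming the standard Builder methods (withPath, withProperties); everything other than the config key shown in the hunk above is illustrative:

    import java.util.Properties;
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.config.HoodieWriteConfig;

    class MergerConfigSketch {
      static HoodieRecordMerger resolveMerger() {
        Properties props = new Properties();
        // candidate merger implementations; those matching the configured
        // strategy id (MERGER_STRATEGY above) are eligible
        props.put("hoodie.datasource.write.merger.impls",
            "org.apache.hudi.common.model.HoodieAvroRecordMerger");
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")
            .withProperties(props)
            .build();
        // after this commit the merger is built on the fly per call,
        // not held in a HoodieWriteConfig field
        return writeConfig.getRecordMerger();
      }
    }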
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 075d763261..564d63ba77 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -39,6 +38,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -253,21 +253,21 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
}
private HoodieRecord populateMetadataFields(HoodieRecord<T> hoodieRecord, Schema schema, Properties prop) throws IOException {
- Map<String, String> metadataValues = new HashMap<>();
- String seqId =
- HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
+ MetadataValues metadataValues = new MetadataValues();
if (config.populateMetaFields()) {
- metadataValues.put(HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(), fileId);
- metadataValues.put(HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName(), partitionPath);
- metadataValues.put(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName(), hoodieRecord.getRecordKey());
- metadataValues.put(HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName(), instantTime);
- metadataValues.put(HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.getFieldName(), seqId);
+ String seqId =
+ HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
+ metadataValues.setFileName(fileId);
+ metadataValues.setPartitionPath(partitionPath);
+ metadataValues.setRecordKey(hoodieRecord.getRecordKey());
+ metadataValues.setCommitTime(instantTime);
+ metadataValues.setCommitSeqno(seqId);
}
if (config.allowOperationMetadataField()) {
- metadataValues.put(HoodieRecord.HoodieMetadataField.OPERATION_METADATA_FIELD.getFieldName(), hoodieRecord.getOperation().getName());
+ metadataValues.setOperation(hoodieRecord.getOperation().getName());
}
- return hoodieRecord.updateValues(schema, prop, metadataValues);
+ return hoodieRecord.updateMetadataValues(schema, prop, metadataValues);
}
private void initNewStatus() {
@@ -380,7 +380,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
List<IndexedRecord> indexedRecords = new LinkedList<>();
for (HoodieRecord hoodieRecord : recordList) {
- indexedRecords.add(((HoodieAvroIndexedRecord) hoodieRecord.toIndexedRecord(tableSchema, config.getProps()).get()).getData());
+ indexedRecords.add(hoodieRecord.toIndexedRecord(tableSchema, config.getProps()).get().getData());
}
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
@@ -511,7 +511,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
record.seal();
}
// fetch the ordering val first in case the record was deflated.
- final Comparable<?> orderingVal = record.getOrderingValue(config.getProps());
+ final Comparable<?> orderingVal = record.getOrderingValue(tableSchema, config.getProps());
Option<HoodieRecord> indexedRecord = prepareRecord(record);
if (indexedRecord.isPresent()) {
// Skip the ignored record.
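The diffstat adds a new MetadataValues class (+66 lines), and the hunk above shows only its setters replacing the previous Map<String, String> of meta-field name to value. A hedged sketch of what such a typed holder plausibly looks like — the field names follow the setters used above; getters and anything else are guesswork against the real MetadataValues.java:

    // Sketch only; the MetadataValues added by this commit is authoritative.
    public class MetadataValues {
      private String commitTime;
      private String commitSeqno;
      private String recordKey;
      private String partitionPath;
      private String fileName;
      private String operation;

      public void setCommitTime(String v)    { this.commitTime = v; }
      public void setCommitSeqno(String v)   { this.commitSeqno = v; }
      public void setRecordKey(String v)     { this.recordKey = v; }
      public void setPartitionPath(String v) { this.partitionPath = v; }
      public void setFileName(String v)      { this.fileName = v; }
      public void setOperation(String v)     { this.operation = v; }
      // getters omitted for brevity
    }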
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
index 4f6b0428b2..26374b32fc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
@@ -93,11 +93,11 @@ public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O
*/
@Override
public void write(HoodieRecord oldRecord) {
- String key = oldRecord.getRecordKey(keyGeneratorOpt);
- Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+ Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+ String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
try {
// NOTE: We're enforcing preservation of the record metadata to keep existing semantic
- writeToFile(new HoodieKey(key, partitionPath), oldRecord, schema, config.getPayloadConfig().getProps(), true);
+ writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, config.getPayloadConfig().getProps(), true);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -109,6 +109,7 @@ public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O
@Override
protected void writeIncomingRecords() throws IOException {
+ Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
while (recordItr.hasNext()) {
HoodieRecord<T> record = recordItr.next();
if (needsUpdateLocation()) {
@@ -116,7 +117,7 @@ public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O
record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
record.seal();
}
- writeInsertRecord(record);
+ writeInsertRecord(record, schema);
}
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index d6269a4fc9..d3782e9f20 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -27,11 +27,11 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
@@ -143,7 +143,9 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
} else {
rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
}
- rewriteRecord = rewriteRecord.updateValues(writeSchemaWithMetaFields, config.getProps(), Collections.singletonMap(HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(), path.getName()));
+ MetadataValues metadataValues = new MetadataValues();
+ metadataValues.setFileName(path.getName());
+ rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
if (preserveMetadata) {
fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
} else {
@@ -185,11 +187,7 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
while (keyIterator.hasNext()) {
final String key = keyIterator.next();
HoodieRecord<T> record = recordMap.get(key);
- if (useWriterSchema) {
- write(record, tableSchemaWithMetaFields, config.getProps());
- } else {
- write(record, useWriterSchema ? tableSchemaWithMetaFields : tableSchema, config.getProps());
- }
+ write(record, useWriterSchema ? tableSchemaWithMetaFields : tableSchema, config.getProps());
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 0933d9f28b..28377918b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -27,16 +27,17 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
@@ -267,66 +268,64 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp) throws IOException {
+ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
boolean isDelete = false;
- Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
- if (combineRecordOp.isPresent()) {
+ if (combineRecordOpt.isPresent()) {
updatedRecordsWritten++;
- if (oldRecord.getData() != combineRecordOp.get().getData()) {
+ if (oldRecord.getData() != combineRecordOpt.get().getData()) {
// the incoming record is chosen
- isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+ isDelete = HoodieOperation.isDelete(newRecord.getOperation());
} else {
// the incoming record is dropped
return false;
}
}
- return writeRecord(hoodieRecord, combineRecordOp, schema, config.getPayloadConfig().getProps(), isDelete);
+ return writeRecord(newRecord, combineRecordOpt, writerSchema, config.getPayloadConfig().getProps(), isDelete);
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
- Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+ protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema) throws IOException {
// just skip the ignored record
- if (hoodieRecord.shouldIgnore(schema, config.getProps())) {
+ if (newRecord.shouldIgnore(schema, config.getProps())) {
return;
}
- writeInsertRecord(hoodieRecord, Option.of(hoodieRecord), schema, config.getProps());
+ writeInsertRecord(newRecord, schema, config.getProps());
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> insertRecord, Schema schema, Properties prop)
+ protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties prop)
throws IOException {
- if (writeRecord(hoodieRecord, insertRecord, schema, prop, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
+ if (writeRecord(newRecord, Option.of(newRecord), schema, prop, HoodieOperation.isDelete(newRecord.getOperation()))) {
insertRecordsWritten++;
}
}
- protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws IOException {
- return writeRecord(hoodieRecord, combineRecord, schema, prop, false);
+ protected boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws IOException {
+ return writeRecord(newRecord, combineRecord, schema, prop, false);
}
- protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop, boolean isDelete) throws IOException {
- Option recordMetadata = hoodieRecord.getMetadata();
- if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
+ protected boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop, boolean isDelete) throws IOException {
+ Option recordMetadata = newRecord.getMetadata();
+ if (!partitionPath.equals(newRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
- + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
- writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
+ + newRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
+ writeStatus.markFailure(newRecord, failureEx, recordMetadata);
return false;
}
try {
if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, config.getProps()) && !isDelete) {
- writeToFile(hoodieRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
+ writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
recordsWritten++;
} else {
recordsDeleted++;
}
- writeStatus.markSuccess(hoodieRecord, recordMetadata);
+ writeStatus.markSuccess(newRecord, recordMetadata);
// deflate record payload after recording success. This will help users access payload as a
// part of marking
// record successful.
- hoodieRecord.deflate();
+ newRecord.deflate();
return true;
} catch (Exception e) {
- LOG.error("Error writing record " + hoodieRecord, e);
- writeStatus.markFailure(hoodieRecord, e, recordMetadata);
+ LOG.error("Error writing record " + newRecord, e);
+ writeStatus.markFailure(newRecord, e, recordMetadata);
}
return false;
}
@@ -335,21 +334,24 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
*/
public void write(HoodieRecord<T> oldRecord) {
- Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+ Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+ Schema newSchema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
boolean copyOldRecord = true;
- String key = oldRecord.getRecordKey(keyGeneratorOpt);
+ String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
TypedProperties props = config.getPayloadConfig().getProps();
if (keyToNewRecords.containsKey(key)) {
// If we have duplicate records that we are updating, then the hoodie record will be deflated after
// writing the first record. So make a copy of the record to be merged
- HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
+ HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
try {
- Option<HoodieRecord> combinedRecord = recordMerger.merge(oldRecord, hoodieRecord, schema, props);
+ Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
+ Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null);
+ Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
- if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) {
+ if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
copyOldRecord = true;
- } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedRecord)) {
+ } else if (writeUpdateRecord(newRecord, oldRecord, combinedRecord, combineRecordSchema)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
* write the combined new value
@@ -368,7 +370,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
if (copyOldRecord) {
try {
// NOTE: We're enforcing preservation of the record metadata to keep existing semantic
- writeToFile(new HoodieKey(key, partitionPath), oldRecord, schema, props, true);
+ writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, props, true);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -386,7 +388,9 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
} else {
rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
}
- rewriteRecord = rewriteRecord.updateValues(writeSchemaWithMetaFields, prop, Collections.singletonMap(HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(), newFilePath.getName()));
+ MetadataValues metadataValues = new MetadataValues();
+ metadataValues.setFileName(newFilePath.getName());
+ rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
if (shouldPreserveRecordMetadata) {
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
// file holding this record even in cases when overall metadata is preserved
@@ -400,10 +404,11 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
// write out any pending records (this can happen when inserts are turned into updates)
Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
+ Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
while (newRecordsItr.hasNext()) {
HoodieRecord<T> hoodieRecord = newRecordsItr.next();
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
- writeInsertRecord(hoodieRecord);
+ writeInsertRecord(hoodieRecord, schema);
}
}
}
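Across this file the old recordMerger.merge(oldRecord, newRecord, schema, props) becomes a call in which each side carries its own schema and the result pairs the merged record with the schema it conforms to. The interface shape implied by the call sites in this diff (a reconstruction only; the repo's HoodieRecordMerger is authoritative):

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;

    interface RecordMergerSketch {
      // the merged record comes back with the schema it was produced
      // against, so callers like write(oldRecord) above can read it via
      // mergeResult.map(Pair::getRight)
      Option<Pair<HoodieRecord, Schema>> merge(
          HoodieRecord older, Schema olderSchema,
          HoodieRecord newer, Schema newerSchema,
          Properties props) throws IOException;
    }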
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 6e8fda0b10..fda1435345 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -30,10 +30,12 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.util.Iterator;
@@ -44,6 +46,9 @@ import java.util.Map;
* A merge handle that supports logging change logs.
*/
public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieMergeHandleWithChangeLog.class);
+
protected final HoodieCDCLogger cdcLogger;
public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -75,24 +80,35 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
+ // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
+ Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
if (result) {
- boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord) oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
+ boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+ Option<IndexedRecord> avroOpt = savedCombineRecordOp
+ .flatMap(r -> {
+ try {
+ return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
+ .map(HoodieAvroIndexedRecord::getData);
+ } catch (IOException e) {
+ LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
+ return Option.empty();
+ }
+ });
+ cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
}
return result;
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
- // Get the data before deflated
- Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
- Option<IndexedRecord> recordOption = hoodieRecord.toIndexedRecord(schema, this.config.getProps())
- .map(HoodieRecord::getData);
- super.writeInsertRecord(hoodieRecord);
- if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
- cdcLogger.put(hoodieRecord, null, recordOption);
+ protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema) throws IOException {
+ // TODO Remove these unnecessary newInstance invocations
+ HoodieRecord<T> savedRecord = newRecord.newInstance();
+ super.writeInsertRecord(newRecord, schema);
+ if (!HoodieOperation.isDelete(newRecord.getOperation())) {
+ cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 60c6a2da7f..a4d7b5efa5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -72,7 +73,9 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
*/
@Override
public void write(HoodieRecord oldRecord) {
- String key = oldRecord.getRecordKey(keyGeneratorOpt);
+ Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+ Schema newSchema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+ String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
// To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
// the oldRecord's key.
@@ -89,11 +92,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
throw new HoodieUpsertException("Insert/Update not in sorted order");
}
try {
- if (useWriterSchemaForCompaction) {
- writeRecord(hoodieRecord, Option.of(hoodieRecord), tableSchemaWithMetaFields, config.getProps());
- } else {
- writeRecord(hoodieRecord, Option.of(hoodieRecord), tableSchema, config.getProps());
- }
+ writeRecord(hoodieRecord, Option.of(hoodieRecord), newSchema, config.getProps());
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
} catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
index 727765b3e2..819cfd0754 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
@@ -53,10 +53,10 @@ public class HoodieSortedMergeHandleWithChangeLog<T, I, K, O> extends HoodieMerg
super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
}
- protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> insertRecord, Schema schema, Properties props)
+ protected boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> insertRecord, Schema schema, Properties props)
throws IOException {
- final boolean result = super.writeRecord(hoodieRecord, insertRecord, schema, props);
- this.cdcLogger.put(hoodieRecord, null, insertRecord.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
+ final boolean result = super.writeRecord(newRecord, insertRecord, schema, props);
+ this.cdcLogger.put(newRecord, null, insertRecord.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
return result;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index d2de068838..4b7240d432 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -39,7 +39,7 @@ public abstract class ClusteringExecutionStrategy<T, I, K, O> implements Seriali
private final HoodieTable<T, I, K, O> hoodieTable;
private final transient HoodieEngineContext engineContext;
- private final HoodieWriteConfig writeConfig;
+ protected final HoodieWriteConfig writeConfig;
protected final HoodieRecordType recordType;
public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index a3a3188df9..4c5db03d35 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -83,7 +83,7 @@ public abstract class BaseMergeHelper<T, I, K, O> {
@Nonnull
private static HoodieRecord mergeRecords(HoodieRecord left, HoodieRecord right, Schema targetSchema) {
try {
- return left.mergeWith(right, targetSchema);
+ return left.joinWith(right, targetSchema);
} catch (IOException e) {
throw new HoodieIOException("Failed to merge records", e);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 0b96529379..622ed4573e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -87,6 +88,11 @@ public abstract class BaseWriteHelper<T, I, K, O, R> {
return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
}
- public abstract I deduplicateRecords(
- I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merge);
+ public I deduplicateRecords(
+ I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
+ return deduplicateRecordsInternal(records, index, parallelism, schema, HoodieAvroRecordMerger.withDeDuping(props), merger);
+ }
+
+ protected abstract I deduplicateRecordsInternal(
+ I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger);
}
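deduplicateRecords() becomes a template method: the base class applies the de-duping properties once via HoodieAvroRecordMerger.withDeDuping, and each engine overrides only deduplicateRecordsInternal (see the Spark and Flink hunks below). A generic, self-contained illustration of the pattern — the property key here is a stand-in, not a real Hudi config:

    import java.util.Properties;

    abstract class WriteHelperSketch<I> {
      // public entry point: fix up the props once, then delegate
      public final I deduplicateRecords(I records, Properties props) {
        Properties deDuping = new Properties();
        deDuping.putAll(props);
        deDuping.put("example.dedup.enabled", "true"); // stand-in for withDeDuping(props)
        return deduplicateRecordsInternal(records, deDuping);
      }

      // engine-specific hook supplied by Spark/Flink/Java helpers
      protected abstract I deduplicateRecordsInternal(I records, Properties props);
    }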
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 49edb981a5..e4fb3face4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -43,7 +43,6 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -97,7 +96,7 @@ public class HoodieMergeHelper<T> extends
readSchema = mergeHandle.getWriterSchemaWithMetaFields();
}
- BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+ BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
boolean needToReWriteRecord = false;
Map<String, String> renameCols = new HashMap<>();
@@ -139,13 +138,14 @@ public class HoodieMergeHelper<T> extends
}
}
- wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
+ wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
new UpdateHandler(mergeHandle), record -> {
+ HoodieRecord recordCopy = record.copy();
if (!externalSchemaTransformation) {
- return record;
+ return recordCopy;
}
try {
- return ((HoodieRecord) record).rewriteRecord(writerSchema, new Properties(), readerSchema);
+ return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
} catch (IOException e) {
throw new HoodieException(String.format("Failed to rewrite record. WriterSchema: %s; ReaderSchema: %s", writerSchema, readerSchema), e);
}
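This hunk and the matching Flink/Java merge-helper hunks below now copy the record before handing it to the BoundedInMemoryExecutor. The likely reason (hedged, but consistent with the "simplify row copy" bullet and the ParquetReaderIterator fix) is that readers may reuse one mutable instance, so buffered elements would otherwise alias each other. A tiny runnable illustration of that failure mode, with clone() playing the role of record.copy():

    import java.util.ArrayList;
    import java.util.List;

    class CopyBeforeBufferSketch {
      public static void main(String[] args) {
        List<int[]> buffered = new ArrayList<>();
        int[] reused = new int[1];        // stand-in for a reader-owned record
        for (int i = 0; i < 3; i++) {
          reused[0] = i;
          buffered.add(reused.clone());   // analogous to record.copy()
        }
        // with clone(): prints 0, 1, 2; without it, every element would
        // alias the same array and print 2 three times
        buffered.forEach(a -> System.out.println(a[0]));
      }
    }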
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index bd495f69da..1893132e7d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -53,8 +53,8 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
}
@Override
- public HoodieData<HoodieRecord<T>> deduplicateRecords(
- HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger recordMerger) {
+ public HoodieData<HoodieRecord<T>> deduplicateRecordsInternal(
+ HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
// Auto-tunes the parallelism for reduce transformation based on the number of data partitions
@@ -68,8 +68,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
}).reduceByKey((rec1, rec2) -> {
HoodieRecord<T> reducedRecord;
try {
- // Precombine do not need schema and do not return null
- reducedRecord = recordMerger.merge(rec1, rec2, schema.get(), props).get();
+ reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
} catch (IOException e) {
throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java
index 662e8381e6..4faa7b5fa6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
@@ -56,7 +57,8 @@ public class FlinkConcatAndReplaceHandle<T, I, K, O>
*/
@Override
public void write(HoodieRecord oldRecord) {
- String key = oldRecord.getRecordKey(keyGeneratorOpt);
+ Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+ String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
try {
fileWriter.write(key, oldRecord, writeSchema);
} catch (IOException | RuntimeException e) {
@@ -70,9 +72,10 @@ public class FlinkConcatAndReplaceHandle<T, I, K, O>
@Override
protected void writeIncomingRecords() throws IOException {
+ Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
while (recordItr.hasNext()) {
HoodieRecord<T> record = recordItr.next();
- writeInsertRecord(record);
+ writeInsertRecord(record, schema);
}
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
index 8e4fb50e52..e23aa6e74a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -55,7 +56,8 @@ public class FlinkConcatHandle<T, I, K, O>
*/
@Override
public void write(HoodieRecord oldRecord) {
- String key = oldRecord.getRecordKey(keyGeneratorOpt);
+ Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+ String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
try {
fileWriter.write(key, oldRecord, writeSchema);
} catch (IOException | RuntimeException e) {
@@ -69,9 +71,10 @@ public class FlinkConcatHandle<T, I, K, O>
@Override
protected void writeIncomingRecords() throws IOException {
+ Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
while (recordItr.hasNext()) {
HoodieRecord<T> record = recordItr.next();
- writeInsertRecord(record);
+ writeInsertRecord(record, schema);
}
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index fa14ba0418..7dde24ec8c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -28,7 +28,11 @@ import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
@@ -44,6 +48,9 @@ import java.util.List;
*/
public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
extends FlinkMergeAndReplaceHandle<T, I, K, O> {
+
+ private static final Logger LOG = LogManager.getLogger(FlinkMergeAndReplaceHandleWithChangeLog.class);
+
private final HoodieCDCLogger cdcLogger;
public FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -59,20 +66,36 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
+ // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
+ Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
if (result) {
- boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
+ boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+ Option<IndexedRecord> avroOpt = savedCombineRecordOp
+ .flatMap(r -> {
+ try {
+ return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
+ .map(HoodieAvroIndexedRecord::getData);
+ } catch (IOException e) {
+ LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
+ return Option.empty();
+ }
+ });
+ cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
}
return result;
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
- super.writeInsertRecord(hoodieRecord);
- if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
- cdcLogger.put(hoodieRecord, null, Option.of((GenericRecord) hoodieRecord.getData()));
+ protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema) throws IOException {
+ // TODO Remove these unnecessary newInstance invocations
+ HoodieRecord<T> savedRecord = newRecord.newInstance();
+ super.writeInsertRecord(newRecord, schema);
+ if (!HoodieOperation.isDelete(newRecord.getOperation())) {
+ cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
+ newRecord.deflate();
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index b85c7c270f..4c9a7484c1 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -62,20 +64,35 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
- protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
throws IOException {
- final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
+ // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
+ Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
+ HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+ final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
if (result) {
- boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
- cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
+ boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+ Option<IndexedRecord> avroOpt = savedCombineRecordOp
+ .flatMap(r -> {
+ try {
+ return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
+ .map(HoodieAvroIndexedRecord::getData);
+ } catch (IOException e) {
+ LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
+ return Option.empty();
+ }
+ });
+ cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
}
return result;
}
- protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
- super.writeInsertRecord(hoodieRecord);
- if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
- cdcLogger.put(hoodieRecord, null, Option.of((GenericRecord) hoodieRecord.getData()));
+ protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema) throws IOException {
+ // TODO Remove these unnecessary newInstance invocations
+ HoodieRecord<T> savedRecord = newRecord.newInstance();
+ super.writeInsertRecord(newRecord, schema);
+ if (!HoodieOperation.isDelete(newRecord.getOperation())) {
+ cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index a15a438c7a..06886dbb07 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -32,7 +32,6 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
@@ -82,7 +81,7 @@ public class FlinkMergeHelper<T> extends BaseMergeHelper<T, List<HoodieRecord<T>
readSchema = mergeHandle.getWriterSchemaWithMetaFields();
}
- BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+ BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
try {
final Iterator<HoodieRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
@@ -91,13 +90,14 @@ public class FlinkMergeHelper<T> extends BaseMergeHelper<T, List<HoodieRecord<T>
readerIterator = reader.getRecordIterator(readSchema);
}
- wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
+ wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
Option.of(new UpdateHandler(mergeHandle)), record -> {
+ HoodieRecord recordCopy = record.copy();
if (!externalSchemaTransformation) {
- return record;
+ return recordCopy;
}
try {
- return ((HoodieRecord) record).rewriteRecord(writerSchema, new Properties(), readerSchema);
+ return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
} catch (IOException e) {
throw new HoodieException(e);
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 4993c43608..db9422336d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -91,8 +91,8 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
}
@Override
- public List<HoodieRecord<T>> deduplicateRecords(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger recordMerger) {
+ public List<HoodieRecord<T>> deduplicateRecordsInternal(
+ List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
// If index used is global, then records are expected to differ in their partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
@@ -103,7 +103,7 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
HoodieRecord<T> reducedRecord;
try {
// Precombine do not need schema and do not return null
- reducedRecord = recordMerger.merge(rec1, rec2, null, props).get();
+ reducedRecord = merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
} catch (IOException e) {
throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
}
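The merger API used here now takes a schema per input record and returns a pair of (merged record, merged schema), so callers unwrap with getLeft(). A short sketch of reducing same-key duplicates with that shape, assuming both sides share one schema as in this Flink path:

    // Sketch: pairwise reduction of records that share a record key.
    HoodieRecord<T> reduced = duplicates.stream()
        .reduce((rec1, rec2) -> {
          try {
            return merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
          } catch (IOException e) {
            throw new HoodieException("Failed to merge records", e);
          }
        })
        .orElseThrow(IllegalStateException::new);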
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index fe398238da..e62643ad4e 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -216,7 +216,7 @@ public abstract class JavaExecutionStrategy<T>
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
Iterator<HoodieRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
- recordIterator.forEachRemaining(record -> records.add(record.wrapIntoHoodieRecordPayloadWithKeyGen(new Properties(), Option.empty())));
+ recordIterator.forEachRemaining(record -> records.add(record.copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, new Properties(), Option.empty())));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 9593b5e72f..54a044ae26 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -32,7 +32,6 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
@@ -80,7 +79,7 @@ public class JavaMergeHelper<T> extends BaseMergeHelper<T, List<HoodieRecord<T>>
readSchema = mergeHandle.getWriterSchemaWithMetaFields();
}
- BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+ BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
try {
final Iterator<HoodieRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
@@ -89,13 +88,14 @@ public class JavaMergeHelper<T> extends BaseMergeHelper<T, List<HoodieRecord<T>>
readerIterator = reader.getRecordIterator(readSchema);
}
- wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
+ wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
Option.of(new UpdateHandler(mergeHandle)), record -> {
+ HoodieRecord recordCopy = record.copy();
if (!externalSchemaTransformation) {
- return record;
+ return recordCopy;
}
try {
- return ((HoodieRecord) record).rewriteRecord(writerSchema, new Properties(), readerSchema);
+ return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
} catch (IOException e) {
throw new HoodieException(e);
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 7edbb55c3e..14016cb5c0 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -58,8 +58,8 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
}
@Override
- public List<HoodieRecord<T>> deduplicateRecords(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger recordMerger) {
+ public List<HoodieRecord<T>> deduplicateRecordsInternal(
+ List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
boolean isIndexingGlobal = index.isGlobal();
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
HoodieKey hoodieKey = record.getKey();
@@ -72,7 +72,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
HoodieRecord<T> reducedRecord;
try {
- reducedRecord = recordMerger.merge(rec1, rec2, schema, props).get();
+ reducedRecord = merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
} catch (IOException e) {
throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
}
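Both engines rename deduplicateRecords to deduplicateRecordsInternal, which suggests the public entry point now lives in BaseWriteHelper as a template method. The base-class change is outside this excerpt; a hypothetical sketch of that shape:

    // Hypothetical: BaseWriteHelper keeps the public API and delegates to
    // the engine-specific hook (names here are assumptions, not this diff).
    public I deduplicateRecords(I records, HoodieIndex<?, ?> index, int parallelism,
        String schemaStr, Properties props, HoodieRecordMerger merger) {
      return deduplicateRecordsInternal(records, index, parallelism, schemaStr, props, merger);
    }

    protected abstract I deduplicateRecordsInternal(I records, HoodieIndex<?, ?> index,
        int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger);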
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 4c73012d2c..64264a1a10 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -85,7 +85,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
@@ -330,14 +329,11 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(hadoopConf.get(), new Path(clusteringOp.getDataFilePath()));
- Option<BaseKeyGenerator> keyGeneratorOp;
- if (!Boolean.parseBoolean(writeConfig.getProps().getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
- keyGeneratorOp = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
- } else {
- keyGeneratorOp = Option.empty();
- }
+ Option<BaseKeyGenerator> keyGeneratorOp =
+ writeConfig.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
MappingIterator mappingIterator = new MappingIterator((ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
- rec -> ((HoodieRecord) rec).wrapIntoHoodieRecordPayloadWithKeyGen(writeConfig.getProps(), keyGeneratorOp));
+ rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema,
+ writeConfig.getProps(), keyGeneratorOp));
iteratorsForPartition.add(mappingIterator);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
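Beyond the populateMetaFields() simplification, note that every record is now copied before it is re-keyed, since the reader may reuse its row buffer. A sketch of consuming the resulting iterator, under the types visible above:

    // Sketch: lazily map raw file rows into keyed HoodieRecords.
    ClosableIterator<HoodieRecord> fileRecords =
        (ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema);
    MappingIterator<HoodieRecord, HoodieRecord> keyedRecords = new MappingIterator<>(
        fileRecords,
        rec -> rec.copy().wrapIntoHoodieRecordPayloadWithKeyGen(
            readerSchema, writeConfig.getProps(), keyGeneratorOp));
    List<HoodieRecord> buffered = new ArrayList<>();
    keyedRecords.forEachRemaining(buffered::add); // safe: each row was copied above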
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 918542dbef..765fa663f2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -63,8 +63,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
-
/**
* Clustering strategy to submit single spark jobs.
* MultipleSparkJobExecution strategy is not ideal for use cases that require a large number of clustering groups
@@ -150,16 +148,12 @@ public abstract class SingleSparkJobExecutionStrategy<T>
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
Iterable<HoodieRecord<T>> indexedRecords = () -> {
try {
-
HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
- Option<BaseKeyGenerator> keyGeneratorOp;
- if (!Boolean.parseBoolean(getWriteConfig().getProps().getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
- keyGeneratorOp = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(getWriteConfig().getProps()));
- } else {
- keyGeneratorOp = Option.empty();
- }
+ Option<BaseKeyGenerator> keyGeneratorOp =
+ writeConfig.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
MappingIterator mappingIterator = new MappingIterator((ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
- rec -> ((HoodieRecord) rec).wrapIntoHoodieRecordPayloadWithKeyGen(getWriteConfig().getProps(), keyGeneratorOp));
+ rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema,
+ getWriteConfig().getProps(), keyGeneratorOp));
return mappingIterator;
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
index f866914b27..e111679842 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
@@ -24,8 +24,10 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
@@ -37,7 +39,7 @@ import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
@@ -47,6 +49,8 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.hudi.util.HoodieSparkRecordUtils.getNullableValAsString;
+import static org.apache.hudi.util.HoodieSparkRecordUtils.getValue;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -55,60 +59,77 @@ import static org.apache.spark.sql.types.DataTypes.StringType;
*/
public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
- private StructType structType = null;
- private Option<Long> schemaFingerPrint = Option.empty();
+ /**
+ * Tracks whether this record already owns its InternalRow, so the row is never copied twice.
+ */
+ private boolean copy;
+ /**
+ * Use this constructor when reading an InternalRow directly from a file.
+ * Records constructed this way must be consumed within the iterator, since the row is not copied.
+ */
public HoodieSparkRecord(InternalRow data, StructType schema) {
super(null, data);
- initSchema(schema);
+ this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, false);
+ this.copy = false;
}
public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
super(key, data);
- initSchema(schema);
+ this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
+ this.copy = true;
}
public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation) {
super(key, data, operation);
- initSchema(schema);
+ this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
+ this.copy = true;
+ }
+
+ public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) {
+ super(key, data, operation);
+ this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
+ this.copy = copy;
}
public HoodieSparkRecord(HoodieSparkRecord record) {
super(record);
- initSchema(record.getStructType());
+ this.copy = record.copy;
}
@Override
- public HoodieRecord<InternalRow> newInstance() {
+ public HoodieSparkRecord newInstance() {
return new HoodieSparkRecord(this);
}
@Override
- public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
- return new HoodieSparkRecord(key, data, getStructType(), op);
+ public HoodieSparkRecord newInstance(HoodieKey key, HoodieOperation op) {
+ return new HoodieSparkRecord(key, data, null, op);
}
@Override
- public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
- return new HoodieSparkRecord(key, data, getStructType());
+ public HoodieSparkRecord newInstance(HoodieKey key) {
+ return new HoodieSparkRecord(key, data, null);
}
@Override
- public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+ public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGeneratorOpt) {
if (key != null) {
return getRecordKey();
}
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get())
- .getRecordKey(data, getStructType()).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+ .getRecordKey(data, structType).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
}
@Override
- public String getRecordKey(String keyFieldName) {
+ public String getRecordKey(Schema recordSchema, String keyFieldName) {
if (key != null) {
return getRecordKey();
}
- DataType dataType = getStructType().apply(keyFieldName).dataType();
- int pos = getStructType().fieldIndex(keyFieldName);
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ DataType dataType = structType.apply(keyFieldName).dataType();
+ int pos = structType.fieldIndex(keyFieldName);
return data.get(pos, dataType).toString();
}
@@ -118,76 +139,80 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
}
@Override
- public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
- return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, getStructType(), consistentLogicalTimestampEnabled);
+ public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, structType, consistentLogicalTimestampEnabled);
}
@Override
- public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
- StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
- StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
- InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, getStructType(), (InternalRow) other.getData(), otherStructType, writerStructType);
- return new HoodieSparkRecord(getKey(), mergeRow, writerStructType, getOperation());
+ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException {
+ StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
+ return new HoodieSparkRecord(getKey(), mergeRow, targetStructType, getOperation(), copy);
}
@Override
public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
- UTF8String[] metaFields = extractMetaField(targetStructType);
+ UTF8String[] metaFields = extractMetaField(structType, targetStructType);
if (metaFields.length == 0) {
throw new UnsupportedOperationException();
}
- InternalRow resultRow;
- if (extractMetaField(getStructType()).length == 0) {
- resultRow = new HoodieInternalRow(metaFields, data, false);
- } else {
- resultRow = new HoodieInternalRow(metaFields, data, true);
- }
-
- return new HoodieSparkRecord(getKey(), resultRow, targetStructType, getOperation());
+ boolean containMetaFields = hasMetaField(structType);
+ InternalRow resultRow = new HoodieInternalRow(metaFields, data, containMetaFields);
+ return new HoodieSparkRecord(getKey(), resultRow, targetStructType, getOperation(), copy);
}
@Override
public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
- InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, getStructType(), newStructType, renameCols);
- UnsafeProjection unsafeConvert = HoodieInternalRowUtils.getCachedUnsafeConvert(newStructType);
- InternalRow resultRow = unsafeConvert.apply(rewriteRow);
- UTF8String[] metaFields = extractMetaField(newStructType);
+ InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
+ UTF8String[] metaFields = extractMetaField(structType, newStructType);
if (metaFields.length > 0) {
- resultRow = new HoodieInternalRow(metaFields, data, true);
+ rewriteRow = new HoodieInternalRow(metaFields, data, true);
}
- return new HoodieSparkRecord(getKey(), resultRow, newStructType, getOperation());
+ return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation(), copy);
}
@Override
- public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
- metadataValues.forEach((key, value) -> {
- int pos = getStructType().fieldIndex(key);
+ public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ metadataValues.getKv().forEach((key, value) -> {
+ int pos = structType.fieldIndex(key);
if (value != null) {
data.update(pos, CatalystTypeConverters.convertToCatalyst(value));
}
});
- return new HoodieSparkRecord(getKey(), data, getStructType(), getOperation());
+ return new HoodieSparkRecord(getKey(), data, structType, getOperation(), copy);
+ }
+
+ @Override
+ public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ int pos = structType.fieldIndex(keyFieldName);
+ data.update(pos, CatalystTypeConverters.convertToCatalyst(StringUtils.EMPTY_STRING));
+ return this;
}
@Override
- public boolean isDelete(Schema schema, Properties props) throws IOException {
+ public boolean isDelete(Schema recordSchema, Properties props) throws IOException {
if (null == data) {
return true;
}
- if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) {
+ if (recordSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) {
return false;
}
- Object deleteMarker = data.get(schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
+ Object deleteMarker = data.get(recordSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
return deleteMarker instanceof Boolean && (boolean) deleteMarker;
}
@Override
- public boolean shouldIgnore(Schema schema, Properties props) throws IOException {
+ public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
if (data != null && data.equals(SENTINEL)) {
return true;
} else {
@@ -197,34 +222,36 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
@Override
public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
- Schema schema, Properties props,
+ Schema recordSchema, Properties props,
Option<Pair<String, String>> simpleKeyGenFieldsOpt,
Boolean withOperation,
Option<String> partitionNameOp,
Boolean populateMetaFields) {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
if (populateMetaFields) {
- return HoodieSparkRecordUtils.convertToHoodieSparkRecord(getStructType(), data, withOperation);
+ return convertToHoodieSparkRecord(structType, this, withOperation);
} else if (simpleKeyGenFieldsOpt.isPresent()) {
- return HoodieSparkRecordUtils.convertToHoodieSparkRecord(getStructType(), data, simpleKeyGenFieldsOpt.get(), withOperation, Option.empty());
+ return convertToHoodieSparkRecord(structType, this, simpleKeyGenFieldsOpt.get(), withOperation, Option.empty());
} else {
- return HoodieSparkRecordUtils.convertToHoodieSparkRecord(getStructType(), data, withOperation, partitionNameOp);
+ return convertToHoodieSparkRecord(structType, this, withOperation, partitionNameOp);
}
}
@Override
- public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen) {
+ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, Properties props, Option<BaseKeyGenerator> keyGen) {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
String key;
String partition;
if (keyGen.isPresent() && !Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
SparkKeyGeneratorInterface keyGenerator = (SparkKeyGeneratorInterface) keyGen.get();
- key = keyGenerator.getRecordKey(data, getStructType()).toString();
- partition = keyGenerator.getPartitionPath(data, getStructType()).toString();
+ key = keyGenerator.getRecordKey(data, structType).toString();
+ partition = keyGenerator.getPartitionPath(data, structType).toString();
} else {
key = data.get(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal(), StringType).toString();
partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString();
}
HoodieKey hoodieKey = new HoodieKey(key, partition);
- return new HoodieSparkRecord(hoodieKey, data, getStructType(), getOperation());
+ return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy);
}
@Override
@@ -233,51 +260,75 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
}
@Override
- public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties prop) throws IOException {
+ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties prop) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- public Comparable<?> getOrderingValue(Properties props) {
+ public HoodieSparkRecord copy() {
+ if (!copy) {
+ this.data = this.data.copy();
+ copy = true;
+ }
+ return this;
+ }
+
+ @Override
+ public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
String orderingField = ConfigUtils.getOrderingField(props);
- if (!HoodieCatalystExpressionUtils$.MODULE$.existField(getStructType(), orderingField)) {
+ if (!HoodieCatalystExpressionUtils$.MODULE$.existField(structType, orderingField)) {
return 0;
} else {
- NestedFieldPath nestedFieldPath = HoodieInternalRowUtils.getCachedPosList(getStructType(),
- orderingField);
- Comparable<?> value = (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(
- data, nestedFieldPath);
+ NestedFieldPath nestedFieldPath = HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
+ Comparable<?> value = (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
return value;
}
}
- public StructType getStructType() {
- if (schemaFingerPrint.isPresent()) {
- return HoodieInternalRowUtils.getCachedSchemaFromFingerPrint(schemaFingerPrint.get());
- } else {
- return structType;
- }
+ private UTF8String[] extractMetaField(StructType recordStructType, StructType structTypeWithMetaField) {
+ return HOODIE_META_COLUMNS_WITH_OPERATION.stream()
+ .filter(f -> HoodieCatalystExpressionUtils$.MODULE$.existField(structTypeWithMetaField, f))
+ .map(field -> {
+ if (HoodieCatalystExpressionUtils$.MODULE$.existField(recordStructType, field)) {
+ return data.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(field));
+ } else {
+ return UTF8String.EMPTY_UTF8;
+ }
+ }).toArray(UTF8String[]::new);
}
- private void initSchema(StructType structType) {
- if (HoodieInternalRowUtils.containsCompressedSchema(structType)) {
- HoodieInternalRowUtils.addCompressedSchema(structType);
- this.schemaFingerPrint = Option.of(HoodieInternalRowUtils.getCachedFingerPrintFromSchema(structType));
- } else {
- this.structType = structType;
- }
+ private static boolean hasMetaField(StructType structType) {
+ return HoodieCatalystExpressionUtils$.MODULE$.existField(structType, COMMIT_TIME_METADATA_FIELD);
}
- public void setStructType(StructType structType) {
- if (structType != null) {
- initSchema(structType);
- }
+ /**
+ * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
+ */
+ private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, HoodieSparkRecord record, boolean withOperationField) {
+ return convertToHoodieSparkRecord(structType, record,
+ Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+ withOperationField, Option.empty());
}
- private UTF8String[] extractMetaField(StructType structType) {
- return HOODIE_META_COLUMNS_WITH_OPERATION.stream()
- .filter(f -> HoodieCatalystExpressionUtils$.MODULE$.existField(structType, f))
- .map(UTF8String::fromString)
- .toArray(UTF8String[]::new);
+ private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, HoodieSparkRecord record, boolean withOperationField,
+ Option<String> partitionName) {
+ return convertToHoodieSparkRecord(structType, record,
+ Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+ withOperationField, partitionName);
+ }
+
+ /**
+ * Utility method to convert an InternalRow-backed record to a keyed HoodieRecord using the given record-key and partition-path fields.
+ */
+ private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, HoodieSparkRecord record, Pair<String, String> recordKeyPartitionPathFieldPair,
+ boolean withOperationField, Option<String> partitionName) {
+ final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), record.data).toString();
+ final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
+ getValue(structType, recordKeyPartitionPathFieldPair.getRight(), record.data).toString());
+
+ HoodieOperation operation = withOperationField
+ ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+ return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy);
}
}
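The copy flag gives HoodieSparkRecord copy-on-demand semantics: a record coming straight off a file iterator still shares the reader's row buffer (copy == false) and must be copied before it escapes the iteration, while copy() becomes a no-op once the record owns its row. A minimal usage sketch under that contract:

    // Sketch: rows from a reader are only valid until the next next() call,
    // so copy before buffering.
    List<HoodieRecord> buffered = new ArrayList<>();
    while (readerIterator.hasNext()) {
      buffered.add(readerIterator.next().copy()); // first copy() clones the InternalRow
    }
    // Later copy() calls on the same record return it unchanged.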
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
index 75be1402ea..0735527ae7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.FlatLists;
+import org.apache.hudi.common.util.collection.FlatLists.ComparableList;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.io.AppendHandleFactory;
@@ -234,9 +236,9 @@ public class RDDConsistentBucketPartitioner<T> extends RDDBucketIndexPartitioner
final String[] sortColumns = sortColumnNames;
final SerializableSchema schema = new SerializableSchema(HoodieAvroUtils.addMetadataFields((new Schema.Parser().parse(table.getConfig().getSchema()))));
Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & Serializable) (t1, t2) -> {
- Object obj1 = t1.getRecordColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
- Object obj2 = t2.getRecordColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
- return ((Comparable) obj1).compareTo(obj2);
+ ComparableList obj1 = FlatLists.ofComparableArray(t1.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
+ ComparableList obj2 = FlatLists.ofComparableArray(t2.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
+ return obj1.compareTo(obj2);
};
return records.mapToPair(record -> new Tuple2<>(record, record))
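getColumnValues now surfaces the raw Object[] of sort-column values, and FlatLists.ComparableList compares them lexicographically instead of via a concatenated string. A small illustration with made-up values:

    // Sketch: lexicographic comparison of multi-column sort keys.
    ComparableList obj1 = FlatLists.ofComparableArray(new Object[] {"2022-10-15", 3L});
    ComparableList obj2 = FlatLists.ofComparableArray(new Object[] {"2022-10-15", 7L});
    int cmp = obj1.compareTo(obj2); // negative: first columns tie, 3L < 7L decides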
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index dbbcb22e90..e723d724b6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -61,7 +61,7 @@ public class RDDCustomColumnsSortPartitioner<T>
final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
return records.sortBy(
record -> {
- Object recordValue = record.getRecordColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
+ Object recordValue = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
// null values are replaced with empty string for null_first order
if (recordValue == null) {
return StringUtils.EMPTY_STRING;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 5ba5c5dfdf..0a042d3362 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -37,7 +37,6 @@ import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -95,7 +94,6 @@ public class RDDSpatialCurveSortPartitioner<T>
});
} else if (recordType == HoodieRecordType.SPARK) {
StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.get());
- Broadcast<StructType> structTypeBC = sparkEngineContext.getJavaSparkContext().broadcast(structType);
Dataset<Row> sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(), sparkEngineContext.getSqlContext().sparkSession(), structType);
Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
@@ -107,7 +105,7 @@ public class RDDSpatialCurveSortPartitioner<T>
String key = internalRow.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
String partition = internalRow.getString(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal());
HoodieKey hoodieKey = new HoodieKey(key, partition);
- HoodieRecord hoodieRecord = new HoodieSparkRecord(hoodieKey, internalRow, structTypeBC.value());
+ HoodieRecord hoodieRecord = new HoodieSparkRecord(hoodieKey, internalRow, structType);
return hoodieRecord;
});
} else {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
index f0d650ecdc..112981f902 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
@@ -27,17 +27,6 @@ import org.apache.spark.sql.internal.SQLConf;
public class HoodieSparkFileReaderFactory extends HoodieFileReaderFactory {
- private static class SingletonHolder {
- private static final HoodieSparkFileReaderFactory INSTANCE = new HoodieSparkFileReaderFactory();
- }
-
- private HoodieSparkFileReaderFactory() {
- }
-
- public static HoodieFileReaderFactory getFileReaderFactory() {
- return SingletonHolder.INSTANCE;
- }
-
protected HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
conf.setIfUnset(SQLConf.PARQUET_BINARY_AS_STRING().key(),
SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index f746bb0e8e..a4211656b0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -39,15 +39,6 @@ import java.io.IOException;
public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
- private static class SingletonHolder {
-
- private static final HoodieSparkFileWriterFactory INSTANCE = new HoodieSparkFileWriterFactory();
- }
-
- public static HoodieFileWriterFactory getFileWriterFactory() {
- return HoodieSparkFileWriterFactory.SingletonHolder.INSTANCE;
- }
-
@Override
protected HoodieFileWriter newParquetFileWriter(
String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 7078393d28..a46174cbae 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -86,13 +86,13 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()));
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()));
InputFile inputFile = HadoopInputFile.fromPath(path, conf);
- ParquetReader reader = new ParquetReader.Builder<InternalRow>(inputFile) {
+ ParquetReader<InternalRow> reader = new ParquetReader.Builder<InternalRow>(inputFile) {
@Override
protected ReadSupport getReadSupport() {
return new ParquetReadSupport();
}
}.withConf(conf).build();
- ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader, InternalRow::copy);
+ ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index b47a1c765d..3828f63564 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -22,7 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
+import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -44,7 +44,6 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
-import java.util.Collections;
import java.util.Properties;
class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
@@ -70,12 +69,15 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
.getFileReader(table.getHadoopConf(), sourceFilePath);
try {
wrapper = new BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
- reader.getRecordIterator(), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
+ reader.getRecordIterator(), new BootstrapRecordConsumer(bootstrapHandle), record -> {
try {
- String recKey = inp.getRecordKey(Option.of(keyGenerator));
- HoodieRecord hoodieRecord = inp.rewriteRecord(reader.getSchema(), config.getProps(), HoodieAvroUtils.RECORD_KEY_SCHEMA);
+ HoodieRecord recordCopy = record.copy();
+ String recKey = recordCopy.getRecordKey(reader.getSchema(), Option.of(keyGenerator));
+ HoodieRecord hoodieRecord = recordCopy.rewriteRecord(reader.getSchema(), config.getProps(), HoodieAvroUtils.RECORD_KEY_SCHEMA);
+ MetadataValues metadataValues = new MetadataValues();
+ metadataValues.setRecordKey(recKey);
return hoodieRecord
- .updateValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), Collections.singletonMap(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName(), recKey))
+ .updateMetadataValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), metadataValues)
.newInstance(new HoodieKey(recKey, partitionPath));
} catch (IOException e) {
LOG.error("Unable to overrideMetadataFieldValue", e);
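The loose Map passed to updateValues is replaced by the typed MetadataValues carrier, which only touches the meta columns that were explicitly set. The new call shape, restated from the hunk above:

    // Set only the record-key meta column; other meta fields stay untouched.
    MetadataValues metadataValues = new MetadataValues();
    metadataValues.setRecordKey(recKey);
    HoodieRecord keyed = hoodieRecord
        .updateMetadataValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), metadataValues)
        .newInstance(new HoodieKey(recKey, partitionPath));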
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java
index 4779e8e05f..65ac51a1c7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java
@@ -19,13 +19,8 @@
package org.apache.hudi.util;
import org.apache.hudi.HoodieInternalRowUtils;
-import org.apache.hudi.commmon.model.HoodieSparkRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
import org.apache.spark.sql.HoodieUnsafeRowUtils;
@@ -35,37 +30,7 @@ import org.apache.spark.sql.types.StructType;
public class HoodieSparkRecordUtils {
- /**
- * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
- */
- public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) {
- return convertToHoodieSparkRecord(structType, data,
- Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
- withOperationField, Option.empty());
- }
-
- public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField,
- Option<String> partitionName) {
- return convertToHoodieSparkRecord(structType, data,
- Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
- withOperationField, partitionName);
- }
-
- /**
- * Utility method to convert bytes to HoodieRecord using schema and payload class.
- */
- public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair<String, String> recordKeyPartitionPathFieldPair,
- boolean withOperationField, Option<String> partitionName) {
- final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString();
- final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
- getValue(structType, recordKeyPartitionPathFieldPair.getRight(), data).toString());
-
- HoodieOperation operation = withOperationField
- ? HoodieOperation.fromName(getNullableValAsString(structType, data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
- return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), data, structType, operation);
- }
-
- private static Object getValue(StructType structType, String fieldName, InternalRow row) {
+ public static Object getValue(StructType structType, String fieldName, InternalRow row) {
NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
}
@@ -77,7 +42,7 @@ public class HoodieSparkRecordUtils {
* @param fieldName The field name
* @return the string form of the field or empty if the schema does not contain the field name or the value is null
*/
- private static Option<String> getNullableValAsString(StructType structType, InternalRow row, String fieldName) {
+ public static Option<String> getNullableValAsString(StructType structType, InternalRow row, String fieldName) {
String fieldVal = !HoodieCatalystExpressionUtils$.MODULE$.existField(structType, fieldName)
? null : StringUtils.objToString(getValue(structType, fieldName, row));
return Option.ofNullable(fieldVal);
@@ -91,21 +56,15 @@ public class HoodieSparkRecordUtils {
* @param structType {@link StructType} instance.
* @return Array of column values, one entry per requested column.
*/
- public static Object getRecordColumnValues(InternalRow row,
+ public static Object[] getRecordColumnValues(InternalRow row,
String[] columns,
StructType structType, boolean consistentLogicalTimestampEnabled) {
- if (columns.length == 1) {
- NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
- return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
- } else {
- // TODO this is inefficient, instead we can simply return array of Comparable
- StringBuilder sb = new StringBuilder();
- for (String col : columns) {
- // TODO support consistentLogicalTimestampEnabled
- NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
- return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
- }
- return sb.toString();
+ Object[] objects = new Object[columns.length];
+ for (int i = 0; i < objects.length; i++) {
+ NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[i]);
+ Object value = HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
+ objects[i] = value;
}
+ return objects;
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
index cd259974d9..169ccd61c3 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
@@ -21,16 +21,15 @@ package org.apache.hudi
import java.nio.charset.StandardCharsets
import java.util.HashMap
import java.util.concurrent.ConcurrentHashMap
-import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.Schema
import org.apache.hbase.thirdparty.com.google.common.base.Supplier
import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
+import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieCatalystExpressionUtils
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, Projection}
+import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieUnsafeRowUtils}
import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.HoodieUnsafeRowUtils
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types._
@@ -43,25 +42,9 @@ object HoodieInternalRowUtils {
ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
})
- val unsafeConvertThreadLocal: ThreadLocal[HashMap[StructType, UnsafeProjection]] =
- ThreadLocal.withInitial(new Supplier[HashMap[StructType, UnsafeProjection]] {
- override def get(): HashMap[StructType, UnsafeProjection] = new HashMap[StructType, UnsafeProjection]
- })
val schemaMap = new ConcurrentHashMap[Schema, StructType]
- val schemaFingerPrintMap = new ConcurrentHashMap[Long, StructType]
- val fingerPrintSchemaMap = new ConcurrentHashMap[StructType, Long]
val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath]
- /**
- * @see org.apache.hudi.avro.HoodieAvroUtils#stitchRecords(org.apache.avro.generic.GenericRecord, org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
- */
- def stitchRecords(left: InternalRow, leftSchema: StructType, right: InternalRow, rightSchema: StructType, stitchedSchema: StructType): InternalRow = {
- val mergeSchema = StructType(leftSchema.fields ++ rightSchema.fields)
- val row = new JoinedRow(left, right)
- val projection = getCachedUnsafeProjection(mergeSchema, stitchedSchema)
- projection(row)
- }
-
/**
* @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
*/
@@ -218,16 +201,7 @@ object HoodieInternalRowUtils {
orderPosListMap.get(schemaPair)
}
- def getCachedUnsafeConvert(structType: StructType): UnsafeProjection = {
- val map = unsafeConvertThreadLocal.get()
- if (!map.containsKey(structType)) {
- val projection = UnsafeProjection.create(structType)
- map.put(structType, projection)
- }
- map.get(structType)
- }
-
- def getCachedUnsafeProjection(from: StructType, to: StructType): Projection = {
+ def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
val schemaPair = (from, to)
val map = unsafeProjectionThreadLocal.get()
if (!map.containsKey(schemaPair)) {
@@ -245,33 +219,16 @@ object HoodieInternalRowUtils {
schemaMap.get(schema)
}
- def getCachedSchemaFromFingerPrint(fingerPrint: Long): StructType = {
- if (!schemaFingerPrintMap.containsKey(fingerPrint)) {
- throw new IllegalArgumentException("Not exist " + fingerPrint)
- }
- schemaFingerPrintMap.get(fingerPrint)
- }
-
- def getCachedFingerPrintFromSchema(schema: StructType): Long = {
- if (!fingerPrintSchemaMap.containsKey(schema)) {
- throw new IllegalArgumentException("Not exist " + schema)
- }
- fingerPrintSchemaMap.get(schema)
- }
-
- def addCompressedSchema(schema: StructType): Unit ={
- if (!fingerPrintSchemaMap.containsKey(schema)) {
- val fingerPrint = SchemaNormalization.fingerprint64(schema.json.getBytes(StandardCharsets.UTF_8))
- schemaFingerPrintMap.put(fingerPrint, schema)
- fingerPrintSchemaMap.put(schema, fingerPrint)
+ def projectUnsafe(row: InternalRow, structType: StructType, copy: Boolean = true): InternalRow = {
+ if (row == null || row.isInstanceOf[UnsafeRow] || row.isInstanceOf[HoodieInternalRow]) {
+ row
+ } else {
+ val unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType).apply(row)
+ if (copy) unsafeRow.copy() else unsafeRow
}
}
- def containsCompressedSchema(schema: StructType): Boolean = {
- fingerPrintSchemaMap.containsKey(schema)
- }
-
- private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
+ private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
oldSchema match {
case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | StringType | DateType | TimestampType | BinaryType =>
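With the fingerprint cache gone, projectUnsafe is the one normalization point: it passes UnsafeRow and HoodieInternalRow instances through untouched and projects anything else to an UnsafeRow, optionally copying it out of the projection's shared buffer. From a Java caller it might look like this sketch:

    // Sketch: normalize a row to its unsafe representation.
    InternalRow safeToRetain = HoodieInternalRowUtils.projectUnsafe(row, structType, true);
    InternalRow transientView = HoodieInternalRowUtils.projectUnsafe(row, structType, false);
    // copy = true  -> the projected row is copied, safe to hold across iterations
    // copy = false -> zero-copy view, valid only until the projection is reused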
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala
deleted file mode 100644
index e3407d19b6..0000000000
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hudi
-
-import com.esotericsoftware.kryo.Kryo
-import com.esotericsoftware.kryo.io.{Input, Output}
-import com.twitter.chill.KSerializer
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import org.apache.avro.SchemaNormalization
-import org.apache.commons.io.IOUtils
-import org.apache.hudi.commmon.model.HoodieSparkRecord
-import org.apache.spark.io.CompressionCodec
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
-import org.apache.spark.{SparkEnv, SparkException}
-import scala.collection.mutable
-
-/**
- * Custom serializer used for generic spark records. If the user registers the schemas
- * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
- * schema, as to reduce network IO.
- * Actions like parsing or compressing schemas are computationally expensive so the serializer
- * caches all previously seen values as to reduce the amount of work needed to do.
- * @param schemas a map where the keys are unique IDs for spark schemas and the values are the
- * string representation of the Avro schema, used to decrease the amount of data
- * that needs to be serialized.
- */
-class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] {
- /** Used to reduce the amount of effort to compress the schema */
- private val compressCache = new mutable.HashMap[StructType, Array[Byte]]()
- private val decompressCache = new mutable.HashMap[ByteBuffer, StructType]()
-
- /** Fingerprinting is very expensive so this alleviates most of the work */
- private val fingerprintCache = new mutable.HashMap[StructType, Long]()
- private val schemaCache = new mutable.HashMap[Long, StructType]()
-
- // GenericAvroSerializer can't take a SparkConf in the constructor b/c then it would become
- // a member of KryoSerializer, which would make KryoSerializer not Serializable. We make
- // the codec lazy here just b/c in some unit tests, we use a KryoSerializer w/out having
- // the SparkEnv set (note those tests would fail if they tried to serialize avro data).
- private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
-
- /**
- * Used to compress Schemas when they are being sent over the wire.
- * The compression results are memoized to reduce the compression time since the
- * same schema is compressed many times over
- */
- def compress(schema: StructType): Array[Byte] = compressCache.getOrElseUpdate(schema, {
- val bos = new ByteArrayOutputStream()
- val out = codec.compressedOutputStream(bos)
- Utils.tryWithSafeFinally {
- out.write(schema.json.getBytes(StandardCharsets.UTF_8))
- } {
- out.close()
- }
- bos.toByteArray
- })
-
- /**
- * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
- * seen values so to limit the number of times that decompression has to be done.
- */
- def decompress(schemaBytes: ByteBuffer): StructType = decompressCache.getOrElseUpdate(schemaBytes, {
- val bis = new ByteArrayInputStream(
- schemaBytes.array(),
- schemaBytes.arrayOffset() + schemaBytes.position(),
- schemaBytes.remaining())
- val in = codec.compressedInputStream(bis)
- val bytes = Utils.tryWithSafeFinally {
- IOUtils.toByteArray(in)
- } {
- in.close()
- }
- StructType.fromString(new String(bytes, StandardCharsets.UTF_8))
- })
-
- /**
- * Serializes a record to the given output stream. It caches a lot of the internal data as
- * to not redo work
- */
- def serializeDatum(datum: HoodieSparkRecord, output: Output): Unit = {
- val schema = datum.getStructType
- val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
- SchemaNormalization.fingerprint64(schema.json.getBytes(StandardCharsets.UTF_8))
- })
- schemas.get(fingerprint) match {
- case Some(_) =>
- output.writeBoolean(true)
- output.writeLong(fingerprint)
- case None =>
- output.writeBoolean(false)
- val compressedSchema = compress(schema)
- output.writeInt(compressedSchema.length)
- output.writeBytes(compressedSchema)
- }
-
- val record = datum.newInstance().asInstanceOf[HoodieSparkRecord]
- record.setStructType(null)
- val stream = new ObjectOutputStream(output)
- stream.writeObject(record)
- stream.close()
- }
-
- /**
- * Deserializes generic records into their in-memory form. There is internal
- * state to keep a cache of already seen schemas and datum readers.
- */
- def deserializeDatum(input: Input): HoodieSparkRecord = {
- val schema = {
- if (input.readBoolean()) {
- val fingerprint = input.readLong()
- schemaCache.getOrElseUpdate(fingerprint, {
- schemas.get(fingerprint) match {
- case Some(s) => s
- case None =>
- throw new SparkException(
- "Error reading attempting to read spark data -- encountered an unknown " +
- s"fingerprint: $fingerprint, not sure what schema to use. This could happen " +
- "if you registered additional schemas after starting your spark context.")
- }
- })
- } else {
- val length = input.readInt()
- decompress(ByteBuffer.wrap(input.readBytes(length)))
- }
- }
- val stream = new ObjectInputStream(input)
- val record = stream.readObject().asInstanceOf[HoodieSparkRecord]
- stream.close()
- record.setStructType(schema)
-
- record
- }
-
- override def write(kryo: Kryo, output: Output, datum: HoodieSparkRecord): Unit =
- serializeDatum(datum, output)
-
- override def read(kryo: Kryo, input: Input, datumClass: Class[HoodieSparkRecord]): HoodieSparkRecord =
- deserializeDatum(input)
-}
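
For context, the serializer removed here follows Spark's GenericAvroSerializer design: each record carries either a 64-bit schema fingerprint (when the schema was pre-registered) or a compressed copy of the schema the first time it is seen. The following is a minimal, illustrative Java sketch of the write-side pattern; the name `knownSchemas` and the no-op `compress` helper are placeholders, not Hudi APIs.

import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.SchemaNormalization;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class SchemaFingerprintWriter {
  // fingerprint -> schema JSON, registered up front on both writer and reader
  private final Map<Long, String> knownSchemas;
  // memoizes fingerprint64, which is expensive to recompute per record
  private final Map<String, Long> fingerprintCache = new HashMap<>();

  SchemaFingerprintWriter(Map<Long, String> knownSchemas) {
    this.knownSchemas = knownSchemas;
  }

  void writeSchema(String schemaJson, Output output) {
    long fingerprint = fingerprintCache.computeIfAbsent(schemaJson,
        json -> SchemaNormalization.fingerprint64(json.getBytes(StandardCharsets.UTF_8)));
    if (knownSchemas.containsKey(fingerprint)) {
      output.writeBoolean(true);   // reader resolves the schema from the fingerprint
      output.writeLong(fingerprint);
    } else {
      output.writeBoolean(false);  // ship the schema inline the first time it is seen
      byte[] bytes = compress(schemaJson);
      output.writeInt(bytes.length);
      output.writeBytes(bytes);
    }
  }

  private byte[] compress(String schemaJson) {
    // Placeholder: the removed code ran this through Spark's CompressionCodec.
    return schemaJson.getBytes(StandardCharsets.UTF_8);
  }
}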
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index c0a0307c3e..8ae4385500 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -474,7 +474,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieRecordMerger recordMerger = HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
int dedupParallelism = records.getNumPartitions() + 100;
HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
- HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+ (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
+ .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
assertEquals(1, dedupedRecs.size());
@@ -484,7 +485,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
- dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+ dedupedRecsRdd =
+ (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
+ .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
dedupedRecs = dedupedRecsRdd.collectAsList();
assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
assertEquals(2, dedupedRecs.size());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index 9fb1862c5f..a4f723ff01 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -269,8 +269,8 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> sizeEstimator = new DefaultSizeEstimator<>();
// queue memory limit
HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
- getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0));
- final long objSize = sizeEstimator.sizeEstimate(new Tuple2<>(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties())));
+ getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0));
+ final long objSize = sizeEstimator.sizeEstimate(new Tuple2(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties())));
final long memoryLimitInBytes = 4 * objSize;
// first let us throw exception from queueIterator reader and test that queueing thread
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 2af3f8f3a1..9496a8083f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -115,7 +115,7 @@ public class HoodieAvroUtils {
* TODO serialize other types of records.
*/
public static Option<byte[]> recordToBytes(HoodieRecord record, Schema schema) throws IOException {
- return Option.of(HoodieAvroUtils.indexedRecordToBytes(((HoodieAvroIndexedRecord) record.toIndexedRecord(schema, new Properties()).get()).getData()));
+ return Option.of(HoodieAvroUtils.indexedRecordToBytes(record.toIndexedRecord(schema, new Properties()).get().getData()));
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 28265bbbf4..913bbcb97c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -75,7 +76,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
@Override
- public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+ public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGeneratorOpt) {
return keyGeneratorOpt.isPresent() ? keyGeneratorOpt.get().getRecordKey((GenericRecord) data) : ((GenericRecord) data).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
}
@@ -85,19 +86,20 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
@Override
- public String getRecordKey(String keyFieldName) {
+ public String getRecordKey(Schema recordSchema, String keyFieldName) {
return Option.ofNullable(data.getSchema().getField(keyFieldName))
.map(keyField -> data.get(keyField.pos()))
.map(Object::toString).orElse(null);
}
@Override
- public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+ public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
throw new UnsupportedOperationException();
}
@Override
- public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
+ public HoodieRecord joinWith(HoodieRecord other,
+ Schema targetSchema) throws IOException {
GenericRecord record = HoodieAvroUtils.stitchRecords((GenericRecord) data, (GenericRecord) other.getData(), targetSchema);
return new HoodieAvroIndexedRecord(record);
}
@@ -115,8 +117,8 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
@Override
- public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
- metadataValues.forEach((key, value) -> {
+ public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
+ metadataValues.getKv().forEach((key, value) -> {
if (value != null) {
((GenericRecord) data).put(key, value);
}
@@ -126,18 +128,29 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
@Override
- public boolean isDelete(Schema schema, Properties props) {
+ public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) {
+ ((GenericRecord) data).put(keyFieldName, StringUtils.EMPTY_STRING);
+ return this;
+ }
+
+ @Override
+ public boolean isDelete(Schema recordSchema, Properties props) {
return false;
}
@Override
- public boolean shouldIgnore(Schema schema, Properties props) throws IOException {
+ public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
return getData().equals(SENTINEL);
}
+ @Override
+ public HoodieRecord<IndexedRecord> copy() {
+ return this;
+ }
+
@Override
public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
- Schema schema,
+ Schema recordSchema,
Properties props,
Option<Pair<String, String>> simpleKeyGenFieldsOpt,
Boolean withOperation,
@@ -149,7 +162,8 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
@Override
- public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen) {
+ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema,
+ Properties props, Option<BaseKeyGenerator> keyGen) {
GenericRecord record = (GenericRecord) data;
String key;
String partition;
@@ -174,7 +188,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
@Override
- public Comparable<?> getOrderingValue(Properties props) {
+ public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(props.getProperty(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
@@ -184,7 +198,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
@Override
- public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) {
+ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) {
return Option.of(this);
}
}
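
The joinWith rename above backs the bootstrap path: a skeleton record holding the Hudi meta columns is stitched together with the data record under the combined target schema. A hedged sketch of that operation, using the HoodieAvroUtils.stitchRecords helper the implementation calls:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

class JoinWithSketch {
  // Assumed inputs: `skeleton` carries the Hudi meta columns, `data` carries the
  // user columns, and `targetSchema` is the union of the two.
  GenericRecord stitch(GenericRecord skeleton, GenericRecord data, Schema targetSchema) {
    return HoodieAvroUtils.stitchRecords(skeleton, data, targetSchema);
  }
}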
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 65f15ca6a4..dfd2b4ba33 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -22,6 +22,7 @@ package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -74,17 +75,18 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
}
@Override
- public Comparable<?> getOrderingValue(Properties props) {
+ public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
return this.getData().getOrderingValue();
}
@Override
- public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+ public String getRecordKey(Schema recordSchema,
+ Option<BaseKeyGenerator> keyGeneratorOpt) {
return getRecordKey();
}
@Override
- public String getRecordKey(String keyFieldName) {
+ public String getRecordKey(Schema recordSchema, String keyFieldName) {
return getRecordKey();
}
@@ -94,12 +96,13 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
}
@Override
- public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
- return HoodieAvroUtils.getRecordColumnValues(this, columns, recordSchema, consistentLogicalTimestampEnabled);
+ public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+ return new Object[]{HoodieAvroUtils.getRecordColumnValues(this, columns, recordSchema, consistentLogicalTimestampEnabled)};
}
@Override
- public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
+ public HoodieRecord joinWith(HoodieRecord other,
+ Schema targetSchema) throws IOException {
throw new UnsupportedOperationException();
}
@@ -119,10 +122,10 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
}
@Override
- public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
+ public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
GenericRecord avroRecordPayload = (GenericRecord) getData().getInsertValue(recordSchema, props).get();
- metadataValues.forEach((key, value) -> {
+ metadataValues.getKv().forEach((key, value) -> {
if (value != null) {
avroRecordPayload.put(key, value);
}
@@ -132,13 +135,20 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
}
@Override
- public boolean isDelete(Schema schema, Properties props) throws IOException {
- return !getData().getInsertValue(schema, props).isPresent();
+ public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException {
+ GenericRecord avroRecordPayload = (GenericRecord) getData().getInsertValue(recordSchema, props).get();
+ avroRecordPayload.put(keyFieldName, StringUtils.EMPTY_STRING);
+ return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation());
+ }
+
+ @Override
+ public boolean isDelete(Schema recordSchema, Properties props) throws IOException {
+ return !getData().getInsertValue(recordSchema, props).isPresent();
}
@Override
- public boolean shouldIgnore(Schema schema, Properties props) throws IOException {
- Option<IndexedRecord> insertRecord = getData().getInsertValue(schema, props);
+ public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
+ Option<IndexedRecord> insertRecord = getData().getInsertValue(recordSchema, props);
// just skip the ignored record
if (insertRecord.isPresent() && insertRecord.get().equals(SENTINEL)) {
return true;
@@ -147,21 +157,27 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
}
}
+ @Override
+ public HoodieRecord<T> copy() {
+ return this;
+ }
+
@Override
public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
- Schema schema, Properties props,
+ Schema recordSchema, Properties props,
Option<Pair<String, String>> simpleKeyGenFieldsOpt,
Boolean withOperation,
Option<String> partitionNameOp,
Boolean populateMetaFields) throws IOException {
- IndexedRecord indexedRecord = (IndexedRecord) data.getInsertValue(schema, props).get();
+ IndexedRecord indexedRecord = (IndexedRecord) data.getInsertValue(recordSchema, props).get();
String payloadClass = ConfigUtils.getPayloadClass(props);
String preCombineField = ConfigUtils.getOrderingField(props);
return HoodieAvroUtils.createHoodieRecordFromAvro(indexedRecord, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFields);
}
@Override
- public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen) {
+ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema,
+ Properties props, Option<BaseKeyGenerator> keyGen) {
throw new UnsupportedOperationException();
}
@@ -170,8 +186,8 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
}
@Override
- public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException {
- Option<IndexedRecord> avroData = getData().getInsertValue(schema, props);
+ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) throws IOException {
+ Option<IndexedRecord> avroData = getData().getInsertValue(recordSchema, props);
if (avroData.isPresent()) {
return Option.of(new HoodieAvroIndexedRecord(avroData.get()));
} else {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index 9fa70a3719..e57a18b592 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -23,8 +23,8 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import java.io.IOException;
@@ -34,21 +34,28 @@ import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
public class HoodieAvroRecordMerger implements HoodieRecordMerger {
+ public static final String DE_DUPING = "de_duping";
+
@Override
public String getMergingStrategy() {
- return StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
+ return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
}
@Override
- public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException {
ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.AVRO);
ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.AVRO);
- if (older instanceof HoodieAvroRecord && newer instanceof HoodieAvroRecord) {
- return Option.of(preCombine(older, newer));
- } else if (older instanceof HoodieAvroIndexedRecord && newer instanceof HoodieAvroRecord) {
- return combineAndGetUpdateValue(older, newer, schema, props);
+ boolean deDuping = Boolean.parseBoolean(props.getOrDefault(DE_DUPING, "false").toString());
+ if (deDuping) {
+ HoodieRecord res = preCombine(older, newer);
+ if (res == older) {
+ return Option.of(Pair.of(res, oldSchema));
+ } else {
+ return Option.of(Pair.of(res, newSchema));
+ }
} else {
- throw new UnsupportedOperationException();
+ return combineAndGetUpdateValue(older, newer, newSchema, props)
+ .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema()));
}
}
@@ -67,12 +74,19 @@ public class HoodieAvroRecordMerger implements HoodieRecordMerger {
}
private Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
- Option<HoodieAvroIndexedRecord> previousRecordAvroPayload = older.toIndexedRecord(schema, props);
- if (!previousRecordAvroPayload.isPresent()) {
+ Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
+ if (!previousAvroData.isPresent()) {
return Option.empty();
}
- return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousRecordAvroPayload.get().getData(), schema, props)
+ return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema, props)
.map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
}
+
+ public static Properties withDeDuping(Properties props) {
+ Properties newProps = new Properties();
+ newProps.putAll(props);
+ newProps.setProperty(HoodieAvroRecordMerger.DE_DUPING, "true");
+ return newProps;
+ }
}
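
To make the new merger contract concrete, here is an illustrative (not normative) caller-side sketch: the merged record now travels with the schema it conforms to, and preCombine-style de-duping is switched on through the props rather than inferred from payload types.

import java.io.IOException;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

class MergerUsageSketch {
  // `older`/`newer` and their schemas are assumed to be supplied by the caller.
  Option<Pair<HoodieRecord, Schema>> mergeOnce(HoodieRecord older, Schema oldSchema,
      HoodieRecord newer, Schema newSchema) throws IOException {
    // Opt in to preCombine-style de-duping explicitly, instead of relying on payload types.
    Properties props = HoodieAvroRecordMerger.withDeDuping(new Properties());
    HoodieRecordMerger merger = new HoodieAvroRecordMerger();
    // The merged record is returned together with the schema it conforms to.
    return merger.merge(older, oldSchema, newer, newSchema, props);
  }
}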
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index f2167bb15e..74954a5e63 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -45,24 +45,13 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
this.orderingVal = orderingVal;
}
- public HoodieEmptyRecord(HoodieRecord<T> record, HoodieRecordType type) {
- super(record);
- this.type = type;
- this.orderingVal = record.getOrderingValue(new Properties());
- }
-
- public HoodieEmptyRecord(HoodieRecordType type) {
- this.type = type;
- this.orderingVal = null;
- }
-
@Override
public T getData() {
return null;
}
@Override
- public Comparable<?> getOrderingValue(Properties props) {
+ public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
return orderingVal;
}
@@ -87,22 +76,24 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
}
@Override
- public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+ public String getRecordKey(Schema recordSchema,
+ Option<BaseKeyGenerator> keyGeneratorOpt) {
return key.getRecordKey();
}
@Override
- public String getRecordKey(String keyFieldName) {
+ public String getRecordKey(Schema recordSchema, String keyFieldName) {
return key.getRecordKey();
}
@Override
- public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+ public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
throw new UnsupportedOperationException();
}
@Override
- public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
+ public HoodieRecord joinWith(HoodieRecord other,
+ Schema targetSchema) throws IOException {
throw new UnsupportedOperationException();
}
@@ -117,34 +108,44 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
}
@Override
- public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
+ public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- public boolean isDelete(Schema schema, Properties props) throws IOException {
+ public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDelete(Schema recordSchema, Properties props) throws IOException {
return true;
}
@Override
- public boolean shouldIgnore(Schema schema, Properties props) throws IOException {
+ public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
return false;
}
@Override
- public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema schema, Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt, Boolean withOperation, Option<String> partitionNameOp,
- Boolean populateMetaFieldsOp)
- throws IOException {
+ public HoodieRecord<T> copy() {
+ return this;
+ }
+
+ @Override
+ public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema recordSchema, Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt,
+ Boolean withOperation, Option<String> partitionNameOp, Boolean populateMetaFieldsOp) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen) {
+ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema,
+ Properties props, Option<BaseKeyGenerator> keyGen) {
throw new UnsupportedOperationException();
}
@Override
- public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException {
+ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) throws IOException {
return Option.empty();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 87ed4d10f7..1acefe2204 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -136,7 +136,7 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
/**
* The cdc operation.
*/
- private HoodieOperation operation;
+ protected HoodieOperation operation;
public HoodieRecord(HoodieKey key, T data) {
this(key, data, null);
@@ -175,7 +175,7 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
return operation;
}
- public abstract Comparable<?> getOrderingValue(Properties props);
+ public abstract Comparable<?> getOrderingValue(Schema recordSchema, Properties props);
public T getData() {
if (data == null) {
@@ -202,10 +202,6 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
return this;
}
- public void setData(T data) {
- this.data = data;
- }
-
public HoodieRecordLocation getCurrentLocation() {
return currentLocation;
}
@@ -268,9 +264,9 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
public abstract HoodieRecordType getRecordType();
- public abstract String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt);
+ public abstract String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGeneratorOpt);
- public abstract String getRecordKey(String keyFieldName);
+ public abstract String getRecordKey(Schema recordSchema, String keyFieldName);
public void seal() {
this.sealed = true;
@@ -288,13 +284,14 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
/**
* Get column values from the record, to support RDDCustomColumnsSortPartitioner.
+ * @return the values of the requested columns
*/
- public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
+ public abstract Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
/**
* Support bootstrap.
*/
- public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
+ public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException;
/**
* Rewrite record into new schema(add meta columns)
@@ -314,16 +311,19 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
* This method could change in the future.
* @temporary
*/
- public abstract HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException;
+ public abstract HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException;
- public abstract boolean isDelete(Schema schema, Properties props) throws IOException;
+ public abstract boolean isDelete(Schema recordSchema, Properties props) throws IOException;
/**
* Is EmptyRecord. Generated by ExpressionPayload.
*/
- public abstract boolean shouldIgnore(Schema schema, Properties props) throws IOException;
+ public abstract boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException;
- public abstract Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException;
+ /**
+ * Creates a copy of this record. Records backed by reusable buffers must copy their data here, so that cached copies are not mutated by later reads.
+ */
+ public abstract HoodieRecord<T> copy();
public abstract Option<Map<String, String>> getMetadata();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
index c2eb164dca..540244009d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
@@ -31,7 +31,7 @@ public interface HoodieRecordCompatibilityInterface {
* This method is used to extract the HoodieKey without going through the key generator.
*/
HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
- Schema schema,
+ Schema recordSchema,
Properties props,
Option<Pair<String, String>> simpleKeyGenFieldsOpt,
Boolean withOperation,
@@ -41,5 +41,12 @@ public interface HoodieRecordCompatibilityInterface {
/**
* This method is used to extract the HoodieKey through the key generator; it is used in ClusteringExecutionStrategy.
*/
- HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen);
+ HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, Properties props, Option<BaseKeyGenerator> keyGen);
+
+ /**
+ * This method is used to overwrite the record key field with an empty value.
+ */
+ HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException;
+
+ Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) throws IOException;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 43665b5711..4077752831 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -23,6 +23,7 @@ import org.apache.avro.Schema;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.Option;
import java.io.IOException;
@@ -37,12 +38,14 @@ import java.util.Properties;
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public interface HoodieRecordMerger extends Serializable {
+ String DEFAULT_MERGER_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
+
/**
* This method converges combineAndGetUpdateValue and precombine from HoodiePayload.
* It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
* of the single record, both orders of operations applications have to yield the same result)
*/
- Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+ Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException;
/**
* The record type handled by the current merger.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
new file mode 100644
index 0000000000..a4ff9a1896
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hudi.common.util.ValidationUtils;
+
+public class MetadataValues {
+ private Map<String, String> kv;
+
+ public MetadataValues(Map<String, String> kv) {
+ ValidationUtils.checkArgument(HOODIE_META_COLUMNS_WITH_OPERATION.containsAll(kv.keySet()));
+ this.kv = kv;
+ }
+
+ public MetadataValues() {
+ this.kv = new HashMap<>();
+ }
+
+ public void setCommitTime(String value) {
+ this.kv.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, value);
+ }
+
+ public void setCommitSeqno(String value) {
+ this.kv.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, value);
+ }
+
+ public void setRecordKey(String value) {
+ this.kv.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, value);
+ }
+
+ public void setPartitionPath(String value) {
+ this.kv.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, value);
+ }
+
+ public void setFileName(String value) {
+ this.kv.put(HoodieRecord.FILENAME_METADATA_FIELD, value);
+ }
+
+ public void setOperation(String value) {
+ this.kv.put(HoodieRecord.OPERATION_METADATA_FIELD, value);
+ }
+
+ public Map<String, String> getKv() {
+ return kv;
+ }
+}
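
A short usage sketch for the new MetadataValues container (illustrative only): callers set just the meta fields they want stamped, and updateMetadataValues skips null entries, so untouched fields keep their current contents.

import java.io.IOException;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;

class MetadataValuesSketch {
  HoodieRecord stamp(HoodieRecord record, Schema schema, String commitTime, String fileName)
      throws IOException {
    MetadataValues values = new MetadataValues();
    values.setCommitTime(commitTime);  // only the fields set here are written
    values.setFileName(fileName);
    return record.updateMetadataValues(schema, new Properties(), values);
  }
}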
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index d21a46e680..013465ffa6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.config.OrderedProperties;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -174,8 +175,8 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
.key("hoodie.compaction.merger.strategy")
- .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
- .withDocumentation("Id of merger strategy. Hudi will pick RecordMergers in hoodie.datasource.write.merger.impls which has the same merger strategy id");
+ .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+ .withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls which has the same merger strategy id");
public static final ConfigProperty<String> ARCHIVELOG_FOLDER = ConfigProperty
.key("hoodie.archivelog.folder")
@@ -259,7 +260,7 @@ public class HoodieTableConfig extends HoodieConfig {
private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>
- public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName, String mergerStrategy) {
+ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName, String mergerStrategyId) {
super();
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
LOG.info("Loading table properties from " + propertyPath);
@@ -272,8 +273,8 @@ public class HoodieTableConfig extends HoodieConfig {
needStore = true;
}
if (contains(MERGER_STRATEGY) && payloadClassName != null
- && !getString(MERGER_STRATEGY).equals(mergerStrategy)) {
- setValue(MERGER_STRATEGY, mergerStrategy);
+ && !getString(MERGER_STRATEGY).equals(mergerStrategyId)) {
+ setValue(MERGER_STRATEGY, mergerStrategyId);
needStore = true;
}
if (needStore) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index c7d87a06b7..8a4901019c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
@@ -162,6 +163,8 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
this.payloadClassFQN = tableConfig.getPayloadClass();
this.preCombineField = tableConfig.getPreCombineField();
+ // The log scanner merges log records using preCombine semantics
+ this.payloadProps.setProperty(HoodieAvroRecordMerger.DE_DUPING, "true");
if (this.preCombineField != null) {
this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.preCombineField);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 0ad1eea28c..42294c3d11 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -131,6 +131,10 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
return records;
}
+ public HoodieRecordType getRecordType() {
+ return recordMerger.getRecordType();
+ }
+
public long getNumMergedRecordsInLog() {
return numMergedRecordsInLog;
}
@@ -143,23 +147,22 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
}
@Override
- protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws IOException {
- String key = hoodieRecord.getRecordKey();
+ protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
+ String key = newRecord.getRecordKey();
if (records.containsKey(key)) {
// Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
// done when a DELETE (empty payload) is encountered before or after an insert/update.
HoodieRecord<T> oldRecord = records.get(key);
T oldValue = oldRecord.getData();
- T combinedValue = ((HoodieRecord<T>) recordMerger.merge(oldRecord, hoodieRecord, readerSchema, this.getPayloadProps()).get()).getData();
+ HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(oldRecord, readerSchema, newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
// If the combined record's data is the old value, there is no need to re-put the old record
- if (combinedValue != oldValue) {
- hoodieRecord.setData(combinedValue);
- records.put(key, hoodieRecord);
+ if (combinedRecord.getData() != oldValue) {
+ records.put(key, combinedRecord.copy());
}
} else {
// Put the record as is
- records.put(key, hoodieRecord);
+ records.put(key, newRecord.copy());
}
}
@@ -172,8 +175,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
// should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
// For equal ordering values, the natural order (arrival-time semantics) is used.
- Comparable curOrderingVal = oldRecord.getOrderingValue(
- this.hoodieTableMetaClient.getTableConfig().getProps());
+ Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps());
Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
// Checks the ordering value does not equal to 0
// because we use 0 as the default value which means natural order
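
The copy() calls introduced in this scanner guard against buffer reuse: an engine-specific record (for example one wrapping a Spark UnsafeRow) may point into a buffer the reader recycles for every row, so caching the record itself would alias that buffer. A hedged, self-contained sketch of the pattern, with hypothetical names:

import java.util.HashMap;
import java.util.Map;

class CopyOnCache<R extends CopyOnCache.Copyable<R>> {
  interface Copyable<R> {
    R copy(); // Avro-backed records can simply return `this`; buffer-backed ones deep-copy
  }

  private final Map<String, R> records = new HashMap<>();

  void put(String key, R record) {
    // Cache a copy, never the (possibly buffer-backed) record handed out by the reader.
    records.put(key, record.copy());
  }
}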
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 272a3a8d8a..f93947ea0b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -55,7 +55,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
@Override
protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws Exception {
// Just call callback without merging
- callback.apply(hoodieRecord);
+ callback.apply(hoodieRecord.copy());
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index ab669730f6..19ecdbf9ee 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -213,7 +213,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
}
protected Option<String> getRecordKey(HoodieRecord record) {
- return Option.ofNullable(record.getRecordKey(keyFieldName));
+ return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName));
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 37db0caa9c..faa7856e43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.log.block;
+import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -42,7 +43,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
@@ -58,7 +58,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.TreeMap;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -216,7 +215,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
Option<Schema.Field> keyField = getKeyField(schema);
// Reset key value w/in the record to avoid duplicating the key w/in payload
if (keyField.isPresent()) {
- record.updateValues(schema, new Properties(), Collections.singletonMap(keyField.get().name(), StringUtils.EMPTY_STRING));
+ record.truncateRecordKey(schema, new Properties(), keyField.get().name());
}
return HoodieAvroUtils.recordToBytes(record, schema).get();
}
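
Here truncateRecordKey replaces the previous map-based updateValues call: the record key is already persisted as the HFile cell key, so the key field inside the serialized payload is blanked to avoid storing it twice. An illustrative sketch of the equivalent operation on a plain Avro record:

import org.apache.avro.generic.GenericRecord;

class TruncateKeySketch {
  // Mirrors HoodieAvroIndexedRecord#truncateRecordKey: blank the key field in the
  // payload because the key is stored separately as the HFile cell key.
  static GenericRecord truncateKey(GenericRecord record, String keyFieldName) {
    record.put(keyFieldName, "");
    return record;
  }
}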
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 60aa6c43a9..8f70495323 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -18,10 +18,15 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.table.HoodieTableConfig;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
public class ConfigUtils {
@@ -52,4 +57,11 @@ public class ConfigUtils {
}
return payloadClass;
}
+
+ public static List<String> getMergerImpls(Map<String, String> optParams) {
+ return Arrays.stream(
+ optParams.getOrDefault("hoodie.datasource.write.merger.impls",
+ HoodieAvroRecordMerger.class.getName()).split(","))
+ .map(String::trim).distinct().collect(Collectors.toList());
+ }
}
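
A small, illustrative usage of the new getMergerImpls helper (the config key and default come straight from the code above):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hudi.common.util.ConfigUtils;

class MergerImplsSketch {
  public static void main(String[] args) {
    Map<String, String> optParams = new HashMap<>();
    // Comma-separated and whitespace-tolerant; duplicates are dropped.
    optParams.put("hoodie.datasource.write.merger.impls",
        "org.apache.hudi.common.model.HoodieAvroRecordMerger, org.apache.hudi.common.model.HoodieAvroRecordMerger");
    List<String> impls = ConfigUtils.getMergerImpls(optParams);
    System.out.println(impls); // [org.apache.hudi.common.model.HoodieAvroRecordMerger]
  }
}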
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 92b14ff340..666a084877 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -66,7 +66,7 @@ public class HoodieRecordUtils {
/**
* Instantiate a given class with a record merge.
*/
- public static HoodieRecordMerger generateRecordMerger(String basePath, EngineType engineType,
+ public static HoodieRecordMerger createRecordMerger(String basePath, EngineType engineType,
List<String> mergerClassList, String mergerStrategy) {
if (mergerClassList.isEmpty() || HoodieTableMetadata.isMetadataTable(basePath)) {
return HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index ca8c3ab428..42ea766eb9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -24,7 +24,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.parquet.hadoop.ParquetReader;
import java.io.IOException;
-import java.util.function.Function;
/**
* This class wraps a parquet reader and provides an iterator based api to read from a parquet file. This is used in
@@ -36,24 +35,17 @@ public class ParquetReaderIterator<T> implements ClosableIterator<T> {
private final ParquetReader<T> parquetReader;
// Holds the next entry returned by the parquet reader
private T next;
- // For directly use InternalRow
- private Function<T, T> mapper;
public ParquetReaderIterator(ParquetReader<T> parquetReader) {
this.parquetReader = parquetReader;
}
- public ParquetReaderIterator(ParquetReader<T> parquetReader, Function<T, T> mapper) {
- this.parquetReader = parquetReader;
- this.mapper = mapper;
- }
-
@Override
public boolean hasNext() {
try {
// To handle when hasNext() is called multiple times for idempotency and/or the first time
if (this.next == null) {
- this.next = read();
+ this.next = parquetReader.read();
}
return this.next != null;
} catch (Exception e) {
@@ -72,7 +64,7 @@ public class ParquetReaderIterator<T> implements ClosableIterator<T> {
}
}
T retVal = this.next;
- this.next = read();
+ this.next = null;
return retVal;
} catch (Exception e) {
FileIOUtils.closeQuietly(parquetReader);
@@ -80,15 +72,6 @@ public class ParquetReaderIterator<T> implements ClosableIterator<T> {
}
}
- private T read() throws IOException {
- T record = parquetReader.read();
- if (mapper == null || record == null) {
- return record;
- } else {
- return mapper.apply(record);
- }
- }
-
public void close() {
try {
parquetReader.close();
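
After this fix, only hasNext() reads from the underlying ParquetReader, and next() merely hands over the buffered element, which keeps repeated hasNext() calls idempotent. A hedged, generic sketch of that lazy-fetch iterator contract (the Supplier is a stand-in for the parquet reader):

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

class LazyFetchIterator<T> implements Iterator<T> {
  private final Supplier<T> source; // returns null when exhausted
  private T next;

  LazyFetchIterator(Supplier<T> source) {
    this.source = source;
  }

  @Override
  public boolean hasNext() {
    if (next == null) {          // idempotent: repeated hasNext() calls read at most once
      next = source.get();
    }
    return next != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T ret = next;
    next = null;                 // do not pre-fetch here; leave the read to hasNext()
    return ret;
  }
}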
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 6f3b3ef928..9041db5144 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -19,11 +19,8 @@
package org.apache.hudi.common.util;
import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
import org.objenesis.strategy.StdInstantiatorStrategy;
import java.io.ByteArrayOutputStream;
@@ -39,14 +36,6 @@ public class SerializationUtils {
private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF =
ThreadLocal.withInitial(KryoSerializerInstance::new);
- private static Pair<String, Serializer<?>> SERIALIZER_REGISTER = null;
-
- public static void setOverallRegister(String className, Serializer<?> serializer) {
- if (SERIALIZER_REGISTER == null) {
- SERIALIZER_REGISTER = Pair.of(className, serializer);
- }
- }
-
// Serialize
// -----------------------------------------------------------------------
@@ -132,13 +121,6 @@ public class SerializationUtils {
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
- if (SERIALIZER_REGISTER != null) {
- try {
- kryo.register(Class.forName(SERIALIZER_REGISTER.getLeft()), SERIALIZER_REGISTER.getRight());
- } catch (ClassNotFoundException e) {
- throw new HoodieException(e);
- }
- }
return kryo;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
index 9499954911..a4f2c62437 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -33,8 +33,6 @@ public class StringUtils {
public static final String EMPTY_STRING = "";
- public static final String DEFAULT_MERGER_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
-
/**
* <p>
* Joins the elements of the provided array into a single String containing the provided list of elements.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatLists.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatLists.java
new file mode 100644
index 0000000000..e3bf9215f5
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatLists.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Space-efficient, comparable, immutable lists, copied from calcite core.
+ */
+public class FlatLists {
+ private FlatLists() {
+ }
+
+ /**
+ * Creates a memory-, CPU- and cache-efficient immutable list from an
+ * existing list. The list is always copied.
+ *
+ * @param t Array of members of list
+ * @param <T> Element type
+ * @return List containing the given members
+ */
+ public static <T> List<T> of(List<T> t) {
+ return of_(t);
+ }
+
+ public static <T extends Comparable> ComparableList<T> ofComparable(List<T> t) {
+ return of_(t);
+ }
+
+ public static <T extends Comparable> ComparableList<T> ofComparableArray(Object[] t) {
+ return ofComparable(Arrays.stream(t).map(v -> (T)v).collect(Collectors.toList()));
+ }
+
+ private static <T> ComparableList<T> of_(List<T> t) {
+ return new ComparableListImpl(new ArrayList<>(t));
+ }
+
+
+ /** List that is also comparable.
+ *
+ * <p>You can create an instance whose type
+ * parameter {@code T} does not extend {@link Comparable}, but you will get a
+ * {@link ClassCastException} at runtime when you call
+ * {@link #compareTo(Object)} if the elements of the list do not implement
+ * {@code Comparable}.
+ */
+ public interface ComparableList<T> extends List<T>, Comparable<List> {
+ }
+
+ /** Wrapper around a list that makes it implement the {@link Comparable}
+ * interface using lexical ordering. The elements must be comparable. */
+ static class ComparableListImpl<T extends Comparable<T>>
+ extends AbstractList<T>
+ implements ComparableList<T>, KryoSerializable {
+ private List<T> list;
+
+ protected ComparableListImpl(List<T> list) {
+ this.list = list;
+ }
+
+ public T get(int index) {
+ return list.get(index);
+ }
+
+ public int size() {
+ return list.size();
+ }
+
+ public int compareTo(List o) {
+ return compare(list, o);
+ }
+
+ static <T extends Comparable<T>> int compare(List<T> list0, List<T> list1) {
+ final int size0 = list0.size();
+ final int size1 = list1.size();
+ if (size1 == size0) {
+ return compare(list0, list1, size0);
+ }
+ final int c = compare(list0, list1, Math.min(size0, size1));
+ if (c != 0) {
+ return c;
+ }
+ return size0 - size1;
+ }
+
+ static <T extends Comparable<T>> int compare(List<T> list0, List<T> list1, int size) {
+ for (int i = 0; i < size; i++) {
+ Comparable o0 = list0.get(i);
+ Comparable o1 = list1.get(i);
+ int c = compare(o0, o1);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ static <T extends Comparable<T>> int compare(T a, T b) {
+ if (a == b) {
+ return 0;
+ }
+ if (a == null) {
+ return -1;
+ }
+ if (b == null) {
+ return 1;
+ }
+ return a.compareTo(b);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output) {
+ kryo.writeClassAndObject(output, list);
+ }
+
+ @Override
+ public void read(Kryo kryo, Input input) {
+ list = (List<T>) kryo.readClassAndObject(input);
+ }
+ }
+
+}
+
+
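
An illustrative example of what FlatLists is for: wrapping per-column values so that multi-column sort keys compare lexicographically, presumably so the Object[] returned by getColumnValues can serve as a sort key.

import org.apache.hudi.common.util.collection.FlatLists;

class FlatListsSketch {
  public static void main(String[] args) {
    // Equal first column, so the second column decides the order.
    FlatLists.ComparableList<Comparable> a = FlatLists.ofComparableArray(new Object[] {"2022-10-15", 3});
    FlatLists.ComparableList<Comparable> b = FlatLists.ofComparableArray(new Object[] {"2022-10-15", 10});
    System.out.println(a.compareTo(b) < 0); // true: 3 < 10
  }
}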
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
index a194e2fc6a..3834574cd0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
@@ -26,17 +26,6 @@ import java.io.IOException;
public class HoodieAvroFileReaderFactory extends HoodieFileReaderFactory {
- private static class SingletonHolder {
- private static final HoodieAvroFileReaderFactory INSTANCE = new HoodieAvroFileReaderFactory();
- }
-
- private HoodieAvroFileReaderFactory() {
- }
-
- public static HoodieFileReaderFactory getFileReaderFactory() {
- return SingletonHolder.INSTANCE;
- }
-
protected HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
return new HoodieAvroParquetReader(conf, path);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java
index bb046c2395..e53a7b095f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java
@@ -19,7 +19,6 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -42,13 +41,13 @@ public interface HoodieAvroFileWriter extends HoodieFileWriter {
@Override
default void writeWithMetadata(HoodieKey key, HoodieRecord record, Schema schema, Properties props) throws IOException {
- IndexedRecord avroPayload = ((HoodieAvroIndexedRecord) record.toIndexedRecord(schema, props).get()).getData();
+ IndexedRecord avroPayload = record.toIndexedRecord(schema, props).get().getData();
writeAvroWithMetadata(key, avroPayload);
}
@Override
default void write(String recordKey, HoodieRecord record, Schema schema, Properties props) throws IOException {
- IndexedRecord avroPayload = ((HoodieAvroIndexedRecord) record.toIndexedRecord(schema, props).get()).getData();
+ IndexedRecord avroPayload = record.toIndexedRecord(schema, props).get().getData();
writeAvro(recordKey, avroPayload);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
index 0ab052a877..86d57fde73 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
@@ -44,17 +44,6 @@ import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
- private static class SingletonHolder {
- private static final HoodieAvroFileWriterFactory INSTANCE = new HoodieAvroFileWriterFactory();
- }
-
- private HoodieAvroFileWriterFactory() {
- }
-
- public static HoodieFileWriterFactory getFileReaderFactory() {
- return HoodieAvroFileWriterFactory.SingletonHolder.INSTANCE;
- }
-
protected HoodieFileWriter newParquetFileWriter(
String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
TaskContextSupplier taskContextSupplier) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index f84ec530ad..24164a146e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -28,8 +28,6 @@ import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
@@ -40,13 +38,12 @@ public class HoodieFileReaderFactory {
public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
switch (recordType) {
case AVRO:
- return HoodieAvroFileReaderFactory.getFileReaderFactory();
+ return new HoodieAvroFileReaderFactory();
case SPARK:
try {
Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
- Method method = clazz.getMethod("getFileReaderFactory", null);
- return (HoodieFileReaderFactory) method.invoke(null,null);
- } catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
+ return (HoodieFileReaderFactory) clazz.newInstance();
+ } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
throw new HoodieException("Unable to create hoodie spark file reader factory", e);
}
default:
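A hedged sketch of the instantiation pattern after the singleton removal: the Avro factory is constructed directly, while the Spark factory (which lives in a Spark-only module, so hudi-common cannot reference it at compile time) is resolved by name and created through its now-required accessible no-arg constructor. The fallback handling in this demo is illustrative.

    public class ReflectiveFactoryDemo {
        public static void main(String[] args) {
            Object sparkFactory;
            try {
                // Resolved by name so hudi-common needs no compile-time Spark dependency.
                Class<?> clazz = Class.forName("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
                // newInstance() requires an accessible no-arg constructor, which is
                // why the private singleton constructor had to be removed.
                sparkFactory = clazz.newInstance();
            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                sparkFactory = null; // Spark module not on the classpath
            }
            System.out.println(sparkFactory == null
                ? "Spark factory unavailable" : sparkFactory.getClass().getName());
        }
    }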
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 99f35d7a0f..456383d374 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -36,8 +36,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
@@ -48,13 +46,12 @@ public class HoodieFileWriterFactory {
private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
switch (recordType) {
case AVRO:
- return HoodieAvroFileWriterFactory.getFileReaderFactory();
+ return new HoodieAvroFileWriterFactory();
case SPARK:
try {
Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
- Method method = clazz.getMethod("getFileWriterFactory", null);
- return (HoodieFileWriterFactory) method.invoke(null, null);
- } catch (NoSuchMethodException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ return (HoodieFileWriterFactory) clazz.newInstance();
+ } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
throw new HoodieException("Unable to create hoodie spark file writer factory", e);
}
default:
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 42b0c40a83..e6ef39bf8c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -25,9 +25,9 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -129,8 +129,8 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> RECORD_MERGER_STRATEGY = ConfigOptions
.key("record.merger.strategy")
.stringType()
- .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
- .withDescription("Id of merger strategy. Hudi will pick RecordMergers in record.merger.impls which has the same merger strategy id");
+ .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+ .withDescription("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in record.merger.impls which has the same merger strategy id");
public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
.key("partition.default_name")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index ba63b982f3..ed4f1b02e6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -433,7 +433,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
Properties props = new Properties();
config.addAllToProperties(props);
- records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
+ records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
+ .deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
}
bucket.preWrite(records);
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
@@ -470,7 +471,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
Properties props = new Properties();
config.addAllToProperties(props);
- records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
+ records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
+ .deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
}
bucket.preWrite(records);
writeStatus.addAll(writeFunction.apply(records, currentInstant));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 517e57be26..519fad1159 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -21,6 +21,7 @@ package org.apache.hudi.streamer;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.StringUtils;
@@ -124,8 +125,8 @@ public class FlinkStreamerConfig extends Configuration {
+ "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)")
public String mergerImpls = HoodieAvroRecordMerger.class.getName();
- @Parameter(names = {"--merger-strategy"}, description = "Id of merger strategy. Hudi will pick RecordMergers in merger-impls which has the same merger strategy id")
- public String mergerStrategy = StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
+ @Parameter(names = {"--merger-strategy"}, description = "Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in merger-impls which has the same merger strategy id")
+ public String mergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed).", converter = OperationConverter.class)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index b060505bb3..7abf304034 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -178,7 +178,7 @@ public class FormatUtils {
.map(String::trim)
.distinct()
.collect(Collectors.toList());
- HoodieRecordMerger merger = HoodieRecordUtils.generateRecordMerger(
+ HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger(
split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY));
FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
return HoodieUnMergedLogRecordScanner.newBuilder()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 5364f6d96b..e9accb0a86 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,7 +18,6 @@
package org.apache.hudi.table.format.mor;
-import java.util.stream.Collectors;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -30,6 +29,7 @@ import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
@@ -65,6 +65,7 @@ import org.apache.flink.types.RowKind;
import java.io.IOException;
import java.util.Arrays;
+import java.util.stream.Collectors;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -764,7 +765,7 @@ public class MergeOnReadInputFormat
.map(String::trim)
.distinct()
.collect(Collectors.toList());
- this.recordMerger = HoodieRecordUtils.generateRecordMerger(split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY));
+ this.recordMerger = HoodieRecordUtils.createRecordMerger(split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY));
}
@Override
@@ -854,7 +855,7 @@ public class MergeOnReadInputFormat
final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new HoodieAvroIndexedRecord(historyAvroRecord);
- Option<HoodieRecord> resultRecord = recordMerger.merge(hoodieAvroIndexedRecord, record, tableSchema, payloadProps);
+ Option<HoodieRecord> resultRecord = recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, record, tableSchema, payloadProps).map(Pair::getLeft);
return resultRecord.get().toIndexedRecord(tableSchema, new Properties());
}
}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
index 7e571720c1..a9cf806b19 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
@@ -19,11 +19,11 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hadoop.conf.Configuration;
@@ -160,7 +160,7 @@ public class TestInputPathHandler {
properties.setProperty(HoodieTableConfig.NAME.key(), tableName);
properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name());
properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), HoodieAvroPayload.class.getName());
- properties.setProperty(HoodieTableConfig.MERGER_STRATEGY.key(), StringUtils.DEFAULT_MERGER_STRATEGY_UUID);
+ properties.setProperty(HoodieTableConfig.MERGER_STRATEGY.key(), HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
index 051a109a26..04daafbab0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
@@ -21,8 +21,8 @@ package org.apache.hudi;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.avro.Schema;
@@ -34,11 +34,11 @@ public class HoodieSparkRecordMerger implements HoodieRecordMerger {
@Override
public String getMergingStrategy() {
- return StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
+ return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
}
@Override
- public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException {
ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
@@ -48,12 +48,12 @@ public class HoodieSparkRecordMerger implements HoodieRecordMerger {
}
if (older.getData() == null) {
// use natural order for delete record
- return Option.of(newer);
+ return Option.of(Pair.of(newer, newSchema));
}
- if (older.getOrderingValue(props).compareTo(newer.getOrderingValue(props)) > 0) {
- return Option.of(older);
+ if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+ return Option.of(Pair.of(older, oldSchema));
} else {
- return Option.of(newer);
+ return Option.of(Pair.of(newer, newSchema));
}
}
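For callers, the new contract returns the surviving record paired with the schema it is encoded in; a minimal sketch of unpacking it, mirroring the map(Pair::getLeft) call sites elsewhere in this diff:

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;

    public class MergeContractSketch {
        // Adapter for callers that only need the record: drop the schema half.
        static Option<HoodieRecord> mergeToRecord(HoodieRecordMerger merger,
                                                  HoodieRecord older, Schema oldSchema,
                                                  HoodieRecord newer, Schema newSchema,
                                                  Properties props) throws IOException {
            return merger.merge(older, oldSchema, newer, newSchema, props)
                .map(Pair::getLeft);
        }
    }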
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index ece56a5159..53380c9aa6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -79,7 +79,7 @@ case class HoodieTableState(tablePath: String,
usesVirtualKeys: Boolean,
recordPayloadClassName: String,
metadataConfig: HoodieMetadataConfig,
- mergerImpls: String,
+ mergerImpls: List[String],
mergerStrategy: String)
/**
@@ -461,6 +461,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
protected def getTableState: HoodieTableState = {
+ val mergerImpls = ConfigUtils.getMergerImpls(optParams.asJava).asScala.toList
+ val mergerStrategy = optParams.getOrElse(HoodieWriteConfig.MERGER_STRATEGY.key(),
+ sqlContext.getConf(HoodieWriteConfig.MERGER_STRATEGY.key(), HoodieWriteConfig.MERGER_STRATEGY.defaultValue()))
+
// Subset of the state of table's configuration as of at the time of the query
HoodieTableState(
tablePath = basePath,
@@ -470,10 +474,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
usesVirtualKeys = !tableConfig.populateMetaFields(),
recordPayloadClassName = tableConfig.getPayloadClass,
metadataConfig = fileIndex.metadataConfig,
- mergerImpls = optParams.getOrElse(HoodieWriteConfig.MERGER_IMPLS.key(),
- HoodieWriteConfig.MERGER_IMPLS.defaultValue()),
- mergerStrategy = optParams.getOrElse(HoodieWriteConfig.MERGER_STRATEGY.key(),
- metaClient.getTableConfig.getMergerStrategy)
+ mergerImpls = mergerImpls,
+ mergerStrategy = mergerStrategy
)
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f18ef4fc13..1b8cb57936 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -295,8 +295,7 @@ object HoodieSparkSqlWriter {
tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
val writeConfig = client.getConfig
- if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ &&
- writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+ if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only supports parquet log format.")
}
// Create a HoodieWriteClient & issue the write.
@@ -837,7 +836,7 @@ object HoodieSparkSqlWriter {
}
}
- private def createHoodieRecordRdd(df: DataFrame, config: HoodieWriteConfig, parameters: Map[String, String], schema: Schema): JavaRDD[HoodieRecord[_]] = {
+ private def createHoodieRecordRdd(df: DataFrame, config: HoodieWriteConfig, parameters: Map[String, String], schema: Schema) = {
val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val tblName = config.getString(HoodieWriteConfig.TBL_NAME)
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
@@ -849,7 +848,9 @@ object HoodieSparkSqlWriter {
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps))
val partitionCols = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
val dropPartitionColumns = config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
- config.getRecordMerger.getRecordType match {
+ val recordType = config.getRecordMerger.getRecordType
+ log.debug(s"Use $recordType")
+ recordType match {
case HoodieRecord.HoodieRecordType.AVRO =>
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
org.apache.hudi.common.util.Option.of(schema))
@@ -873,15 +874,12 @@ object HoodieSparkSqlWriter {
// ut will use AvroKeyGenerator, so we need to cast it in spark record
val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
val structType = HoodieInternalRowUtils.getCachedSchema(schema)
- val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
- HoodieInternalRowUtils.addCompressedSchema(structType)
df.queryExecution.toRdd.map(row => {
val internalRow = row.copy()
- val (processedRow, writeSchema) = getSparkProcessedRecord(partitionCols, internalRow, dropPartitionColumns, structTypeBC.value)
- val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structTypeBC.value)
- val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structTypeBC.value)
+ val (processedRow, writeSchema) = getSparkProcessedRecord(partitionCols, internalRow, dropPartitionColumns, structType)
+ val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType)
+ val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType)
val key = new HoodieKey(recordKey.toString, partitionPath.toString)
- HoodieInternalRowUtils.addCompressedSchema(structTypeBC.value)
new HoodieSparkRecord(key, processedRow, writeSchema)
}).toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
index 87758ebd3c..f5a3834304 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
@@ -21,7 +21,7 @@ package org.apache.hudi
import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection}
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord}
import org.apache.hudi.config.HoodiePayloadConfig
import org.apache.hudi.commmon.model.HoodieSparkRecord
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
@@ -32,22 +32,20 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
-import org.apache.hudi.common.util.{HoodieRecordUtils, SerializationUtils}
+import org.apache.hudi.common.util.HoodieRecordUtils
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-import org.apache.avro.{Schema, SchemaNormalization}
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.hudi.SparkStructTypeSerializer
import org.apache.spark.sql.HoodieCatalystExpressionUtils
import org.apache.spark.sql.types.StructType
import java.io.Closeable
-import java.nio.charset.StandardCharsets
import java.util.Properties
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import scala.annotation.tailrec
@@ -201,9 +199,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
private val baseFileIterator = baseFileReader(split.dataFile.get)
- private val mergerList = tableState.mergerImpls.split(",")
- .map(_.trim).distinct.toList.asJava
- private val recordMerger = HoodieRecordUtils.generateRecordMerger(tableState.tablePath, EngineType.SPARK, mergerList, tableState.mergerStrategy)
+ private val recordMerger = HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.mergerImpls.asJava, tableState.mergerStrategy)
override def hasNext: Boolean = hasNextInternal
@@ -243,15 +239,18 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
recordMerger.getRecordType match {
case HoodieRecordType.SPARK =>
val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
- toScalaOption(recordMerger.merge(curRecord, newRecord, logFileReaderAvroSchema, payloadProps))
+ val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps)
+ toScalaOption(result)
.map(r => {
- val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(r.asInstanceOf[HoodieSparkRecord].getStructType, structTypeSchema)
- projection.apply(r.getData.asInstanceOf[InternalRow])
+ val schema = HoodieInternalRowUtils.getCachedSchema(r.getRight)
+ val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema, structTypeSchema)
+ projection.apply(r.getLeft.getData.asInstanceOf[InternalRow])
})
case _ =>
val curRecord = new HoodieAvroIndexedRecord(serialize(curRow))
- toScalaOption(recordMerger.merge(curRecord, newRecord, logFileReaderAvroSchema, payloadProps))
- .map(r => deserialize(projectAvroUnsafe(r.toIndexedRecord(logFileReaderAvroSchema, new Properties()).get().getData.asInstanceOf[GenericRecord], avroSchema, reusableRecordBuilder)))
+ val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps)
+ toScalaOption(result)
+ .map(r => deserialize(projectAvroUnsafe(r.getLeft.toIndexedRecord(r.getRight, payloadProps).get.getData.asInstanceOf[GenericRecord], avroSchema, reusableRecordBuilder)))
}
}
}
@@ -321,15 +320,9 @@ object LogFileIterator {
getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
}
- val mergerList = tableState.mergerImpls.split(",")
- .map(_.trim).distinct.toList.asJava
- val recordMerger = HoodieRecordUtils.generateRecordMerger(tableState.tablePath, EngineType.SPARK, mergerList, tableState.mergerStrategy)
+ val recordMerger = HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.mergerImpls.asJava, tableState.mergerStrategy)
logRecordScannerBuilder.withRecordMerger(recordMerger)
- if (recordMerger.getRecordType == HoodieRecordType.SPARK) {
- registerStructTypeSerializerIfNeed(List(HoodieInternalRowUtils.getCachedSchema(logSchema)))
- }
-
logRecordScannerBuilder.build()
}
}
@@ -348,11 +341,4 @@ object LogFileIterator {
.getOrElse(split.logFiles.head.getPath)
.getParent
}
-
- private def registerStructTypeSerializerIfNeed(schemas: List[StructType]): Unit = {
- val schemaMap = schemas.map(schema => (SchemaNormalization.fingerprint64(schema.json.getBytes(StandardCharsets.UTF_8)), schema))
- .toMap
- val serializer = new SparkStructTypeSerializer(schemaMap)
- SerializationUtils.setOverallRegister(classOf[HoodieSparkRecord].getName, serializer)
- }
}
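The renamed helper shows up across both the Flink and Spark read paths; a hedged sketch of a direct call follows (the base path and implementation list values are illustrative):

    import java.util.Collections;
    import java.util.List;

    import org.apache.hudi.common.engine.EngineType;
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.common.util.HoodieRecordUtils;

    public class RecordMergerLookupDemo {
        public static void main(String[] args) {
            List<String> mergerImpls = Collections.singletonList(
                "org.apache.hudi.common.model.HoodieAvroRecordMerger");
            // Picks an implementation whose getMergingStrategy() matches the given id.
            HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger(
                "/tmp/hoodie_table",                       // illustrative base path
                EngineType.SPARK,
                mergerImpls,
                HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
            System.out.println(merger.getClass().getName());
        }
    }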
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index ad60f6187b..7c996b10ce 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -24,13 +24,12 @@ import org.apache.hudi.HoodieConversionUtils._
import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
-import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -162,8 +161,8 @@ class HoodieCDCRDD(
metaClient.getTableConfig.getPayloadClass,
metadataConfig,
// TODO support CDC with spark record
- mergerImpls = classOf[HoodieAvroRecordMerger].getName,
- mergerStrategy = StringUtils.DEFAULT_MERGER_STRATEGY_UUID
+ mergerImpls = List(classOf[HoodieAvroRecordMerger].getName),
+ mergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
)
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 8cd77900c0..5fdaf81dd2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName
-import org.apache.hudi.common.model.HoodieAvroRecordMerger
+import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecordMerger}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
import org.apache.spark.sql.SparkSession
@@ -70,17 +70,11 @@ object HoodieOptionConfig {
.defaultValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue())
.build()
- val SQL_MERGER_IMPLS: HoodieSQLOption[String] = buildConf()
- .withSqlKey("mergerImpls")
- .withHoodieKey(DataSourceWriteOptions.MERGER_IMPLS.key)
- .defaultValue(classOf[HoodieAvroRecordMerger].getName)
- .build()
-
val SQL_MERGER_STRATEGY: HoodieSQLOption[String] = buildConf()
.withSqlKey("mergerStrategy")
.withHoodieKey(DataSourceWriteOptions.MERGER_STRATEGY.key)
.withTableConfigKey(HoodieTableConfig.MERGER_STRATEGY.key)
- .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
+ .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
.build()
/**
@@ -199,7 +193,7 @@ object HoodieOptionConfig {
// extract primaryKey, preCombineField, type options
def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
val sqlOptions = mappingTableConfigToSqlOption(options)
- val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_IMPLS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
+ val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
sqlOptions.filterKeys(targetOptions.contains)
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 332455ea21..7ef0cbf51e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -66,6 +66,7 @@ trait ProvidesHoodieConfig extends Logging {
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
TBL_NAME.key -> hoodieCatalogTable.tableName,
PRECOMBINE_FIELD.key -> preCombineField,
+ MERGER_IMPLS.key -> hoodieProps.getString(HoodieWriteConfig.MERGER_IMPLS.key, HoodieWriteConfig.MERGER_IMPLS.defaultValue),
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
@@ -185,6 +186,7 @@ trait ProvidesHoodieConfig extends Logging {
PRECOMBINE_FIELD.key -> preCombineField,
PARTITIONPATH_FIELD.key -> partitionFieldsStr,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
+ MERGER_IMPLS.key -> hoodieProps.getString(HoodieWriteConfig.MERGER_IMPLS.key, HoodieWriteConfig.MERGER_IMPLS.defaultValue),
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index cad5499b35..371a2d7b41 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -41,8 +41,6 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
@@ -96,13 +94,12 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots
} else if (recordType == HoodieRecordType.SPARK) {
SparkKeyGeneratorInterface sparkKeyGenerator = (SparkKeyGeneratorInterface) keyGenerator;
StructType structType = inputDataset.schema();
- Broadcast<StructType> structTypeBC = new JavaSparkContext(sparkSession.sparkContext()).broadcast(structType);
return inputDataset.queryExecution().toRdd().toJavaRDD().map(row -> {
InternalRow internalRow = row.copy();
- String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structTypeBC.value()).toString();
- String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structTypeBC.value()).toString();
+ String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType).toString();
+ String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType).toString();
HoodieKey key = new HoodieKey(recordKey, partitionPath);
- return new HoodieSparkRecord(key, internalRow, structTypeBC.value());
+ return new HoodieSparkRecord(key, internalRow, structType);
});
} else {
throw new UnsupportedOperationException(recordType.name());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
index 624033b67f..3fabdad4a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
@@ -64,18 +64,6 @@ class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAf
sparkSession.close()
}
- test("test merge") {
- val data1 = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
- val data2 = sparkSession.sparkContext.parallelize(Seq(Row("like1", 181)))
- val row1 = sparkSession.createDataFrame(data1, schema1).queryExecution.toRdd.first()
- val row2 = sparkSession.createDataFrame(data2, schema2).queryExecution.toRdd.first()
- val rowMerge = HoodieInternalRowUtils.stitchRecords(row1, schema1, row2, schema2, schemaMerge)
- assert(rowMerge.get(0, StringType).toString.equals("like"))
- assert(rowMerge.get(1, IntegerType) == 18)
- assert(rowMerge.get(2, StringType).toString.equals("like1"))
- assert(rowMerge.get(3, IntegerType) == 181)
- }
-
test("test rewrite") {
val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18, "like1", 181)))
val oldRow = sparkSession.createDataFrame(data, schemaMerge).queryExecution.toRdd.first()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 37d320e192..03dcd2db4d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -36,7 +36,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils, HoodieSparkRecordMerger}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieInternalRowUtils, HoodieSparkRecordMerger, QuickstartUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index ed4eb373b5..be865476d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -23,8 +23,7 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieRecordPayload, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -36,8 +35,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger, SparkDatasetMixin}
-import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieInternalRowUtils, HoodieSparkRecordMerger, SparkDatasetMixin}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
@@ -98,9 +96,12 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
)
@ParameterizedTest
- @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
- def testCount(recordType: HoodieRecordType) {
- val (writeOpts, readOpts) = getOpts(recordType)
+ @CsvSource(Array("AVRO, AVRO, avro", "AVRO, SPARK, parquet", "SPARK, AVRO, parquet", "SPARK, SPARK, parquet"))
+ def testCount(readType: HoodieRecordType, writeType: HoodieRecordType, logType: String) {
+ var (_, readOpts) = getOpts(readType)
+ var (writeOpts, _) = getOpts(writeType)
+ readOpts = readOpts ++ Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logType)
+ writeOpts = writeOpts ++ Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logType)
// First Operation:
// Producing parquet files to three default partitions.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
index ebdb943ed8..3547e42148 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
@@ -19,9 +19,10 @@
package org.apache.spark.sql.execution.benchmark
import org.apache.hadoop.fs.Path
+import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieAvroRecordMerger
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.config.HoodieCompactionConfig
import org.apache.hudi.{HoodieSparkRecordMerger, HoodieSparkUtils}
import org.apache.spark.SparkConf
@@ -60,41 +61,53 @@ object ReadAndWriteWithoutAvroBenchmark extends HoodieBenchmarkBase {
sparkConf
}
- private def createComplexDataFrame(rowNum: Long, colNum: Int): DataFrame = {
+ private def createComplexDataFrame(rowNum: Long): DataFrame = {
var df = spark.range(0, rowNum).toDF("id")
.withColumn("t1", lit(1))
.withColumn("d1", lit(12.99d))
.withColumn("s1", lit("s1"))
.withColumn("s2", lit("s2"))
.withColumn("s3", lit("s3"))
- for (i <- 0 to colNum) {
+ for (i <- 0 to 1) {
df = df.withColumn(s"struct$i", struct(col("s1").as("st1"), col("s2").as("st2"), col("s3").as("st3")))
.withColumn(s"map$i", map(col("s1"), col("s2")))
- .withColumn(s"array$i", split(col("s1"), " "))
+ .withColumn(s"array$i", array(col("s1")))
}
df
}
- private def prepareHoodieTable(tableName: String, path: String, tableType: String, mergerType: String, df: DataFrame): Unit = {
+ private def prepareHoodieTable(tableName: String, path: String, tableType: String, mergerImpl: String, df: DataFrame): Unit = {
df.collect()
df.createOrReplaceTempView("input_df")
if (spark.catalog.tableExists(tableName)) {
spark.sql(s"drop table if exists $tableName")
}
+ spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = $mergerImpl")
spark.sql(
s"""
- |create table $tableName using hudi
+ |create table $tableName(
+ |id long,
+ |t1 int,
+ |d1 double,
+ |s1 string,
+ |s2 string,
+ |s3 string,
+ |struct0 struct<st1:string, st2:string, st3:string>,
+ |map0 map<string, string>,
+ |array0 array<string>,
+ |struct1 struct<st1:string, st2:string, st3:string>,
+ |map1 map<string, string>,
+ |array1 array<string>
+ |) using hudi
|tblproperties(
| primaryKey = 'id',
| preCombineField = 's1',
| type = '$tableType',
- | ${HoodieWriteConfig.MERGER_IMPLS.key} = '$mergerType',
| ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = 'parquet',
| ${HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key()} = '10')
|location '$path'
- |As
- |select * from input_df
""".stripMargin)
+ spark.sql(s"insert overwrite table $tableName select * from input_df")
}
/**
@@ -106,7 +119,7 @@ object ReadAndWriteWithoutAvroBenchmark extends HoodieBenchmarkBase {
* org.apache.hudi.HoodieSparkRecordMerger 12654 13924 1100 0.1 12653.8 1.3X
*/
private def overwriteBenchmark(): Unit = {
- val df = createComplexDataFrame(1000000, 1)
+ val df = createComplexDataFrame(1000000)
val benchmark = new HoodieBenchmark("pref insert overwrite", 1000000, 3)
Seq(classOf[HoodieAvroRecordMerger].getName, classOf[HoodieSparkRecordMerger].getName).zip(Seq(avroTable, sparkTable)).foreach {
case (merger, tableName) => benchmark.addCase(merger) { _ =>
@@ -118,26 +131,42 @@ object ReadAndWriteWithoutAvroBenchmark extends HoodieBenchmarkBase {
benchmark.run()
}
+ /**
+ * Java HotSpot(TM) 64-Bit Server VM 1.8.0_211-b12 on Mac OS X 10.16
+ * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+ * pref upsert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+ * -----------------------------------------------------------------------------------------------------------------------------------
+ * org.apache.hudi.common.model.HoodieAvroRecordMerger 6108 6383 257 0.0 610785.6 1.0X
+ * org.apache.hudi.HoodieSparkRecordMerger 4833 5468 614 0.0 483300.0 1.3X
+ *
+ * Java HotSpot(TM) 64-Bit Server VM 1.8.0_211-b12 on Mac OS X 10.16
+ * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+ * pref read: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+ * -----------------------------------------------------------------------------------------------------------------------------------
+ * org.apache.hudi.common.model.HoodieAvroRecordMerger 813 818 8 0.0 81302.1 1.0X
+ * org.apache.hudi.HoodieSparkRecordMerger 604 616 18 0.0 60430.1 1.3X
+ */
private def upsertThenReadBenchmark(): Unit = {
- val avroMerger = classOf[HoodieAvroRecordMerger].getName
- val sparkMerger = classOf[HoodieSparkRecordMerger].getName
- val df = createComplexDataFrame(1000000, 1)
+ val avroMergerImpl = classOf[HoodieAvroRecordMerger].getName
+ val sparkMergerImpl = classOf[HoodieSparkRecordMerger].getName
+ val df = createComplexDataFrame(10000)
withTempDir { avroPath =>
withTempDir { sparkPath =>
- val upsertBenchmark = new HoodieBenchmark("pref upsert", 1000000, 3)
- prepareHoodieTable(avroTable, new Path(avroPath.getCanonicalPath, avroTable).toUri.toString, "mor", avroMerger, df)
- prepareHoodieTable(sparkTable, new Path(sparkPath.getCanonicalPath, sparkTable).toUri.toString, "mor", sparkMerger, df)
- df.createOrReplaceTempView("input_df")
- Seq(avroMerger, sparkMerger).zip(Seq(avroTable, sparkTable)).foreach {
- case (merger, tableName) => upsertBenchmark.addCase(merger) { _ =>
- spark.sql(s"update $tableName set s1 = 's1_new' where id > 0")
+ val upsertBenchmark = new HoodieBenchmark("pref upsert", 10000, 3)
+ prepareHoodieTable(avroTable, new Path(avroPath.getCanonicalPath, avroTable).toUri.toString, "mor", avroMergerImpl, df)
+ prepareHoodieTable(sparkTable, new Path(sparkPath.getCanonicalPath, sparkTable).toUri.toString, "mor", sparkMergerImpl, df)
+ Seq(avroMergerImpl, sparkMergerImpl).zip(Seq(avroTable, sparkTable)).foreach {
+ case (mergerImpl, tableName) => upsertBenchmark.addCase(mergerImpl) { _ =>
+ spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = $mergerImpl")
+ spark.sql(s"update $tableName set s1 = 's1_new_1' where id > 0")
}
}
upsertBenchmark.run()
- val readBenchmark = new HoodieBenchmark("pref read", 1000000, 3)
- Seq(avroMerger, sparkMerger).zip(Seq(avroTable, sparkTable)).foreach {
- case (merger, tableName) => readBenchmark.addCase(merger) { _ =>
+ val readBenchmark = new HoodieBenchmark("pref read", 10000, 3)
+ Seq(avroMergerImpl, sparkMergerImpl).zip(Seq(avroTable, sparkTable)).foreach {
+ case (mergerImpl, tableName) => readBenchmark.addCase(mergerImpl) { _ =>
+ spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = $mergerImpl")
spark.sql(s"select * from $tableName").collect()
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 2cfa8be36f..ef84eb2c89 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieAvroRecordMerger, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieAvroRecordMerger, HoodieRecordMerger, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
@@ -36,14 +36,13 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
assertTrue(with1("primaryKey") == "id")
assertTrue(with1("type") == "cow")
assertTrue(with1("payloadClass") == classOf[OverwriteWithLatestAvroPayload].getName)
- assertTrue(with1("mergerStrategy") == StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
+ assertTrue(with1("mergerStrategy") == HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
val ops2 = Map("primaryKey" -> "id",
"preCombineField" -> "timestamp",
"type" -> "mor",
"payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
- "mergerImpls" -> classOf[HoodieAvroRecordMerger].getName,
- "mergerStrategy" -> StringUtils.DEFAULT_MERGER_STRATEGY_UUID
+ "mergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
)
val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
assertTrue(ops2 == with2)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 39be0a8407..deeddd96df 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -280,8 +281,8 @@ public class HoodieDeltaStreamer implements Serializable {
+ "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)")
public String mergerImpls = HoodieAvroRecordMerger.class.getName();
- @Parameter(names = {"--merger-strategy"}, description = "Id of merger strategy. Hudi will pick RecordMergers in merger-impls which has the same merger strategy id")
- public String mergerStrategy = StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
+ @Parameter(names = {"--merger-strategy"}, description = "Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in merger-impls which has the same merger strategy id")
+ public String mergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
@Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema"
+ ".SchemaProvider to attach schemas to input & target table data, built in options: "
diff --git a/rfc/rfc-46/rfc-46.md b/rfc/rfc-46/rfc-46.md
index 192bdbf8c6..c5f611ee53 100644
--- a/rfc/rfc-46/rfc-46.md
+++ b/rfc/rfc-46/rfc-46.md
@@ -217,13 +217,13 @@ class HoodieRecord {
/**
* Get column in record to support RDDCustomColumnsSortPartitioner
*/
- Object getRecordColumnValues(Schema recordSchema, String[] columns,
+ ComparableList getComparableColumnValues(Schema recordSchema, String[] columns,
boolean consistentLogicalTimestampEnabled);
/**
* Support bootstrap.
*/
- HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
+ HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException;
/**
* Rewrite record into new schema(add meta columns)