You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/10 12:23:36 UTC
[hudi] branch master updated: [HUDI-1771] Propagate CDC format for
hoodie (#3285)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 21db6d7 [HUDI-1771] Propagate CDC format for hoodie (#3285)
21db6d7 is described below
commit 21db6d7a84d4a83ec98c110e92ff9c92d05dd530
Author: swuferhong <33...@qq.com>
AuthorDate: Tue Aug 10 20:23:23 2021 +0800
[HUDI-1771] Propagate CDC format for hoodie (#3285)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 16 +++
.../org/apache/hudi/io/HoodieAppendHandle.java | 15 ++-
.../org/apache/hudi/io/HoodieCreateHandle.java | 4 +
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 4 +
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 4 +-
.../apache/hudi/client/model/HoodieRowData.java | 54 +++-----
.../java/org/apache/hudi/io/FlinkAppendHandle.java | 3 +
.../io/storage/row/HoodieRowDataCreateHandle.java | 4 +-
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 2 +-
.../hudi/table/action/commit/FlinkWriteHelper.java | 19 +--
.../HoodieFlinkMergeOnReadTableCompactor.java | 4 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 48 +++++++-
.../apache/hudi/common/model/HoodieOperation.java | 125 +++++++++++++++++++
.../org/apache/hudi/common/model/HoodieRecord.java | 23 ++++
.../hudi/common/table/TableSchemaResolver.java | 12 +-
.../table/log/AbstractHoodieLogRecordScanner.java | 20 ++-
.../common/table/log/HoodieFileSliceReader.java | 4 +-
.../table/log/HoodieMergedLogRecordScanner.java | 26 ++--
.../table/log/HoodieUnMergedLogRecordScanner.java | 13 +-
.../apache/hudi/common/util/SpillableMapUtils.java | 16 ++-
.../hudi/metadata/HoodieBackedTableMetadata.java | 10 +-
.../HoodieMetadataMergedLogRecordScanner.java | 2 +-
.../apache/hudi/configuration/FlinkOptions.java | 11 ++
.../org/apache/hudi/sink/StreamWriteFunction.java | 12 +-
.../hudi/sink/bootstrap/BootstrapFunction.java | 2 +-
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 11 +-
.../sink/compact/CompactionPlanSourceFunction.java | 20 +--
.../hudi/sink/compact/HoodieFlinkCompactor.java | 3 +-
.../sink/transform/RowDataToHoodieFunction.java | 10 +-
.../apache/hudi/sink/utils/HiveSyncContext.java | 1 +
.../apache/hudi/sink/utils/PayloadCreation.java | 5 +-
.../org/apache/hudi/table/HoodieTableSink.java | 13 +-
.../org/apache/hudi/table/HoodieTableSource.java | 10 +-
.../org/apache/hudi/table/format/FormatUtils.java | 131 +++++++++++++++++++-
.../table/format/mor/MergeOnReadInputFormat.java | 105 ++++++++++++++--
.../table/format/mor/MergeOnReadTableState.java | 8 ++
.../java/org/apache/hudi/util/ChangelogModes.java | 46 +++++++
.../java/org/apache/hudi/util/StreamerUtil.java | 1 +
.../org/apache/hudi/sink/StreamWriteITCase.java | 12 +-
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 47 ++++---
.../apache/hudi/sink/bulk/TestRowDataKeyGen.java | 4 +-
.../apache/hudi/table/HoodieDataSourceITCase.java | 29 +++--
.../apache/hudi/table/format/TestInputFormat.java | 122 ++++++++++++++++--
.../test/java/org/apache/hudi/utils/TestData.java | 85 ++++++++++++-
.../org/apache/hudi/utils/TestHoodieRowData.java | 136 +++++++++++++++++++++
.../utils/factory/CollectSinkTableFactory.java | 1 +
.../java/org/apache/hudi/dla/HoodieDLAClient.java | 2 +-
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 5 +
.../org/apache/hudi/hive/HoodieHiveClient.java | 2 +-
.../hudi/sync/common/AbstractSyncHoodieClient.java | 18 ++-
50 files changed, 1081 insertions(+), 199 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7e9c0cc..9616b55 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -373,6 +373,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. "
+ "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data");
+ public static final ConfigProperty<Boolean> ALLOW_OPERATION_METADATA_FIELD = ConfigProperty
+ .key("hoodie.allow.operation.metadata.field")
+ .defaultValue(false)
+ .sinceVersion("0.9")
+ .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. "
+ + "Once enabled, all the changes of a record are persisted to the delta log directly without merge");
+
private ConsistencyGuardConfig consistencyGuardConfig;
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -1309,6 +1316,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBooleanOrDefault(ALLOW_EMPTY_COMMIT);
}
+ public boolean allowOperationMetadataField() {
+ return getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD);
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -1615,6 +1626,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withAllowOperationMetadataField(boolean allowOperationMetadataField) {
+ writeConfig.setValue(ALLOW_OPERATION_METADATA_FIELD, Boolean.toString(allowOperationMetadataField));
+ return this;
+ }
+
public Builder withProperties(Properties properties) {
this.writeConfig.getProps().putAll(properties);
return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index fe45d4b..1315c99 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -197,20 +198,26 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
// Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
// Whether it is a update or insert record.
boolean isUpdateRecord = isUpdateRecord(hoodieRecord);
+ // If the format can not record the operation field, nullify the DELETE payload manually.
+ boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
- Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
+ Option<IndexedRecord> avroRecord = nullifyPayload ? Option.empty() : hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
if (avroRecord.isPresent()) {
if (avroRecord.get().equals(IGNORE_RECORD)) {
return avroRecord;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
- avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
+ GenericRecord rewriteRecord = rewriteRecord((GenericRecord) avroRecord.get());
+ avroRecord = Option.of(rewriteRecord);
String seqId =
HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
if (config.populateMetaFields()) {
- HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(),
+ HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, hoodieRecord.getRecordKey(),
hoodieRecord.getPartitionPath(), fileId);
- HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId);
+ HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
+ }
+ if (config.allowOperationMetadataField()) {
+ HoodieAvroUtils.addOperationToRecord(rewriteRecord, hoodieRecord.getOperation());
}
if (isUpdateRecord(hoodieRecord)) {
updatedRecordsWritten++;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index eef5b3d..01ad453 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -22,6 +22,7 @@ import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -127,6 +128,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
@Override
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
Option recordMetadata = record.getData().getMetadata();
+ if (HoodieOperation.isDelete(record.getOperation())) {
+ avroRecord = Option.empty();
+ }
try {
if (avroRecord.isPresent()) {
if (avroRecord.get().equals(IGNORE_RECORD)) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 347f8cc..3e20141 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -264,6 +265,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
return false;
}
+ if (HoodieOperation.isDelete(hoodieRecord.getOperation())) {
+ indexedRecord = Option.empty();
+ }
try {
if (indexedRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 0337b0e..306021b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -112,9 +112,9 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
this.partitionPath = partitionPath;
this.fileId = fileId;
this.tableSchema = overriddenSchema.orElseGet(() -> getSpecifiedTableSchema(config));
- this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema);
+ this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema, config.allowOperationMetadataField());
this.writeSchema = overriddenSchema.orElseGet(() -> getWriteSchema(config));
- this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema);
+ this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
index 86acc1c..dfc425c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
@@ -18,7 +18,7 @@
package org.apache.hudi.client.model;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
@@ -35,12 +35,7 @@ import org.apache.flink.types.RowKind;
* copy rather than fetching from {@link RowData}.
*/
public class HoodieRowData implements RowData {
-
- private final String commitTime;
- private final String commitSeqNumber;
- private final String recordKey;
- private final String partitionPath;
- private final String fileName;
+ private final String[] metaColumns;
private final RowData row;
private final int metaColumnsNum;
@@ -49,14 +44,19 @@ public class HoodieRowData implements RowData {
String recordKey,
String partitionPath,
String fileName,
- RowData row) {
- this.commitTime = commitTime;
- this.commitSeqNumber = commitSeqNumber;
- this.recordKey = recordKey;
- this.partitionPath = partitionPath;
- this.fileName = fileName;
+ RowData row,
+ boolean withOperation) {
+ this.metaColumnsNum = withOperation ? 6 : 5;
+ this.metaColumns = new String[metaColumnsNum];
+ metaColumns[0] = commitTime;
+ metaColumns[1] = commitSeqNumber;
+ metaColumns[2] = recordKey;
+ metaColumns[3] = partitionPath;
+ metaColumns[4] = fileName;
+ if (withOperation) {
+ metaColumns[5] = HoodieOperation.fromValue(row.getRowKind().toByteValue()).getName();
+ }
this.row = row;
- this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
}
@Override
@@ -74,28 +74,6 @@ public class HoodieRowData implements RowData {
this.row.setRowKind(kind);
}
- private String getMetaColumnVal(int ordinal) {
- switch (ordinal) {
- case 0: {
- return commitTime;
- }
- case 1: {
- return commitSeqNumber;
- }
- case 2: {
- return recordKey;
- }
- case 3: {
- return partitionPath;
- }
- case 4: {
- return fileName;
- }
- default:
- throw new IllegalArgumentException("Not expected");
- }
- }
-
@Override
public boolean isNullAt(int ordinal) {
if (ordinal < metaColumnsNum) {
@@ -181,4 +159,8 @@ public class HoodieRowData implements RowData {
public MapData getMap(int ordinal) {
return row.getMap(ordinal - metaColumnsNum);
}
+
+ private String getMetaColumnVal(int ordinal) {
+ return this.metaColumns[ordinal];
+ }
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 987f335..81bd598 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -79,6 +79,9 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
@Override
protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
+ // do not use the HoodieRecord operation because hoodie writer has its own
+ // INSERT/MERGE bucket for 'UPSERT' semantics. For e.g, a hoodie record with fresh new key
+ // and operation HoodieCdcOperation.DELETE would be put into either an INSERT bucket or UPDATE bucket.
return hoodieRecord.getCurrentLocation() != null
&& hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 76fad8b..238367b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -117,7 +117,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
try {
String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
HoodieRowData rowData = new HoodieRowData(instantTime, seqId, recordKey, partitionPath, path.getName(),
- record);
+ record, writeConfig.allowOperationMetadataField());
try {
fileWriter.writeRow(recordKey, rowData);
writeStatus.markSuccess(recordKey);
@@ -131,7 +131,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
}
/**
- * @returns {@code true} if this handle can take in more writes. else {@code false}.
+ * Returns {@code true} if this handle can take in more writes. else {@code false}.
*/
public boolean canWrite() {
return fileWriter.canWrite();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 9776870..8a9b4bf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -354,7 +354,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
dataFileToBeMerged, taskContextSupplier, Option.empty());
} else {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged,taskContextSupplier, Option.empty());
+ dataFileToBeMerged, taskContextSupplier, Option.empty());
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 5238123..5cb1b80 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
* <p>Computing the records batch locations all at a time is a pressure to the engine,
* we should avoid that in streaming system.
*/
-public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
+public class FlinkWriteHelper<T extends HoodieRecordPayload, R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>, R> {
private FlinkWriteHelper() {
@@ -80,8 +81,8 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractW
@Override
public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> records,
- HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
- int parallelism) {
+ HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
+ int parallelism) {
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
// If index used is global, then records are expected to differ in their partitionPath
final Object key = record.getKey().getRecordKey();
@@ -89,13 +90,17 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractW
}).collect(Collectors.groupingBy(Pair::getLeft));
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
- @SuppressWarnings("unchecked")
- T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+ final T data1 = rec1.getData();
+ final T data2 = rec2.getData();
+
+ @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1);
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
- HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
- HoodieRecord<T> hoodieRecord = new HoodieRecord<>(reducedKey, reducedData);
+ boolean choosePrev = data1.equals(reducedData);
+ HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
+ HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
+ HoodieRecord<T> hoodieRecord = new HoodieRecord<>(reducedKey, reducedData, operation);
// reuse the location from the first record.
hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
return hoodieRecord;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
index f08c8b5..1f4a524 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
@@ -86,7 +86,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
@Override
public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
- HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
+ HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, "
+ "the function works as a separate pipeline");
}
@@ -98,7 +98,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
String instantTime) throws IOException {
FileSystem fs = metaClient.getFs();
- Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+ Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
// TODO - FIX THIS
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 695f97e..239eed5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -19,7 +19,10 @@
package org.apache.hudi.avro;
import org.apache.avro.specific.SpecificRecordBase;
+
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -151,7 +154,8 @@ public class HoodieAvroUtils {
|| HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
- || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName);
+ || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName)
+ || HoodieRecord.OPERATION_METADATA_FIELD.equals(fieldName);
}
public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -164,8 +168,20 @@ public class HoodieAvroUtils {
/**
* Adds the Hoodie metadata fields to the given schema.
+ *
+ * @param schema The schema
*/
public static Schema addMetadataFields(Schema schema) {
+ return addMetadataFields(schema, false);
+ }
+
+ /**
+ * Adds the Hoodie metadata fields to the given schema.
+ *
+ * @param schema The schema
+ * @param withOperationField Whether to include the '_hoodie_operation' field
+ */
+ public static Schema addMetadataFields(Schema schema, boolean withOperationField) {
List<Schema.Field> parentFields = new ArrayList<>();
Schema.Field commitTimeField =
@@ -184,6 +200,13 @@ public class HoodieAvroUtils {
parentFields.add(recordKeyField);
parentFields.add(partitionPathField);
parentFields.add(fileNameField);
+
+ if (withOperationField) {
+ final Schema.Field operationField =
+ new Schema.Field(HoodieRecord.OPERATION_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
+ parentFields.add(operationField);
+ }
+
for (Schema.Field field : schema.getFields()) {
if (!isMetadataField(field.name())) {
Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal());
@@ -202,7 +225,7 @@ public class HoodieAvroUtils {
public static Schema removeMetadataFields(Schema schema) {
List<Schema.Field> filteredFields = schema.getFields()
.stream()
- .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS.contains(field.name()))
+ .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(field.name()))
.map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
.collect(Collectors.toList());
Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
@@ -268,6 +291,11 @@ public class HoodieAvroUtils {
return record;
}
+ public static GenericRecord addOperationToRecord(GenericRecord record, HoodieOperation operation) {
+ record.put(HoodieRecord.OPERATION_METADATA_FIELD, operation.getName());
+ return record;
+ }
+
/**
* Add null fields to passed in schema. Caller is responsible for ensuring there is no duplicates. As different query
* engines have varying constraints regarding treating the case-sensitivity of fields, its best to let caller
@@ -454,6 +482,22 @@ public class HoodieAvroUtils {
}
/**
+ * Returns the string value of the given record {@code rec} and field {@code fieldName}.
+ * The field and value both could be missing.
+ *
+ * @param rec The record
+ * @param fieldName The field name
+ *
+ * @return the string form of the field
+ * or empty if the schema does not contain the field name or the value is null
+ */
+ public static Option<String> getNullableValAsString(GenericRecord rec, String fieldName) {
+ Schema.Field field = rec.getSchema().getField(fieldName);
+ String fieldVal = field == null ? null : StringUtils.objToString(rec.get(field.pos()));
+ return Option.ofNullable(fieldVal);
+ }
+
+ /**
* This method converts values for fields with certain Avro/Parquet data types that require special handling.
*
* @param fieldSchema avro field schema
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java
new file mode 100644
index 0000000..0da40eb
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+/**
+ * Represents the changes that a row can describe in a changelog.
+ */
+public enum HoodieOperation {
+ /**
+ * Insert operation.
+ */
+ INSERT("I", (byte) 0),
+ /**
+ * Update operation with previous record content,
+ * should be used together with {@link #UPDATE_AFTER} for modeling an update operation.
+ */
+ UPDATE_BEFORE("-U", (byte) 1),
+ /**
+ * Update operation with new record content.
+ */
+ UPDATE_AFTER("U", (byte) 2),
+ /**
+ * Delete operation.
+ */
+ DELETE("D", (byte) 4);
+
+ private final String name;
+ private final byte value;
+
+ HoodieOperation(String name, byte value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public byte getValue() {
+ return value;
+ }
+
+ public static HoodieOperation fromValue(byte value) {
+ switch (value) {
+ case 0:
+ return INSERT;
+ case 1:
+ return UPDATE_BEFORE;
+ case 2:
+ return UPDATE_AFTER;
+ case 3:
+ return DELETE;
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ public static HoodieOperation fromName(Option<String> nameOpt) {
+ if (!nameOpt.isPresent()) {
+ return null;
+ }
+ return fromName(nameOpt.get());
+ }
+
+ public static HoodieOperation fromName(String name) {
+ switch (name) {
+ case "I":
+ return INSERT;
+ case "-U":
+ return UPDATE_BEFORE;
+ case "U":
+ return UPDATE_AFTER;
+ case "D":
+ return DELETE;
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ /**
+ * Returns whether the operation is INSERT.
+ */
+ public static boolean isInsert(HoodieOperation operation) {
+ return operation == INSERT;
+ }
+
+ /**
+ * Returns whether the operation is UPDATE_BEFORE.
+ */
+ public static boolean isUpdateBefore(HoodieOperation operation) {
+ return operation == UPDATE_BEFORE;
+ }
+
+ /**
+ * Returns whether the operation is UPDATE_AFTER.
+ */
+ public static boolean isUpdateAfter(HoodieOperation operation) {
+ return operation == UPDATE_AFTER;
+ }
+
+ /**
+ * Returns whether the operation is DELETE.
+ */
+ public static boolean isDelete(HoodieOperation operation) {
+ return operation == DELETE;
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 6484c5c..1742778 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -39,11 +39,19 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
public static final String RECORD_KEY_METADATA_FIELD = "_hoodie_record_key";
public static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path";
public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name";
+ public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
public static final List<String> HOODIE_META_COLUMNS =
CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD);
+ // Temporary to support the '_hoodie_operation' field, once we solve
+ // the compatibility problem, it can be removed.
+ public static final List<String> HOODIE_META_COLUMNS_WITH_OPERATION =
+ CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
+ RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD,
+ OPERATION_METADATA_FIELD);
+
public static final Map<String, Integer> HOODIE_META_COLUMNS_NAME_TO_POS =
IntStream.range(0, HOODIE_META_COLUMNS.size()).mapToObj(idx -> Pair.of(HOODIE_META_COLUMNS.get(idx), idx))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
@@ -73,12 +81,22 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
*/
private boolean sealed;
+ /**
+ * The cdc operation.
+ */
+ private HoodieOperation operation;
+
public HoodieRecord(HoodieKey key, T data) {
+ this(key, data, null);
+ }
+
+ public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) {
this.key = key;
this.data = data;
this.currentLocation = null;
this.newLocation = null;
this.sealed = false;
+ this.operation = operation;
}
public HoodieRecord(HoodieRecord<T> record) {
@@ -86,6 +104,7 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
this.currentLocation = record.currentLocation;
this.newLocation = record.newLocation;
this.sealed = record.sealed;
+ this.operation = record.operation;
}
public HoodieRecord() {
@@ -95,6 +114,10 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
return key;
}
+ public HoodieOperation getOperation() {
+ return operation;
+ }
+
public T getData() {
if (data == null) {
throw new IllegalStateException("Payload already deflated for record.");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 20e5a82..8d16e91 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -55,10 +55,16 @@ import org.apache.parquet.schema.MessageType;
public class TableSchemaResolver {
private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
- private HoodieTableMetaClient metaClient;
+ private final HoodieTableMetaClient metaClient;
+ private final boolean withOperationField;
public TableSchemaResolver(HoodieTableMetaClient metaClient) {
+ this(metaClient, false);
+ }
+
+ public TableSchemaResolver(HoodieTableMetaClient metaClient, boolean withOperationField) {
this.metaClient = metaClient;
+ this.withOperationField = withOperationField;
}
/**
@@ -170,7 +176,7 @@ public class TableSchemaResolver {
Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
if (schemaFromTableConfig.isPresent()) {
if (includeMetadataFields) {
- return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get());
+ return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), withOperationField);
} else {
return schemaFromTableConfig.get();
}
@@ -256,7 +262,7 @@ public class TableSchemaResolver {
Schema schema = new Schema.Parser().parse(existingSchemaStr);
if (includeMetadataFields) {
- schema = HoodieAvroUtils.addMetadataFields(schema);
+ schema = HoodieAvroUtils.addMetadataFields(schema, withOperationField);
}
return Option.of(schema);
} catch (Exception e) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 95ed80a..79d5821 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -96,6 +96,8 @@ public abstract class AbstractHoodieLogRecordScanner {
private final int bufferSize;
// optional instant range for incremental block filtering
private final Option<InstantRange> instantRange;
+ // Read the operation metadata field from the avro record
+ private final boolean withOperationField;
// FileSystem
private final FileSystem fs;
// Total log files read - for metrics
@@ -114,7 +116,8 @@ public abstract class AbstractHoodieLogRecordScanner {
private float progress = 0.0f;
protected AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
- String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, Option<InstantRange> instantRange) {
+ String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
+ int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
@@ -131,6 +134,7 @@ public abstract class AbstractHoodieLogRecordScanner {
this.fs = fs;
this.bufferSize = bufferSize;
this.instantRange = instantRange;
+ this.withOperationField = withOperationField;
}
/**
@@ -294,7 +298,7 @@ public abstract class AbstractHoodieLogRecordScanner {
private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK
&& !logBlock.getLogBlockHeader().get(INSTANT_TIME)
- .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME));
+ .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME));
}
/**
@@ -312,9 +316,9 @@ public abstract class AbstractHoodieLogRecordScanner {
protected HoodieRecord<?> createHoodieRecord(IndexedRecord rec) {
if (!simpleKeyGenFields.isPresent()) {
- return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
+ return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.withOperationField);
} else {
- return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.simpleKeyGenFields.get());
+ return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.simpleKeyGenFields.get(), this.withOperationField);
}
}
@@ -392,6 +396,10 @@ public abstract class AbstractHoodieLogRecordScanner {
return totalCorruptBlocks.get();
}
+ public boolean isWithOperationField() {
+ return withOperationField;
+ }
+
/**
* Builder used to build {@code AbstractHoodieLogRecordScanner}.
*/
@@ -417,6 +425,10 @@ public abstract class AbstractHoodieLogRecordScanner {
throw new UnsupportedOperationException();
}
+ public Builder withOperationField(boolean withOperationField) {
+ throw new UnsupportedOperationException();
+ }
+
public abstract AbstractHoodieLogRecordScanner build();
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
index 76d1cc3..d840565 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
@@ -45,8 +45,8 @@ public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Ite
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = simpleKeyGenFieldsOpt.isPresent()
- ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get())
- : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass);
+ ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField())
+ : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, scanner.isWithOperationField());
scanner.processNextRecord(hoodieRecord);
}
return new HoodieFileSliceReader(scanner.iterator());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 4d51fbb..bc08fe7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -43,7 +43,7 @@ import java.util.Map;
/**
* Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will
* be used as a lookup table when merging the base columnar file with the redo log file.
- *
+ * <p>
* NOTE: If readBlockLazily is turned on, does not merge, instead keeps reading log blocks and merges everything at once
* This is an optimization to avoid seek() back and forth to read new block (forward seek()) and lazily read content of
* seen block (reverse and forward seek()) during merge | | Read Block 1 Metadata | | Read Block 1 Data | | | Read Block
@@ -72,11 +72,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
@SuppressWarnings("unchecked")
protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
- String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
- boolean reverseReader, int bufferSize, String spillableMapBasePath,
- Option<InstantRange> instantRange, boolean autoScan,
- ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) {
- super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange);
+ String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
+ boolean reverseReader, int bufferSize, String spillableMapBasePath,
+ Option<InstantRange> instantRange, boolean autoScan,
+ ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
+ boolean withOperationField) {
+ super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField);
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
@@ -132,8 +133,10 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
if (records.containsKey(key)) {
// Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
// done when a delete (empty payload) is encountered before or after an insert/update.
+
+ // Always use the natural order now.
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
- records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
+ records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, hoodieRecord.getOperation()));
} else {
// Put the record as is
records.put(key, hoodieRecord);
@@ -177,6 +180,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
private Option<InstantRange> instantRange = Option.empty();
// auto scan default true
private boolean autoScan = true;
+ // operation field default false
+ private boolean withOperationField = false;
public Builder withFileSystem(FileSystem fs) {
this.fs = fs;
@@ -248,12 +253,17 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
return this;
}
+ public Builder withOperationField(boolean withOperationField) {
+ this.withOperationField = withOperationField;
+ return this;
+ }
+
@Override
public HoodieMergedLogRecordScanner build() {
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange, autoScan,
- diskMapType, isBitCaskDiskMapCompressionEnabled);
+ diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField);
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index dd0edd9..8b26f72 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -36,8 +36,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
private final LogRecordScannerCallback callback;
private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
- String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, LogRecordScannerCallback callback) {
- super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, Option.empty());
+ String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize,
+ LogRecordScannerCallback callback, Option<InstantRange> instantRange) {
+ super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false);
this.callback = callback;
}
@@ -80,6 +81,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
private boolean readBlocksLazily;
private boolean reverseReader;
private int bufferSize;
+ private Option<InstantRange> instantRange = Option.empty();
// specific configurations
private LogRecordScannerCallback callback;
@@ -123,6 +125,11 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
return this;
}
+ public Builder withInstantRange(Option<InstantRange> instantRange) {
+ this.instantRange = instantRange;
+ return this;
+ }
+
public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
this.callback = callback;
return this;
@@ -131,7 +138,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
@Override
public HoodieUnMergedLogRecordScanner build() {
return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
- latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback);
+ latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange);
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
index 43f8ba5..0a716e0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.util;
import org.apache.hudi.common.fs.SizeAwareDataOutputStream;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.BitCaskDiskMap.FileEntry;
@@ -32,6 +33,8 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.zip.CRC32;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString;
+
/**
* A utility class supports spillable map.
*/
@@ -110,18 +113,23 @@ public class SpillableMapUtils {
/**
* Utility method to convert bytes to HoodieRecord using schema and payload class.
*/
- public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz) {
- return convertToHoodieRecordPayload(rec, payloadClazz, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+ public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, boolean withOperationField) {
+ return convertToHoodieRecordPayload(rec, payloadClazz, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), withOperationField);
}
/**
* Utility method to convert bytes to HoodieRecord using schema and payload class.
*/
- public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, Pair<String, String> recordKeyPartitionPathPair) {
+ public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz,
+ Pair<String, String> recordKeyPartitionPathPair,
+ boolean withOperationField) {
String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString();
String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString();
+ HoodieOperation operation = withOperationField
+ ? HoodieOperation.fromName(getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath),
- ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.of(rec)}, Option.class));
+ ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.of(rec)}, Option.class), operation);
+
return (R) hoodieRecord;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 8995ab4..962b8ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -132,10 +132,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
HoodieTimer readTimer = new HoodieTimer().startTimer();
Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
if (baseRecord.isPresent()) {
- hoodieRecord = tableConfig.populateMetaFields() ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
- tableConfig.getPayloadClass()) : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
- tableConfig.getPayloadClass(), Pair.of(tableConfig.getRecordKeyFieldProp(),
- tableConfig.getPartitionFieldProp()));
+ hoodieRecord = tableConfig.populateMetaFields()
+ ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), false)
+ : SpillableMapUtils.convertToHoodieRecordPayload(
+ baseRecord.get(),
+ tableConfig.getPayloadClass(),
+ Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()), false);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
index 51b0315..a3c3e08 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
@@ -45,7 +45,7 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
String spillableMapBasePath, Set<String> mergeKeyFilter,
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
- spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled);
+ spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled, false);
this.mergeKeyFilter = mergeKeyFilter;
performScan();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index c0d721a..21fdb84 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -76,6 +76,17 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("The default partition name in case the dynamic partition"
+ " column value is null/empty string");
+ public static final ConfigOption<Boolean> CHANGELOG_ENABLED = ConfigOptions
+ .key("changelog.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to keep all the intermediate changes, "
+ + "we try to keep all the changes of a record when enabled:\n"
+ + "1). The sink accept the UPDATE_BEFORE message;\n"
+ + "2). The source try to emit every changes of a record.\n"
+ + "The semantics is best effort because the compaction job would finally merge all changes of a record into one.\n"
+ + " default false to have UPSERT semantics");
+
// ------------------------------------------------------------------------
// Metadata table Options
// ------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 375265a..71b20ba 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -360,23 +361,26 @@ public class StreamWriteFunction<K, I, O>
private final String key; // record key
private final String instant; // 'U' or 'I'
private final HoodieRecordPayload<?> data; // record payload
+ private final HoodieOperation operation; // operation
- private DataItem(String key, String instant, HoodieRecordPayload<?> data) {
+ private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation) {
this.key = key;
this.instant = instant;
this.data = data;
+ this.operation = operation;
}
public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
return new DataItem(
record.getRecordKey(),
record.getCurrentLocation().getInstantTime(),
- record.getData());
+ record.getData(),
+ record.getOperation());
}
public HoodieRecord<?> toHoodieRecord(String partitionPath) {
HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath);
- HoodieRecord<?> record = new HoodieRecord<>(hoodieKey, data);
+ HoodieRecord<?> record = new HoodieRecord<>(hoodieKey, data, operation);
HoodieRecordLocation loc = new HoodieRecordLocation(instant, null);
record.setCurrentLocation(loc);
return record;
@@ -417,7 +421,7 @@ public class StreamWriteFunction<K, I, O>
public void preWrite(List<HoodieRecord> records) {
// rewrite the first record with expected fileID
HoodieRecord<?> first = records.get(0);
- HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData());
+ HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData(), first.getOperation());
HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
record.setCurrentLocation(newLoc);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index bcf6c3a..4923362 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -203,7 +203,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
.filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
- HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
+ HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
writeConfig, hadoopConf);
try {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 0e3ecb1..fbe7678 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -71,7 +71,7 @@ public class BulkInsertWriterHelper {
this.taskPartitionId = taskPartitionId;
this.taskId = taskId;
this.taskEpochId = taskEpochId;
- this.rowType = addMetadataFields(rowType); // patch up with metadata fields
+ this.rowType = addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
this.fileIdPrefix = UUID.randomUUID().toString();
this.keyGen = RowDataKeyGen.instance(conf, rowType);
@@ -141,7 +141,7 @@ public class BulkInsertWriterHelper {
/**
* Adds the Hoodie metadata fields to the given row type.
*/
- private static RowType addMetadataFields(RowType rowType) {
+ private static RowType addMetadataFields(RowType rowType, boolean withOperationField) {
List<RowType.RowField> mergedFields = new ArrayList<>();
LogicalType metadataFieldType = DataTypes.STRING().getLogicalType();
@@ -161,6 +161,13 @@ public class BulkInsertWriterHelper {
mergedFields.add(recordKeyField);
mergedFields.add(partitionPathField);
mergedFields.add(fileNameField);
+
+ if (withOperationField) {
+ RowType.RowField operationField =
+ new RowType.RowField(HoodieRecord.OPERATION_METADATA_FIELD, metadataFieldType, "operation");
+ mergedFields.add(operationField);
+ }
+
mergedFields.addAll(rowType.getFields());
return new RowType(false, mergedFields);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
index ed916c8..fe55089 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
@@ -20,8 +20,6 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
@@ -57,26 +55,14 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
/**
* Compaction instant time.
*/
- private String compactionInstantTime;
-
- /**
- * Hoodie flink table.
- */
- private HoodieFlinkTable<?> table;
+ private final String compactionInstantTime;
/**
* The compaction plan.
*/
- private HoodieCompactionPlan compactionPlan;
-
- /**
- * Hoodie instant.
- */
- private HoodieInstant instant;
+ private final HoodieCompactionPlan compactionPlan;
- public CompactionPlanSourceFunction(HoodieFlinkTable<?> table, HoodieInstant instant, HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
- this.table = table;
- this.instant = instant;
+ public CompactionPlanSourceFunction(HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
this.compactionPlan = compactionPlan;
this.compactionInstantTime = compactionInstantTime;
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 84b6813..18d49f1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -133,9 +133,8 @@ public class HoodieFlinkCompactor {
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
- table.getMetaClient().reloadActiveTimeline();
- env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
+ env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
index f575713..b600a5d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
@@ -19,6 +19,7 @@
package org.apache.hudi.sink.transform;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.configuration.FlinkOptions;
@@ -34,7 +35,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
import java.io.IOException;
@@ -108,9 +108,9 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord>
private HoodieRecord toHoodieRecord(I record) throws Exception {
GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
final HoodieKey hoodieKey = keyGenerator.getKey(gr);
- // nullify the payload insert data to mark the record as a DELETE
- final boolean isDelete = record.getRowKind() == RowKind.DELETE;
- HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete);
- return new HoodieRecord<>(hoodieKey, payload);
+
+ HoodieRecordPayload payload = payloadCreation.createPayload(gr);
+ HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue());
+ return new HoodieRecord<>(hoodieKey, payload, operation);
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index 12eb039..192feb5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -86,6 +86,7 @@ public class HiveSyncContext {
hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
+ hiveSyncConfig.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
return hiveSyncConfig;
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
index 831da25..20070b3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
@@ -71,13 +71,12 @@ public class PayloadCreation implements Serializable {
return new PayloadCreation(shouldCombine, constructor, preCombineField);
}
- public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean isDelete) throws Exception {
+ public HoodieRecordPayload<?> createPayload(GenericRecord record) throws Exception {
if (shouldCombine) {
ValidationUtils.checkState(preCombineField != null);
Comparable<?> orderingVal = (Comparable<?>) HoodieAvroUtils.getNestedFieldVal(record,
preCombineField, false);
- return (HoodieRecordPayload<?>) constructor.newInstance(
- isDelete ? null : record, orderingVal);
+ return (HoodieRecordPayload<?>) constructor.newInstance(record, orderingVal);
} else {
return (HoodieRecordPayload<?>) this.constructor.newInstance(Option.of(record));
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 49cd9e7..8a16c6d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -34,6 +34,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
+import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.StreamerUtil;
@@ -52,7 +53,6 @@ import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
import java.util.Map;
@@ -185,12 +185,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
@Override
public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
- // ignore RowKind.UPDATE_BEFORE
- return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.DELETE)
- .addContainedKind(RowKind.INSERT)
- .addContainedKind(RowKind.UPDATE_AFTER)
- .build();
+ if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
+ return ChangelogModes.FULL;
+ } else {
+ return ChangelogModes.UPSERT;
+ }
}
@Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index af2327d..51d3c8d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -37,6 +37,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
@@ -196,7 +197,12 @@ public class HoodieTableSource implements
@Override
public ChangelogMode getChangelogMode() {
- return ChangelogMode.insertOnly();
+ return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+ && !conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
+ ? ChangelogModes.FULL
+ // when all the changes are persisted or read as batch,
+ // use INSERT mode.
+ : ChangelogMode.insertOnly();
}
@Override
@@ -309,7 +315,7 @@ public class HoodieTableSource implements
return new CollectionInputFormat<>(Collections.emptyList(), null);
}
- TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+ TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
final Schema tableAvroSchema;
try {
tableAvroSchema = schemaUtil.getTableAvroSchema();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index cbf1ea7..14f7eb3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -19,19 +19,32 @@
package org.apache.hudi.table.format;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -45,6 +58,39 @@ public class FormatUtils {
private FormatUtils() {
}
+ /**
+ * Sets up the row kind to the row data {@code rowData} from the resolved operation.
+ */
+ public static void setRowKind(RowData rowData, IndexedRecord record, int index) {
+ if (index == -1) {
+ return;
+ }
+ rowData.setRowKind(getRowKind(record, index));
+ }
+
+ /**
+ * Returns the RowKind of the given record, never null.
+ * Returns RowKind.INSERT when the given field value not found.
+ */
+ private static RowKind getRowKind(IndexedRecord record, int index) {
+ Object val = record.get(index);
+ if (val == null) {
+ return RowKind.INSERT;
+ }
+ final HoodieOperation operation = HoodieOperation.fromName(val.toString());
+ if (HoodieOperation.isInsert(operation)) {
+ return RowKind.INSERT;
+ } else if (HoodieOperation.isUpdateBefore(operation)) {
+ return RowKind.UPDATE_BEFORE;
+ } else if (HoodieOperation.isUpdateAfter(operation)) {
+ return RowKind.UPDATE_AFTER;
+ } else if (HoodieOperation.isDelete(operation)) {
+ return RowKind.DELETE;
+ } else {
+ throw new AssertionError();
+ }
+ }
+
public static GenericRecord buildAvroRecordBySchema(
IndexedRecord record,
Schema requiredSchema,
@@ -57,10 +103,11 @@ public class FormatUtils {
return recordBuilder.build();
}
- public static HoodieMergedLogRecordScanner scanLog(
+ public static HoodieMergedLogRecordScanner logScanner(
MergeOnReadInputSplit split,
Schema logSchema,
- Configuration config) {
+ Configuration config,
+ boolean withOperationField) {
FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
@@ -81,10 +128,88 @@ public class FormatUtils {
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.withInstantRange(split.getInstantRange())
+ .withOperationField(withOperationField)
+ .build();
+ }
+
+ private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
+ MergeOnReadInputSplit split,
+ Schema logSchema,
+ Configuration config,
+ HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
+ FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+ return HoodieUnMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(split.getTablePath())
+ .withLogFilePaths(split.getLogPaths().get())
+ .withReaderSchema(logSchema)
+ .withLatestInstantTime(split.getLatestCommit())
+ .withReadBlocksLazily(
+ string2Boolean(
+ config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+ HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+ .withReverseReader(false)
+ .withBufferSize(
+ config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+ HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+ .withInstantRange(split.getInstantRange())
+ .withLogRecordScannerCallback(callback)
.build();
}
- public static HoodieMergedLogRecordScanner scanLog(
+ /**
+ * Utility to read and buffer the records in the unMerged log record scanner.
+ */
+ public static class BoundedMemoryRecords {
+ // Log Record unmerged scanner
+ private final HoodieUnMergedLogRecordScanner scanner;
+
+ // Executor that runs the above producers in parallel
+ private final BoundedInMemoryExecutor<HoodieRecord<?>, HoodieRecord<?>, ?> executor;
+
+ // Iterator for the buffer consumer
+ private final Iterator<HoodieRecord<?>> iterator;
+
+ public BoundedMemoryRecords(
+ MergeOnReadInputSplit split,
+ Schema logSchema,
+ Configuration hadoopConf) {
+ this.executor = new BoundedInMemoryExecutor<>(
+ HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(hadoopConf)),
+ getParallelProducers(),
+ Option.empty(),
+ x -> x,
+ new DefaultSizeEstimator<>());
+ // Consumer of this record reader
+ this.iterator = this.executor.getQueue().iterator();
+ this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
+ record -> executor.getQueue().insertRecord(record));
+ // Start reading and buffering
+ this.executor.startProducers();
+ }
+
+ public Iterator<HoodieRecord<?>> getRecordsIterator() {
+ return this.iterator;
+ }
+
+ /**
+ * Setup log and parquet reading in parallel. Both write to central buffer.
+ */
+ private List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> getParallelProducers() {
+ List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> producers = new ArrayList<>();
+ producers.add(new FunctionBasedQueueProducer<>(buffer -> {
+ scanner.scan();
+ return null;
+ }));
+ return producers;
+ }
+
+ public void close() {
+ this.executor.shutdownNow();
+ }
+ }
+
+ public static HoodieMergedLogRecordScanner logScanner(
List<String> logPaths,
Schema logSchema,
String latestInstantTime,
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 4d68242..d0e1c33d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.format.mor;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
@@ -176,7 +177,11 @@ public class MergeOnReadInputFormat
}
} else if (!split.getBasePath().isPresent()) {
// log files only
- this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
+ if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
+ this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+ } else {
+ this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
+ }
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
this.iterator = new SkipMergeIterator(
getRequiredSchemaReader(split.getBasePath().get()),
@@ -190,6 +195,9 @@ public class MergeOnReadInputFormat
new Schema.Parser().parse(this.tableState.getAvroSchema()),
new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
this.requiredPos,
+ this.emitDelete,
+ this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED),
+ this.tableState.getOperationPos(),
getFullSchemaReader(split.getBasePath().get()));
} else {
throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
@@ -298,7 +306,7 @@ public class MergeOnReadInputFormat
final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
- final HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+ final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
final int[] pkOffset = tableState.getPkOffsetsInRequired();
// flag saying whether the pk semantics has been dropped by user specified
@@ -335,18 +343,20 @@ public class MergeOnReadInputFormat
}
delete.setRowKind(RowKind.DELETE);
- this.currentRecord = delete;
+ this.currentRecord = delete;
return true;
}
// skipping if the condition is unsatisfied
// continue;
} else {
+ final IndexedRecord avroRecord = curAvroRecord.get();
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
- curAvroRecord.get(),
+ avroRecord,
requiredSchema,
requiredPos,
recordBuilder);
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+ FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos());
return true;
}
}
@@ -365,6 +375,55 @@ public class MergeOnReadInputFormat
};
}
+ private ClosableIterator<RowData> getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
+ final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
+ final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
+ final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
+ final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
+ AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
+ final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf);
+ final Iterator<HoodieRecord<?>> recordsIterator = records.getRecordsIterator();
+
+ return new ClosableIterator<RowData>() {
+ private RowData currentRecord;
+
+ @Override
+ public boolean hasNext() {
+ while (recordsIterator.hasNext()) {
+ Option<IndexedRecord> curAvroRecord = null;
+ final HoodieRecord<?> hoodieRecord = recordsIterator.next();
+ try {
+ curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
+ } catch (IOException e) {
+ throw new HoodieException("Get avro insert value error for key: " + hoodieRecord.getRecordKey(), e);
+ }
+ if (curAvroRecord.isPresent()) {
+ final IndexedRecord avroRecord = curAvroRecord.get();
+ GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+ avroRecord,
+ requiredSchema,
+ requiredPos,
+ recordBuilder);
+ currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+ FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos());
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return currentRecord;
+ }
+
+ @Override
+ public void close() {
+ records.close();
+ }
+ };
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
@@ -544,6 +603,8 @@ public class MergeOnReadInputFormat
private final Schema tableSchema;
private final Schema requiredSchema;
private final int[] requiredPos;
+ private final boolean emitDelete;
+ private final int operationPos;
private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter;
private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
private final GenericRecordBuilder recordBuilder;
@@ -557,7 +618,7 @@ public class MergeOnReadInputFormat
// refactor it out once FLINK-22370 is resolved.
private boolean readLogs = false;
- private Set<String> keyToSkip = new HashSet<>();
+ private final Set<String> keyToSkip = new HashSet<>();
private RowData currentRecord;
@@ -569,13 +630,18 @@ public class MergeOnReadInputFormat
Schema tableSchema,
Schema requiredSchema,
int[] requiredPos,
+ boolean emitDelete,
+ boolean withOperationField,
+ int operationPos,
ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
this.tableSchema = tableSchema;
this.reader = reader;
- this.scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+ this.scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, withOperationField);
this.logKeysIterator = scanner.getRecords().keySet().iterator();
this.requiredSchema = requiredSchema;
this.requiredPos = requiredPos;
+ this.emitDelete = emitDelete;
+ this.operationPos = operationPos;
this.recordBuilder = new GenericRecordBuilder(requiredSchema);
this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
@@ -602,12 +668,13 @@ public class MergeOnReadInputFormat
// deleted
continue;
} else {
- GenericRecord record = buildAvroRecordBySchema(
+ GenericRecord avroRecord = buildAvroRecordBySchema(
mergedAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
- this.currentRecord = (RowData) avroToRowDataConverter.convert(record);
+ this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
+ FormatUtils.setRowKind(this.currentRecord, mergedAvroRecord.get(), this.operationPos);
return false;
}
}
@@ -620,16 +687,16 @@ public class MergeOnReadInputFormat
while (logKeysIterator.hasNext()) {
final String curKey = logKeysIterator.next();
if (!keyToSkip.contains(curKey)) {
- Option<IndexedRecord> insertAvroRecord =
- scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
+ Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey);
if (insertAvroRecord.isPresent()) {
// the record is a DELETE if insertAvroRecord not present, skipping
- GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+ GenericRecord avroRecord = buildAvroRecordBySchema(
insertAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
- this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+ this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
+ FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), this.operationPos);
return false;
}
}
@@ -637,6 +704,14 @@ public class MergeOnReadInputFormat
return true;
}
+ private Option<IndexedRecord> getInsertValue(String curKey) throws IOException {
+ final HoodieRecord<?> record = scanner.getRecords().get(curKey);
+ if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
+ return Option.empty();
+ }
+ return record.getData().getInsertValue(tableSchema);
+ }
+
@Override
public RowData nextRecord() {
return currentRecord;
@@ -655,8 +730,12 @@ public class MergeOnReadInputFormat
private Option<IndexedRecord> mergeRowWithLog(
RowData curRow,
String curKey) throws IOException {
+ final HoodieRecord<?> record = scanner.getRecords().get(curKey);
+ if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
+ return Option.empty();
+ }
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
- return scanner.getRecords().get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+ return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 9a32af6..0a63c91 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -18,6 +18,8 @@
package org.apache.hudi.table.format.mor;
+import org.apache.hudi.common.model.HoodieRecord;
+
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -38,6 +40,7 @@ public class MergeOnReadTableState implements Serializable {
private final String requiredAvroSchema;
private final List<MergeOnReadInputSplit> inputSplits;
private final String[] pkFields;
+ private final int operationPos;
public MergeOnReadTableState(
RowType rowType,
@@ -52,6 +55,7 @@ public class MergeOnReadTableState implements Serializable {
this.requiredAvroSchema = requiredAvroSchema;
this.inputSplits = inputSplits;
this.pkFields = pkFields;
+ this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);
}
public RowType getRowType() {
@@ -74,6 +78,10 @@ public class MergeOnReadTableState implements Serializable {
return inputSplits;
}
+ public int getOperationPos() {
+ return operationPos;
+ }
+
public int[] getRequiredPositions() {
final List<String> fieldNames = rowType.getFieldNames();
return requiredRowType.getFieldNames().stream()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java b/hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java
new file mode 100644
index 0000000..164815b
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Utilities for all kinds of common {@link org.apache.flink.table.connector.ChangelogMode}s.
+ */
+public class ChangelogModes {
+ public static final ChangelogMode FULL = ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+
+ /**
+ * Change log mode that ignores UPDATE_BEFORE, e.g UPSERT.
+ */
+ public static final ChangelogMode UPSERT = ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+
+ private ChangelogModes() {
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 86de90e..06305e2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -186,6 +186,7 @@ public class StreamerUtil {
.build())
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
.withAutoCommit(false)
+ .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
builder = builder.withSchema(getSourceSchema(conf).toString());
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 6d802fe..ade43c5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -38,9 +38,9 @@ import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
+import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.sink.transform.Transformer;
-import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
@@ -85,6 +85,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
* Integration test for Flink Hoodie stream sink.
*/
@@ -200,7 +202,9 @@ public class StreamWriteITCase extends TestLogger {
// To compute the compaction instant time and do compaction.
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
- writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+ boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+
+ assertTrue(scheduled, "The compaction plan should be scheduled");
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
// generate compaction plan
@@ -209,8 +213,10 @@ public class StreamWriteITCase extends TestLogger {
table.getMetaClient(), compactionInstantTime);
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+ // Mark instant as compaction inflight
+ table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
- env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
+ env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index d5639c5..8e7f809 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -399,33 +399,29 @@ public class TestWriteCopyOnWrite {
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
- Map<String, String> expected = new HashMap<>();
- // id3, id5 were deleted and id9 is ignored
- expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
- expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
- expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
- expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+ Map<String, String> expected = getUpsertWithDeleteExpected();
checkWrittenData(tempFile, expected);
}
@Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+ conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
- // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
+ // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+ // so 3 records expect to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
- assertThat("2 records expect to flush out as a mini-batch",
+ assertThat("3 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
- is(2));
+ is(3));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
@@ -472,22 +468,23 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertWithDeduplication() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+ conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
- // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
+ // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+ // so 3 records expect to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
- assertThat("2 records expect to flush out as a mini-batch",
+ assertThat("3 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
- is(2));
+ is(3));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
@@ -612,12 +609,13 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
+ conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
- // each record is 208 bytes. so 4 records expect to trigger buffer flush:
+ // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+ // so 3 records expect to trigger a mini-batch write
// flush the max size bucket once at a time.
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
@@ -625,9 +623,9 @@ public class TestWriteCopyOnWrite {
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
- assertThat("2 records expect to flush out as a mini-batch",
+ assertThat("3 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
- is(2));
+ is(3));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
@@ -676,8 +674,17 @@ public class TestWriteCopyOnWrite {
// the last 2 lines are merged
expected.put("par1", "["
+ "id1,par1,id1,Danny,23,1,par1, "
- + "id1,par1,id1,Danny,23,1,par1, "
- + "id1,par1,id1,Danny,23,1,par1]");
+ + "id1,par1,id1,Danny,23,1,par1" + "]");
+ return expected;
+ }
+
+ protected Map<String, String> getUpsertWithDeleteExpected() {
+ Map<String, String> expected = new HashMap<>();
+ // id3, id5 were deleted and id9 is ignored
+ expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
+ expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
+ expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
+ expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
return expected;
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index 0d445d6..32ee725 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -47,7 +47,7 @@ public class TestRowDataKeyGen {
assertThat(keyGen1.getPartitionPath(rowData1), is("par1"));
// null record key and partition path
- final RowData rowData2 = insertRow(null, StringData.fromString("Danny"), 23,
+ final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), null);
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
assertThat(keyGen1.getPartitionPath(rowData2), is("default"));
@@ -77,7 +77,7 @@ public class TestRowDataKeyGen {
assertThat(keyGen1.getPartitionPath(rowData1), is("par1/1970-01-01T00:00:00.001"));
// null record key and partition path
- final RowData rowData2 = insertRow(null, null, 23, null, null);
+ final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE,null, null, 23, null, null);
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
assertThat(keyGen1.getPartitionPath(rowData2), is("default/default"));
// empty record key and partition path
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 957b9b8..f8dc018 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -261,6 +261,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_NAME, "t1");
conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);
// write one commit
TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -276,17 +277,20 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2");
options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit);
+ options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
streamTableEnv.executeSql(hoodieTableDDL);
- List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
- final String expected = "["
- + "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
- + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
- + "id3,null,null,null,null, "
- + "id5,null,null,null,null, "
- + "id9,null,null,null,null]";
- assertRowsEquals(result, expected);
+ final String sinkDDL = "create table sink(\n"
+ + " name varchar(20),\n"
+ + " age_sum int\n"
+ + ") with (\n"
+ + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+ + ")";
+ List<Row> result = execSelectSql(streamTableEnv,
+ "select name, sum(age) from t1 group by name", sinkDDL, 10);
+ final String expected = "[+I(Danny,24), +I(Stephen,34)]";
+ assertRowsEquals(result, expected, true);
}
@ParameterizedTest
@@ -724,6 +728,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
} else {
sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
}
+ return execSelectSql(tEnv, select, sinkDDL, timeout);
+ }
+
+ private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
+ throws InterruptedException {
tEnv.executeSql(sinkDDL);
TableResult tableResult = tEnv.executeSql("insert into sink " + select);
// wait for the timeout then cancels the job
@@ -731,7 +740,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
tableResult.getJobClient().ifPresent(JobClient::cancel);
tEnv.executeSql("DROP TABLE IF EXISTS sink");
return CollectSinkTableFactory.RESULT.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
}
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index c8885b0..5535945 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.format;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieTableSource;
+import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -60,9 +61,14 @@ public class TestInputFormat {
File tempFile;
void beforeEach(HoodieTableType tableType) throws IOException {
+ beforeEach(tableType, Collections.emptyMap());
+ }
+
+ void beforeEach(HoodieTableType tableType, Map<String, String> options) throws IOException {
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction
+ options.forEach((key, value) -> conf.setString(key, value));
StreamerUtil.initTableIfNotExists(conf);
this.tableSource = new HoodieTableSource(
@@ -163,8 +169,62 @@ public class TestInputFormat {
}
@Test
- void testReadWithDeletes() throws Exception {
- beforeEach(HoodieTableType.MERGE_ON_READ);
+ void testReadBaseAndLogFilesWithDeletes() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write base first with compaction.
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ // write another commit using logs and read again.
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+ // when isEmitDelete is false.
+ List<RowData> result1 = readData(inputFormat);
+
+ final String actual1 = TestData.rowDataToString(result1, true);
+ final String expected1 = "["
+ + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+ + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
+ + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), "
+ + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), "
+ + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), "
+ + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4)]";
+ assertThat(actual1, is(expected1));
+
+ // refresh the input format and set isEmitDelete to true.
+ this.tableSource.reset();
+ inputFormat = this.tableSource.getInputFormat();
+ ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+ List<RowData> result2 = readData(inputFormat);
+
+ final String actual2 = TestData.rowDataToString(result2, true);
+ final String expected2 = "["
+ + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+ + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
+ + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), "
+ + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), "
+ + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), "
+ + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), "
+ + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), "
+ + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4), "
+ + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]";
+ assertThat(actual2, is(expected2));
+ }
+
+ @Test
+ void testReadWithDeletesMOR() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
// write another commit to read again
TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
@@ -175,13 +235,32 @@ public class TestInputFormat {
List<RowData> result = readData(inputFormat);
- final String actual = TestData.rowDataToString(result);
+ final String actual = TestData.rowDataToString(result, true);
final String expected = "["
- + "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
- + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
- + "id3,null,null,null,null, "
- + "id5,null,null,null,null, "
- + "id9,null,null,null,null]";
+ + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+ + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
+ + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), "
+ + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), "
+ + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]";
+ assertThat(actual, is(expected));
+ }
+
+ @Test
+ void testReadWithDeletesCOW() throws Exception {
+ beforeEach(HoodieTableType.COPY_ON_WRITE);
+
+ // write another commit to read again
+ TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ assertThat(inputFormat, instanceOf(CopyOnWriteInputFormat.class));
+
+ List<RowData> result = readData(inputFormat);
+
+ final String actual = TestData.rowDataToString(result, true);
+ final String expected = "["
+ + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+ + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1)]";
assertThat(actual, is(expected));
}
@@ -205,6 +284,33 @@ public class TestInputFormat {
assertThat(actual, is(expected));
}
+ @Test
+ void testReadChangesUnMergedMOR() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write another commit to read again
+ TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> result = readData(inputFormat);
+
+ final String actual = TestData.rowDataToString(result, true);
+ final String expected = "["
+ + "+I(id1,Danny,19,1970-01-01T00:00:00.001,par1), "
+ + "-U(id1,Danny,19,1970-01-01T00:00:00.001,par1), "
+ + "+U(id1,Danny,20,1970-01-01T00:00:00.002,par1), "
+ + "-U(id1,Danny,20,1970-01-01T00:00:00.002,par1), "
+ + "+U(id1,Danny,21,1970-01-01T00:00:00.003,par1), "
+ + "-U(id1,Danny,21,1970-01-01T00:00:00.003,par1), "
+ + "+U(id1,Danny,22,1970-01-01T00:00:00.004,par1), "
+ + "-D(id1,Danny,22,1970-01-01T00:00:00.005,par1)]";
+ assertThat(actual, is(expected));
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index f5ac9c5..d3e32e6 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -234,15 +234,53 @@ public class TestData {
TimestampData.fromEpochMillis(6), StringData.fromString("par3"))
);
+ public static List<RowData> DATA_SET_INSERT_UPDATE_DELETE = Arrays.asList(
+ // INSERT
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+ // UPDATE
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+ updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+ updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+ // DELETE
+ deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+ TimestampData.fromEpochMillis(5), StringData.fromString("par1"))
+ );
+
/**
* Returns string format of a list of RowData.
*/
public static String rowDataToString(List<RowData> rows) {
+ return rowDataToString(rows, false);
+ }
+
+ /**
+ * Returns string format of a list of RowData.
+ *
+ * @param withChangeFlag whether to print the change flag
+ */
+ public static String rowDataToString(List<RowData> rows, boolean withChangeFlag) {
DataStructureConverter<Object, Object> converter =
DataStructureConverters.getConverter(TestConfigurations.ROW_DATA_TYPE);
return rows.stream()
- .map(row -> converter.toExternal(row).toString())
- .sorted(Comparator.naturalOrder())
+ .sorted(Comparator.comparing(o -> toStringSafely(o.getString(0))))
+ .map(row -> {
+ final String rowStr = converter.toExternal(row).toString();
+ if (withChangeFlag) {
+ return row.getRowKind().shortString() + "(" + rowStr + ")";
+ } else {
+ return rowStr;
+ }
+ })
.collect(Collectors.toList()).toString();
}
@@ -287,7 +325,30 @@ public class TestData {
* @param expected Expected string of the sorted rows
*/
public static void assertRowsEquals(List<Row> rows, String expected) {
- assertRowsEquals(rows, expected, 0);
+ assertRowsEquals(rows, expected, false);
+ }
+
+ /**
+ * Sort the {@code rows} using field at index 0 and asserts
+ * it equals with the expected string {@code expected}.
+ *
+ * @param rows Actual result rows
+ * @param expected Expected string of the sorted rows
+ * @param withChangeFlag Whether compares with change flags
+ */
+ public static void assertRowsEquals(List<Row> rows, String expected, boolean withChangeFlag) {
+ String rowsString = rows.stream()
+ .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
+ .map(row -> {
+ final String rowStr = row.toString();
+ if (withChangeFlag) {
+ return row.getKind().shortString() + "(" + rowStr + ")";
+ } else {
+ return rowStr;
+ }
+ })
+ .collect(Collectors.toList()).toString();
+ assertThat(rowsString, is(expected));
}
/**
@@ -573,7 +634,11 @@ public class TestData {
}
public static BinaryRowData insertRow(Object... fields) {
- LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType)
+ return insertRow(TestConfigurations.ROW_TYPE, fields);
+ }
+
+ public static BinaryRowData insertRow(RowType rowType, Object... fields) {
+ LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType)
.toArray(LogicalType[]::new);
assertEquals(
"Filed count inconsistent with type information",
@@ -599,4 +664,16 @@ public class TestData {
rowData.setRowKind(RowKind.DELETE);
return rowData;
}
+
+ private static BinaryRowData updateBeforeRow(Object... fields) {
+ BinaryRowData rowData = insertRow(fields);
+ rowData.setRowKind(RowKind.UPDATE_BEFORE);
+ return rowData;
+ }
+
+ private static BinaryRowData updateAfterRow(Object... fields) {
+ BinaryRowData rowData = insertRow(fields);
+ rowData.setRowKind(RowKind.UPDATE_AFTER);
+ return rowData;
+ }
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java
new file mode 100644
index 0000000..7729042
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.client.model.HoodieRowData;
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests {@link HoodieRowData}.
+ */
+public class TestHoodieRowData {
+ private final int metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.size();
+ private static final Random RANDOM = new Random();
+ private static final int INTEGER_INDEX = 0;
+ private static final int STRING_INDEX = 1;
+ private static final int BOOLEAN_INDEX = 2;
+ private static final int SHORT_INDEX = 3;
+ private static final int BYTE_INDEX = 4;
+ private static final int LONG_INDEX = 5;
+ private static final int FLOAT_INDEX = 6;
+ private static final int DOUBLE_INDEX = 7;
+ private static final int DECIMAL_INDEX = 8;
+ private static final int BINARY_INDEX = 9;
+ private static final int ROW_INDEX = 10;
+
+ private static final DataType BASIC_DATA_TYPE = DataTypes.ROW(
+ DataTypes.FIELD("integer", DataTypes.INT()),
+ DataTypes.FIELD("string", DataTypes.STRING()),
+ DataTypes.FIELD("boolean", DataTypes.BOOLEAN()),
+ DataTypes.FIELD("short", DataTypes.SMALLINT()),
+ DataTypes.FIELD("byte", DataTypes.TINYINT()),
+ DataTypes.FIELD("long", DataTypes.BIGINT()),
+ DataTypes.FIELD("float", DataTypes.FLOAT()),
+ DataTypes.FIELD("double", DataTypes.DOUBLE()),
+ DataTypes.FIELD("decimal", DataTypes.DECIMAL(10, 4)),
+ DataTypes.FIELD("binary", DataTypes.BYTES()),
+ DataTypes.FIELD("row", DataTypes.ROW()))
+ .notNull();
+ private static final RowType ROW_TYPE = (RowType) BASIC_DATA_TYPE.getLogicalType();
+
+ @Test
+ public void testGet() {
+ Object[] values = getRandomValue(true);
+ RowData rowData = TestData.insertRow(ROW_TYPE, values);
+
+ HoodieRowData hoodieRowData = new HoodieRowData("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName",
+ rowData, true);
+ assertValues(hoodieRowData, "commitTime", "commitSeqNo", "recordKey", "partitionPath",
+ "fileName", values);
+ }
+
+ /**
+ * Fetches a random Object[] of values for testing.
+ *
+ * @param haveRowType true if rowType need to be added as one of the elements in the Object[]
+ * @return the random Object[] thus generated
+ */
+ private Object[] getRandomValue(boolean haveRowType) {
+ Object[] values = new Object[11];
+ values[INTEGER_INDEX] = RANDOM.nextInt();
+ values[STRING_INDEX] = StringData.fromString(UUID.randomUUID().toString());
+ values[BOOLEAN_INDEX] = RANDOM.nextBoolean();
+ values[SHORT_INDEX] = (short) RANDOM.nextInt(2);
+ byte[] bytes = new byte[1];
+ RANDOM.nextBytes(bytes);
+ values[BYTE_INDEX] = bytes[0];
+ values[LONG_INDEX] = RANDOM.nextLong();
+ values[FLOAT_INDEX] = RANDOM.nextFloat();
+ values[DOUBLE_INDEX] = RANDOM.nextDouble();
+ values[DECIMAL_INDEX] = DecimalData.fromBigDecimal(new BigDecimal("1005.12313"), 10, 4);
+ bytes = new byte[20];
+ RANDOM.nextBytes(bytes);
+ values[BINARY_INDEX] = bytes;
+ if (haveRowType) {
+ Object[] rowField = getRandomValue(false);
+ values[ROW_INDEX] = TestData.insertRow(ROW_TYPE, rowField);
+ }
+ return values;
+ }
+
+ private void assertValues(HoodieRowData hoodieRowData, String commitTime, String commitSeqNo, String recordKey, String partitionPath,
+ String filename, Object[] values) {
+ assertEquals(commitTime, hoodieRowData.getString(0).toString());
+ assertEquals(commitSeqNo, hoodieRowData.getString(1).toString());
+ assertEquals(recordKey, hoodieRowData.getString(2).toString());
+ assertEquals(partitionPath, hoodieRowData.getString(3).toString());
+ assertEquals(filename, hoodieRowData.getString(4).toString());
+ assertEquals("I", hoodieRowData.getString(5).toString());
+ // row data.
+ assertEquals(values[INTEGER_INDEX], hoodieRowData.getInt(INTEGER_INDEX + metaColumnsNum));
+ assertEquals(values[STRING_INDEX], hoodieRowData.getString(STRING_INDEX + metaColumnsNum));
+ assertEquals(values[BOOLEAN_INDEX], hoodieRowData.getBoolean(BOOLEAN_INDEX + metaColumnsNum));
+ assertEquals(values[SHORT_INDEX], hoodieRowData.getShort(SHORT_INDEX + metaColumnsNum));
+ assertEquals(values[BYTE_INDEX], hoodieRowData.getByte(BYTE_INDEX + metaColumnsNum));
+ assertEquals(values[LONG_INDEX], hoodieRowData.getLong(LONG_INDEX + metaColumnsNum));
+ assertEquals(values[FLOAT_INDEX], hoodieRowData.getFloat(FLOAT_INDEX + metaColumnsNum));
+ assertEquals(values[DOUBLE_INDEX], hoodieRowData.getDouble(DOUBLE_INDEX + metaColumnsNum));
+ assertEquals(values[DECIMAL_INDEX], hoodieRowData.getDecimal(DECIMAL_INDEX + metaColumnsNum, 10, 4));
+ byte[] exceptBinary = (byte[]) values[BINARY_INDEX];
+ byte[] binary = hoodieRowData.getBinary(BINARY_INDEX + metaColumnsNum);
+ for (int i = 0; i < exceptBinary.length; i++) {
+ assertEquals(exceptBinary[i], binary[i]);
+ }
+ assertEquals(values[ROW_INDEX], hoodieRowData.getRow(ROW_INDEX + metaColumnsNum, values.length));
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
index 6dc9325..33e9d37 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -150,6 +150,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
public void invoke(RowData value, SinkFunction.Context context) {
Row row = (Row) converter.toExternal(value);
assert row != null;
+ row.setKind(value.getRowKind());
RESULT.get(taskID).add(row);
}
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
index cd84c3b..6af0119 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -71,7 +71,7 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
- syncConfig.verifyMetadataFileListing, fs);
+ syncConfig.verifyMetadataFileListing, false, fs);
this.dlaConfig = syncConfig;
try {
this.partitionValueExtractor =
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 560563c..d9d833d 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -120,6 +120,9 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
public int sparkSchemaLengthThreshold = 4000;
+ @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
+ public Boolean withOperationField = false;
+
// enhance the similar function in child class
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -143,6 +146,7 @@ public class HiveSyncConfig implements Serializable {
newConfig.batchSyncNum = cfg.batchSyncNum;
newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable;
newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
+ newConfig.withOperationField = cfg.withOperationField;
return newConfig;
}
@@ -174,6 +178,7 @@ public class HiveSyncConfig implements Serializable {
+ ", createManagedTable=" + createManagedTable
+ ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
+ ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
+ + ", withOperationField=" + withOperationField
+ '}';
}
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 4a22bb8..13e48f5 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -62,7 +62,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
private final HiveSyncConfig syncConfig;
public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
- super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, fs);
+ super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, cfg.withOperationField, fs);
this.syncConfig = cfg;
// Support JDBC, HiveQL and metastore based implementations for backwards compatiblity. Future users should
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 1107d74..11ff745 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -51,19 +51,21 @@ public abstract class AbstractSyncHoodieClient {
protected final HoodieTableMetaClient metaClient;
protected final HoodieTableType tableType;
protected final FileSystem fs;
- private String basePath;
- private boolean assumeDatePartitioning;
- private boolean useFileListingFromMetadata;
- private boolean verifyMetadataFileListing;
+ private final String basePath;
+ private final boolean assumeDatePartitioning;
+ private final boolean useFileListingFromMetadata;
+ private final boolean verifyMetadataFileListing;
+ private final boolean withOperationField;
public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
- boolean verifyMetadataFileListing, FileSystem fs) {
+ boolean verifyMetadataFileListing, boolean withOperationField, FileSystem fs) {
this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
this.tableType = metaClient.getTableType();
this.basePath = basePath;
this.assumeDatePartitioning = assumeDatePartitioning;
this.useFileListingFromMetadata = useFileListingFromMetadata;
this.verifyMetadataFileListing = verifyMetadataFileListing;
+ this.withOperationField = withOperationField;
this.fs = fs;
}
@@ -139,7 +141,11 @@ public abstract class AbstractSyncHoodieClient {
*/
public MessageType getDataSchema() {
try {
- return new TableSchemaResolver(metaClient).getTableParquetSchema();
+ if (withOperationField) {
+ return new TableSchemaResolver(metaClient, true).getTableParquetSchema();
+ } else {
+ return new TableSchemaResolver(metaClient).getTableParquetSchema();
+ }
} catch (Exception e) {
throw new HoodieSyncException("Failed to read data schema", e);
}