Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/23 10:36:59 UTC
[hudi] branch master updated: [HUDI-4897] Refactor the merge handle in CDC mode (#6740)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new efe553b327 [HUDI-4897] Refactor the merge handle in CDC mode (#6740)
efe553b327 is described below
commit efe553b327bc025d242afa37221a740dca9b1ea6
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Sep 23 18:36:48 2022 +0800
[HUDI-4897] Refactor the merge handle in CDC mode (#6740)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 9 ++
.../java/org/apache/hudi/io/HoodieCDCLogger.java | 148 ++++++++++++---------
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 56 ++------
.../apache/hudi/io/HoodieMergeHandleFactory.java | 101 ++++++++++++++
.../hudi/io/HoodieMergeHandleWithChangeLog.java | 97 ++++++++++++++
.../apache/hudi/io/HoodieSortedMergeHandle.java | 23 +---
.../io/HoodieSortedMergeHandleWithChangeLog.java | 59 ++++++++
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 11 ++
.../hudi/table/action/clean/CleanPlanner.java | 22 +--
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 11 +-
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 11 +-
.../commit/BaseJavaCommitActionExecutor.java | 20 +--
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 11 +-
.../commit/BaseSparkCommitActionExecutor.java | 14 +-
.../hudi/common/table/cdc/HoodieCDCOperation.java | 3 +
.../cdc/HoodieCDCSupplementalLoggingMode.java | 12 ++
.../hudi/common/table/cdc/HoodieCDCUtils.java | 9 +-
.../common/table/log/block/HoodieCDCDataBlock.java | 4 +-
18 files changed, 420 insertions(+), 201 deletions(-)
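In short, this refactor pulls the CDC write path out of the base merge handle into dedicated subclasses and centralizes handle construction behind a factory. As a reading aid for the diff below, a rough skeleton of the resulting hierarchy (signatures elided; this is a sketch, not code from the commit):

    // Base handles: no CDC logic anymore.
    class HoodieMergeHandle { /* plain merge path */ }
    class HoodieSortedMergeHandle extends HoodieMergeHandle { /* sorted variant */ }

    // CDC-aware handles own a HoodieCDCLogger and hook the write/close methods.
    class HoodieMergeHandleWithChangeLog extends HoodieMergeHandle { }
    class HoodieSortedMergeHandleWithChangeLog extends HoodieMergeHandleWithChangeLog { }

    // Call sites now ask the factory, which dispatches on requireSortedRecords(),
    // allowDuplicateInserts() and isCDCEnabled().
    class HoodieMergeHandleFactory { /* static create(...) overloads */ }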
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7f805410ea..032d790d63 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -2160,6 +2161,14 @@ public class HoodieWriteConfig extends HoodieConfig {
return metastoreConfig.enableMetastore();
}
+ /**
+ * CDC supplemental logging mode.
+ */
+ public HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode() {
+ return HoodieCDCSupplementalLoggingMode.parse(
+ getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index e4f1e14252..f57b195c76 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -18,14 +18,6 @@
package org.apache.hudi.io;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -48,6 +40,13 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
@@ -67,13 +66,9 @@ public class HoodieCDCLogger implements Closeable {
private final Schema dataSchema;
- private final boolean populateMetaFields;
-
// writer for cdc data
private final HoodieLogFormat.Writer cdcWriter;
- private final boolean cdcEnabled;
-
private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;
private final Schema cdcSchema;
@@ -83,6 +78,9 @@ public class HoodieCDCLogger implements Closeable {
// the cdc data
private final Map<String, HoodieAvroPayload> cdcData;
+ // the cdc record transformer
+ private final CDCTransformer transformer;
+
public HoodieCDCLogger(
String commitTime,
HoodieWriteConfig config,
@@ -93,15 +91,11 @@ public class HoodieCDCLogger implements Closeable {
try {
this.commitTime = commitTime;
this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
- this.populateMetaFields = config.populateMetaFields();
- this.keyField = populateMetaFields ? HoodieRecord.RECORD_KEY_METADATA_FIELD
+ this.keyField = config.populateMetaFields()
+ ? HoodieRecord.RECORD_KEY_METADATA_FIELD
: tableConfig.getRecordKeyFieldProp();
this.cdcWriter = cdcWriter;
-
- this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
- this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
- config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
-
+ this.cdcSupplementalLoggingMode = config.getCDCSupplementalLoggingMode();
this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
cdcSupplementalLoggingMode,
dataSchema
@@ -114,8 +108,8 @@ public class HoodieCDCLogger implements Closeable {
new DefaultSizeEstimator<>(),
new DefaultSizeEstimator<>(),
config.getCommonConfig().getSpillableDiskMapType(),
- config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()
- );
+ config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
+ this.transformer = getTransformer();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e);
}
@@ -124,55 +118,25 @@ public class HoodieCDCLogger implements Closeable {
public void put(HoodieRecord hoodieRecord,
GenericRecord oldRecord,
Option<IndexedRecord> newRecord) {
- if (cdcEnabled) {
- String recordKey = hoodieRecord.getRecordKey();
- GenericData.Record cdcRecord;
- if (newRecord.isPresent()) {
- GenericRecord record = (GenericRecord) newRecord.get();
- if (oldRecord == null) {
- // inserted cdc record
- cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey,
- null, record);
- } else {
- // updated cdc record
- cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey,
- oldRecord, record);
- }
+ String recordKey = hoodieRecord.getRecordKey();
+ GenericData.Record cdcRecord;
+ if (newRecord.isPresent()) {
+ GenericRecord record = (GenericRecord) newRecord.get();
+ if (oldRecord == null) {
+ // INSERT cdc record
+ cdcRecord = this.transformer.transform(HoodieCDCOperation.INSERT, recordKey,
+ null, record);
} else {
- // deleted cdc record
- cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey,
- oldRecord, null);
+ // UPDATE cdc record
+ cdcRecord = this.transformer.transform(HoodieCDCOperation.UPDATE, recordKey,
+ oldRecord, record);
}
- cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
- }
- }
-
- private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
- String recordKey,
- GenericRecord oldRecord,
- GenericRecord newRecord) {
- GenericData.Record record;
- if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
- record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime,
- removeCommitMetadata(oldRecord), newRecord);
- } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
- record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey,
- removeCommitMetadata(oldRecord));
} else {
- record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
- }
- return record;
- }
-
- private GenericRecord removeCommitMetadata(GenericRecord record) {
- if (record == null) {
- return null;
+ // DELETE cdc record
+ cdcRecord = this.transformer.transform(HoodieCDCOperation.DELETE, recordKey,
+ oldRecord, null);
}
- return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>());
- }
-
- public boolean isEmpty() {
- return !this.cdcEnabled || this.cdcData.isEmpty();
+ cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
}
public Option<AppendResult> writeCDCData() {
@@ -219,6 +183,43 @@ public class HoodieCDCLogger implements Closeable {
}
}
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private CDCTransformer getTransformer() {
+ if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
+ return (operation, recordKey, oldRecord, newRecord) ->
+ HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord);
+ } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
+ return (operation, recordKey, oldRecord, newRecord) ->
+ HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord));
+ } else {
+ return (operation, recordKey, oldRecord, newRecord) ->
+ HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
+ }
+ }
+
+ private GenericRecord removeCommitMetadata(GenericRecord record) {
+ return record == null ? null : HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, Collections.emptyMap());
+ }
+
+ public boolean isEmpty() {
+ return this.cdcData.isEmpty();
+ }
+
+ public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
+ long recordsWritten,
+ long insertRecordsWritten) {
+ if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+ // the following are cases where we do not need to write out the cdc file:
+ // case 1: all the data from the previous file slice are deleted, and no new data is inserted;
+ // case 2: all the incoming data is INSERT.
+ return Option.empty();
+ }
+ return cdcLogger.writeCDCData();
+ }
+
public static void setCDCStatIfNeeded(HoodieWriteStat stat,
Option<AppendResult> cdcResult,
String partitionPath,
@@ -236,4 +237,19 @@ public class HoodieCDCLogger implements Closeable {
throw new HoodieUpsertException("Failed to set cdc write stat", e);
}
}
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /**
+ * A transformer that transforms normal data records into cdc records.
+ */
+ private interface CDCTransformer {
+ GenericData.Record transform(HoodieCDCOperation operation,
+ String recordKey,
+ GenericRecord oldRecord,
+ GenericRecord newRecord);
+
+ }
}
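The constructor now selects a CDCTransformer once, so put() no longer re-checks the logging mode for every record. A minimal self-contained sketch of that select-once pattern (toy names, not the Hudi API):

    // Toy version of the strategy selection in HoodieCDCLogger#getTransformer:
    // inspect the mode a single time, then every record goes through a plain
    // lambda call with no further branching.
    class TransformerSketch {
      enum Mode { OP_KEY, WITH_BEFORE, WITH_BEFORE_AFTER }

      interface Transformer {
        String transform(String op, String key, String before, String after);
      }

      static Transformer pick(Mode mode) {
        switch (mode) {
          case WITH_BEFORE_AFTER:
            return (op, key, before, after) -> op + "," + before + "," + after;
          case WITH_BEFORE:
            return (op, key, before, after) -> op + "," + key + "," + before;
          default: // OP_KEY
            return (op, key, before, after) -> op + "," + key;
        }
      }

      public static void main(String[] args) {
        Transformer t = pick(Mode.WITH_BEFORE); // chosen once, as in the constructor
        System.out.println(t.transform("u", "key1", "{old}", "{new}")); // u,key1,{old}
      }
    }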
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 442256ade3..5515c2552e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -18,8 +18,6 @@
package org.apache.hudi.io;
-import org.apache.hadoop.fs.Path;
-
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -33,9 +31,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
@@ -55,7 +50,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -66,8 +61,8 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
@SuppressWarnings("Duplicates")
@@ -107,8 +102,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter<IndexedRecord> fileWriter;
- protected boolean cdcEnabled = false;
- protected HoodieCDCLogger cdcLogger;
private boolean preserveMetadata = false;
protected Path newFilePath;
@@ -210,18 +203,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
writeSchemaWithMetaFields, taskContextSupplier);
-
- // init the cdc logger
- this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
- if (cdcEnabled) {
- this.cdcLogger = new HoodieCDCLogger(
- instantTime,
- config,
- hoodieTable.getMetaClient().getTableConfig(),
- tableSchema,
- createLogWriter(Option.empty(), instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
- IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
- }
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -287,7 +268,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
}
- private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
boolean isDelete = false;
if (indexedRecord.isPresent()) {
updatedRecordsWritten++;
@@ -300,11 +281,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
return false;
}
}
- boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
- if (result && cdcEnabled) {
- cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
- }
- return result;
+ return writeRecord(hoodieRecord, indexedRecord, isDelete);
}
protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -314,10 +291,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
return;
}
+ writeInsertRecord(hoodieRecord, insertRecord);
+ }
+
+ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
- if (cdcEnabled) {
- cdcLogger.put(hoodieRecord, null, insertRecord);
- }
insertRecordsWritten++;
}
}
@@ -425,21 +403,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
}
}
- private Option<AppendResult> writeCDCDataIfNeeded() {
- if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
- // the following cases where we do not need to write out the cdc file:
- // case 1: all the data from the previous file slice are deleted. and no new data is inserted;
- // case 2: all the data are new-coming,
- return Option.empty();
- }
- return cdcLogger.writeCDCData();
- }
-
@Override
public List<WriteStatus> close() {
try {
- HoodieWriteStat stat = writeStatus.getStat();
-
writeIncomingRecords();
if (keyToNewRecords instanceof ExternalSpillableMap) {
@@ -454,11 +420,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
fileWriter = null;
}
- // if there are cdc data written, set the CDC-related information.
- Option<AppendResult> cdcResult = writeCDCDataIfNeeded();
- HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);
-
long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
+ HoodieWriteStat stat = writeStatus.getStat();
+
stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setNumWrites(recordsWritten);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
new file mode 100644
index 0000000000..436eff5dac
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Factory class for hoodie merge handle.
+ */
+public class HoodieMergeHandleFactory {
+ /**
+ * Creates a merge handle for normal write path.
+ */
+ public static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, I, K, O> create(
+ WriteOperationType operationType,
+ HoodieWriteConfig writeConfig,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr,
+ String partitionPath,
+ String fileId,
+ TaskContextSupplier taskContextSupplier,
+ Option<BaseKeyGenerator> keyGeneratorOpt) {
+ if (table.requireSortedRecords()) {
+ if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+ return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier,
+ keyGeneratorOpt);
+ } else {
+ return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier,
+ keyGeneratorOpt);
+ }
+ } else if (!WriteOperationType.isChangingRecords(operationType) && writeConfig.allowDuplicateInserts()) {
+ return new HoodieConcatHandle<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+ } else {
+ if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+ return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+ } else {
+ return new HoodieMergeHandle<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+ }
+ }
+ }
+
+ /**
+ * Creates a merge handle for compaction path.
+ */
+ public static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, I, K, O> create(
+ HoodieWriteConfig writeConfig,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Map<String, HoodieRecord<T>> keyToNewRecords,
+ String partitionPath,
+ String fileId,
+ HoodieBaseFile dataFileToBeMerged,
+ TaskContextSupplier taskContextSupplier,
+ Option<BaseKeyGenerator> keyGeneratorOpt) {
+ if (table.requireSortedRecords()) {
+ if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+ return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ } else {
+ return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ }
+ } else {
+ if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+ return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ } else {
+ return new HoodieMergeHandle<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ }
+ }
+ }
+}
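As a usage reference, the engine-specific executors further down in this diff shrink to a single factory call. Condensed from BaseSparkCommitActionExecutor#getUpdateHandle below (fields such as operationType, config, instantTime and table are assumed from the surrounding executor class):

    // The factory picks the sorted/concat/CDC variant, so call sites stay branch-free.
    protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
                                                Iterator<HoodieRecord<T>> recordItr) {
      return HoodieMergeHandleFactory.create(operationType, config, instantTime, table,
          recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
    }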
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
new file mode 100644
index 0000000000..12e48ffbb4
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.AppendResult;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A merge handle that supports logging change logs.
+ */
+public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+ protected final HoodieCDCLogger cdcLogger;
+
+ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+ TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+ super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+ this.cdcLogger = new HoodieCDCLogger(
+ instantTime,
+ config,
+ hoodieTable.getMetaClient().getTableConfig(),
+ tableSchema,
+ createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+ IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
+ }
+
+ /**
+ * Called by compactor code path.
+ */
+ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+ HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+ super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ this.cdcLogger = new HoodieCDCLogger(
+ instantTime,
+ config,
+ hoodieTable.getMetaClient().getTableConfig(),
+ tableSchema,
+ createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+ IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
+ }
+
+ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
+ final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, indexedRecord);
+ if (result) {
+ cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
+ }
+ return result;
+ }
+
+ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
+ super.writeInsertRecord(hoodieRecord, insertRecord);
+ cdcLogger.put(hoodieRecord, null, insertRecord);
+ }
+
+ @Override
+ public List<WriteStatus> close() {
+ List<WriteStatus> writeStatuses = super.close();
+ // if there are cdc data written, set the CDC-related information.
+ Option<AppendResult> cdcResult =
+ HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
+ HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs);
+ return writeStatuses;
+ }
+}
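Note how the subclass adds CDC capture purely through overrides: super does the write, and the change log is appended only when the write succeeded. A toy sketch of the same tee pattern (hypothetical names, not Hudi classes):

    // Toy illustration of the override-and-tee pattern used above.
    class BaseHandle {
      protected boolean writeUpdate(String key, String value) {
        System.out.println("wrote " + key + "=" + value);
        return true; // pretend the base write succeeded
      }
    }

    class ChangeLogHandle extends BaseHandle {
      private final StringBuilder cdcLog = new StringBuilder();

      @Override
      protected boolean writeUpdate(String key, String value) {
        boolean ok = super.writeUpdate(key, value);
        if (ok) {
          cdcLog.append("u,").append(key).append('\n'); // tee into the change log
        }
        return ok;
      }

      public static void main(String[] args) {
        new ChangeLogHandle().writeUpdate("key1", "v1");
      }
    }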
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 8240de66d5..7dce31a4c3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -18,9 +18,6 @@
package org.apache.hudi.io;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -33,6 +30,8 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
+import org.apache.avro.generic.GenericRecord;
+
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
@@ -94,18 +93,13 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
throw new HoodieUpsertException("Insert/Update not in sorted order");
}
try {
- Option<IndexedRecord> insertRecord;
if (useWriterSchemaForCompaction) {
- insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
+ writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
} else {
- insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
+ writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
}
- writeRecord(hoodieRecord, insertRecord);
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
- if (cdcEnabled) {
- cdcLogger.put(hoodieRecord, null, insertRecord);
- }
} catch (IOException e) {
throw new HoodieUpsertException("Failed to write records", e);
}
@@ -122,17 +116,12 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
String key = newRecordKeysSorted.poll();
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
- Option<IndexedRecord> insertRecord;
if (useWriterSchemaForCompaction) {
- insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
+ writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
} else {
- insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
+ writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
}
- writeRecord(hoodieRecord, insertRecord);
insertRecordsWritten++;
- if (cdcEnabled) {
- cdcLogger.put(hoodieRecord, null, insertRecord);
- }
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
new file mode 100644
index 0000000000..8d317b709a
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A sorted merge handle that supports logging change logs.
+ */
+public class HoodieSortedMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandleWithChangeLog<T, I, K, O> {
+ public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+ TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+ super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+ }
+
+ /**
+ * Called by compactor code path.
+ */
+ public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+ HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+ super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ }
+
+ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
+ final boolean result = super.writeRecord(hoodieRecord, insertRecord);
+ this.cdcLogger.put(hoodieRecord, null, insertRecord);
+ return result;
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 3864c31ce9..abf5c0face 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -309,6 +310,16 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}
+ protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime, String fileSuffix) {
+ try {
+ return createLogWriter(Option.empty(), baseCommitTime, fileSuffix);
+ } catch (IOException e) {
+ throw new HoodieException("Creating logger writer with fileId: " + fileId + ", "
+ + "base commit time: " + baseCommitTime + ", "
+ + "file suffix: " + fileSuffix + " error");
+ }
+ }
+
private static class IgnoreRecord implements GenericRecord {
@Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 5ed53f7ae0..e5d90b5e9d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -369,22 +369,12 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
}
});
- if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
- // If merge on read, then clean the log files for the commits as well
- Predicate<HoodieLogFile> notCDCLogFile =
- hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
- deletePaths.addAll(
- aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
- .collect(Collectors.toList()));
- }
- if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
- // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow.
- // Here we need to clean uo these cdc log files.
- Predicate<HoodieLogFile> isCDCLogFile =
- hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
- deletePaths.addAll(
- aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
- .collect(Collectors.toList()));
+ if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ
+ || hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+ // 1. If merge on read, then clean the log files for the commits as well;
+ // 2. If change log capture is enabled, clean the log files regardless of whether the table type is MOR or COW.
+ deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+ .collect(Collectors.toList()));
}
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 2c8a3c4e49..7d2be6cb93 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -46,7 +46,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
@@ -401,13 +401,8 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
+ "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
}
}
- if (requireSortedRecords()) {
- return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
- } else {
- return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
- }
+ return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
}
@Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 8892133498..8e72682725 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -45,7 +45,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -299,13 +299,8 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
- if (requireSortedRecords()) {
- return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged, taskContextSupplier, Option.empty());
- } else {
- return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged, taskContextSupplier, Option.empty());
- }
+ return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, Option.empty());
}
@Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 22c90eb8bb..7762fd5ea3 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -41,8 +40,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.JavaLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
-import org.apache.hudi.io.HoodieConcatHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
@@ -291,20 +289,8 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
}
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
- if (table.requireSortedRecords()) {
- return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, Option.empty());
- } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) {
- return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, Option.empty());
- } else {
- return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, Option.empty());
- }
- }
-
- protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
- Map<String, HoodieRecord<T>> keyToNewRecords,
- HoodieBaseFile dataFileToBeMerged) {
- return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords,
- partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, Option.empty());
+ return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
+ taskContextSupplier, Option.empty());
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index a88ca65c35..115aea06f2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -48,7 +48,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -250,13 +250,8 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
+ "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
}
}
- if (requireSortedRecords()) {
- return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
- } else {
- return new HoodieMergeHandle(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
- }
+ return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index af72c14efc..8c7d9e41ea 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -45,12 +45,10 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.CreateHandleFactory;
-import org.apache.hudi.io.HoodieConcatHandle;
import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
@@ -385,14 +383,8 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
- if (table.requireSortedRecords()) {
- return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier,
- keyGeneratorOpt);
- } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) {
- return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
- } else {
- return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
- }
+ return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
+ taskContextSupplier, keyGeneratorOpt);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
index edd63f3569..90540bc05a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
@@ -20,6 +20,9 @@ package org.apache.hudi.common.table.cdc;
import org.apache.hudi.exception.HoodieNotSupportedException;
+/**
+ * Enumeration of change log operations.
+ */
public enum HoodieCDCOperation {
INSERT("i"),
UPDATE("u"),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
index b1e92dd273..35a232206f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
@@ -20,6 +20,18 @@ package org.apache.hudi.common.table.cdc;
import org.apache.hudi.exception.HoodieNotSupportedException;
+/**
+ * Change log capture supplemental logging mode. The supplemental log is used to
+ * accelerate the generation of change log details.
+ *
+ * <p>Three modes are supported:</p>
+ *
+ * <ul>
+ * <li>OP_KEY: logs the record keys only, the reader needs to figure out both the before and after images;</li>
+ * <li>WITH_BEFORE: logs the before images, the reader needs to figure out the after images;</li>
+ * <li>WITH_BEFORE_AFTER: logs both the before and after images, the reader can generate the details directly from the log.</li>
+ * </ul>
+ */
public enum HoodieCDCSupplementalLoggingMode {
OP_KEY("cdc_op_key"),
WITH_BEFORE("cdc_data_before"),
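The practical trade-off between the modes is log size versus read-time work: the more images are logged, the less the reader has to reconstruct. A hedged sketch of how the writer resolves the mode and the matching CDC schema, using the two accessors shown in this diff (config and dataSchema are assumed to be in scope):

    // Sketch only: HoodieWriteConfig#getCDCSupplementalLoggingMode and
    // HoodieCDCUtils#schemaBySupplementalLoggingMode are the methods added in
    // this commit; WITH_BEFORE_AFTER writes the largest log but lets the reader
    // emit change details directly, OP_KEY writes the smallest log.
    HoodieCDCSupplementalLoggingMode mode = config.getCDCSupplementalLoggingMode();
    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);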
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
index a741181d4d..042e95cfd6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
@@ -18,17 +18,20 @@
package org.apache.hudi.common.table.cdc;
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.exception.HoodieException;
+
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.avro.AvroSchemaUtils;
-import org.apache.hudi.exception.HoodieException;
-
import java.util.Arrays;
import java.util.List;
+/**
+ * Utilities for change log capture.
+ */
public class HoodieCDCUtils {
public static final String CDC_LOGFILE_SUFFIX = "-cdc";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index 1c0f6e4b6c..cc5663262c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -29,6 +29,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/**
+ * Change log supplemental log data block.
+ */
public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
public HoodieCDCDataBlock(
@@ -53,5 +56,4 @@ public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
public HoodieLogBlockType getBlockType() {
return HoodieLogBlockType.CDC_DATA_BLOCK;
}
-
}