You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/06/12 14:42:24 UTC
[hudi] branch master updated: [HUDI-6358] Fix flink payload merger with deletes (#8935)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e9a67bfc497 [HUDI-6358] Fix flink payload merger with deletes (#8935)
e9a67bfc497 is described below
commit e9a67bfc49759637ec680c27179e25c01cae10b7
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Mon Jun 12 22:42:17 2023 +0800
[HUDI-6358] Fix flink payload merger with deletes (#8935)
---
.../apache/hudi/table/format/mor/MergeOnReadInputFormat.java | 11 +++++------
.../java/org/apache/hudi/table/format/TestInputFormat.java | 8 ++++++++
.../src/test/java/org/apache/hudi/utils/TestData.java | 4 ++++
3 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 767968629d1..86240fa5982 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -73,7 +73,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.IntStream;
@@ -767,7 +766,7 @@ public class MergeOnReadInputFormat
final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
if (scanner.getRecords().containsKey(curKey)) {
keyToSkip.add(curKey);
- Option<HoodieAvroIndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
+ Option<HoodieRecord<IndexedRecord>> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
if (!mergedAvroRecord.isPresent()) {
// deleted
continue;
@@ -838,13 +837,13 @@ public class MergeOnReadInputFormat
}
}
- private Option<HoodieAvroIndexedRecord> mergeRowWithLog(RowData curRow, String curKey) {
- final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
+ @SuppressWarnings("unchecked")
+ private Option<HoodieRecord<IndexedRecord>> mergeRowWithLog(RowData curRow, String curKey) {
+ final HoodieRecord<?> record = scanner.getRecords().get(curKey);
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new HoodieAvroIndexedRecord(historyAvroRecord);
try {
- Option<HoodieRecord> resultRecord = recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, record, tableSchema, payloadProps).map(Pair::getLeft);
- return resultRecord.get().toIndexedRecord(tableSchema, new Properties());
+ return recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, record, tableSchema, payloadProps).map(Pair::getLeft);
} catch (IOException e) {
throw new HoodieIOException("Merge base and delta payloads exception", e);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index d1b5516d1ad..7c076344166 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -952,6 +952,14 @@ public class TestInputFormat {
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
final String baseMergeLogFileResult = TestData.rowDataToString(readData(inputFormat));
assertThat(baseMergeLogFileResult, is(expected));
+
+ // write another commit with delete messages
+ TestData.writeData(TestData.DATA_SET_SINGLE_DELETE, conf);
+ this.tableSource.reset();
+ inputFormat = this.tableSource.getInputFormat();
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+ final String baseMergeLogFileResult2 = TestData.rowDataToString(readData(inputFormat));
+ assertThat(baseMergeLogFileResult2, is("[]"));
}
@Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 7199ba069fc..7f9ad108941 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -330,6 +330,10 @@ public class TestData {
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+ public static List<RowData> DATA_SET_SINGLE_DELETE = Collections.singletonList(
+ deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(5), StringData.fromString("par1")));
+
public static List<RowData> DATA_SET_DISORDER_INSERT = Arrays.asList(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(3), StringData.fromString("par1")),