You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/06/12 14:42:24 UTC

[hudi] branch master updated: [HUDI-6358] Fix flink payload merger with deletes (#8935)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e9a67bfc497 [HUDI-6358] Fix flink payload merger with deletes (#8935)
e9a67bfc497 is described below

commit e9a67bfc49759637ec680c27179e25c01cae10b7
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Mon Jun 12 22:42:17 2023 +0800

    [HUDI-6358] Fix flink payload merger with deletes (#8935)
---
 .../apache/hudi/table/format/mor/MergeOnReadInputFormat.java  | 11 +++++------
 .../java/org/apache/hudi/table/format/TestInputFormat.java    |  8 ++++++++
 .../src/test/java/org/apache/hudi/utils/TestData.java         |  4 ++++
 3 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 767968629d1..86240fa5982 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -73,7 +73,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.IntStream;
@@ -767,7 +766,7 @@ public class MergeOnReadInputFormat
         final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
         if (scanner.getRecords().containsKey(curKey)) {
           keyToSkip.add(curKey);
-          Option<HoodieAvroIndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
+          Option<HoodieRecord<IndexedRecord>> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
           if (!mergedAvroRecord.isPresent()) {
             // deleted
             continue;
@@ -838,13 +837,13 @@ public class MergeOnReadInputFormat
       }
     }
 
-    private Option<HoodieAvroIndexedRecord> mergeRowWithLog(RowData curRow, String curKey) {
-      final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
+    @SuppressWarnings("unchecked")
+    private Option<HoodieRecord<IndexedRecord>> mergeRowWithLog(RowData curRow, String curKey) {
+      final HoodieRecord<?> record = scanner.getRecords().get(curKey);
       GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
       HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new HoodieAvroIndexedRecord(historyAvroRecord);
       try {
-        Option<HoodieRecord> resultRecord = recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, record, tableSchema, payloadProps).map(Pair::getLeft);
-        return resultRecord.get().toIndexedRecord(tableSchema, new Properties());
+        return recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, record, tableSchema, payloadProps).map(Pair::getLeft);
       } catch (IOException e) {
         throw new HoodieIOException("Merge base and delta payloads exception", e);
       }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index d1b5516d1ad..7c076344166 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -952,6 +952,14 @@ public class TestInputFormat {
     assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
     final String baseMergeLogFileResult = TestData.rowDataToString(readData(inputFormat));
     assertThat(baseMergeLogFileResult, is(expected));
+
+    // write another commit with delete messages
+    TestData.writeData(TestData.DATA_SET_SINGLE_DELETE, conf);
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+    final String baseMergeLogFileResult2 = TestData.rowDataToString(readData(inputFormat));
+    assertThat(baseMergeLogFileResult2, is("[]"));
   }
 
   @Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 7199ba069fc..7f9ad108941 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -330,6 +330,10 @@ public class TestData {
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
 
+  public static List<RowData> DATA_SET_SINGLE_DELETE = Collections.singletonList(
+      deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(5), StringData.fromString("par1")));
+
   public static List<RowData> DATA_SET_DISORDER_INSERT = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(3), StringData.fromString("par1")),