You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/20 23:21:47 UTC

[hudi] branch master updated: [HUDI-2340] Merge the data set for flink bounded source when changelog mode turns off (#3513)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c7c517f  [HUDI-2340] Merge the data set for flink bounded source when changelog mode turns off (#3513)
c7c517f is described below

commit c7c517f14c3fb5260fa7f3abca5d50a299870b38
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sat Aug 21 07:21:35 2021 +0800

    [HUDI-2340] Merge the data set for flink bounded source when changelog mode turns off (#3513)
---
 .../org/apache/hudi/table/HoodieTableSource.java   |  2 +-
 .../org/apache/hudi/table/format/FormatUtils.java  | 11 ++++++++
 .../table/format/mor/MergeOnReadInputFormat.java   | 10 +++++--
 .../apache/hudi/table/format/TestInputFormat.java  | 33 ++++++++++++++++++++++
 4 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 91afd49..1d58e4e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -197,7 +197,7 @@ public class HoodieTableSource implements
 
   @Override
   public ChangelogMode getChangelogMode() {
-    return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+    return conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
         ? ChangelogModes.FULL
         // when all the changes are persisted or read as batch,
         // use INSERT mode.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 14f7eb3..94fbe02 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -91,6 +91,17 @@ public class FormatUtils {
     }
   }
 
+  /**
+   * Returns the {@link RowKind} of the given record, never null.
+   * Returns {@code RowKind.INSERT} when {@code index} is -1 (no operation field to read).
+   */
+  public static RowKind getRowKindSafely(IndexedRecord record, int index) {
+    if (index == -1) {
+      return RowKind.INSERT;
+    }
+    return getRowKind(record, index);
+  }
+
+
   public static GenericRecord buildAvroRecordBySchema(
       IndexedRecord record,
       Schema requiredSchema,
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index d0e1c33d..2042b96 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -177,7 +177,8 @@ public class MergeOnReadInputFormat
       }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
-      if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
+      if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+          && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
         this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
       } else {
         this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
@@ -350,13 +351,18 @@ public class MergeOnReadInputFormat
             // continue;
           } else {
             final IndexedRecord avroRecord = curAvroRecord.get();
+            final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, tableState.getOperationPos());
+            if (rowKind == RowKind.DELETE && !emitDelete) {
+              // skip the delete record
+              continue;
+            }
             GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
                 avroRecord,
                 requiredSchema,
                 requiredPos,
                 recordBuilder);
             currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
-            FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos());
+            currentRecord.setRowKind(rowKind);
             return true;
           }
         }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index cc0699f..f83b2d9 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -326,9 +326,41 @@ public class TestInputFormat {
   }
 
   @Test
+  void testReadChangesMergedMOR() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    List<RowData> result1 = readData(inputFormat);
+
+    final String actual1 = TestData.rowDataToString(result1);
+    // bounded read (READ_AS_STREAMING not set): changes are merged, merged DELETEs are skipped.
+    final String expected1 = "[]";
+    assertThat(actual1, is(expected1));
+
+    // reset the source to get a fresh input format, then enable emitting DELETE records.
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat();
+    ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+    List<RowData> result2 = readData(inputFormat);
+
+    final String actual2 = TestData.rowDataToString(result2);
+    final String expected2 = "[-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]";
+    assertThat(actual2, is(expected2));
+  }
+
+  @Test
   void testReadChangesUnMergedMOR() throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
     beforeEach(HoodieTableType.MERGE_ON_READ, options);
 
     // write another commit to read again
@@ -340,6 +372,7 @@ public class TestInputFormat {
     List<RowData> result = readData(inputFormat);
 
     final String actual = TestData.rowDataToString(result);
+    // streaming read with changelog enabled: the changes are emitted unmerged.
     final String expected = "["
         + "+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "
         + "-U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "