You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/11 14:55:57 UTC

[hudi] branch master updated: [HUDI-2298] The HoodieMergedLogRecordScanner should set up the operation of the chosen record (#3456)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2933249  [HUDI-2298] The HoodieMergedLogRecordScanner should set up the operation of the chosen record (#3456)
2933249 is described below

commit 29332498af420596f42c3b0ff4a2fe804d7d7dfc
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Aug 11 22:55:43 2021 +0800

    [HUDI-2298] The HoodieMergedLogRecordScanner should set up the operation of the chosen record (#3456)
---
 .../table/log/HoodieMergedLogRecordScanner.java    | 10 ++++--
 .../apache/hudi/table/format/TestInputFormat.java  | 37 ++++++++++++++++++++++
 .../test/java/org/apache/hudi/utils/TestData.java  | 24 ++++++++++++++
 3 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index bc08fe7..a68c8f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
@@ -134,9 +135,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
       // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
       // done when a delete (empty payload) is encountered before or after an insert/update.
 
-      // Always use the natural order now.
-      HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
-      records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, hoodieRecord.getOperation()));
+      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+      HoodieRecordPayload oldValue = oldRecord.getData();
+      HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
+      boolean choosePrev = combinedValue.equals(oldValue);
+      HoodieOperation operation = choosePrev ? oldRecord.getOperation() : hoodieRecord.getOperation();
+      records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
     } else {
       // Put the record as is
       records.put(key, hoodieRecord);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 5535945..54848bd 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -221,6 +221,43 @@ public class TestInputFormat {
   }
 
   @Test
+  void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // Write the base data first (a single insert) with async compaction enabled and COMPACTION_DELTA_COMMITS = 1.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
+
+    // Turn async compaction off and write a second commit (disordered updates + delete) using logs, then read.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    // Case 1: isEmitDelete is false -- only the +U row with the largest timestamp (ts = 4) is expected.
+    List<RowData> result1 = readData(inputFormat);
+
+    final String actual1 = TestData.rowDataToString(result1, true);
+    final String expected1 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]";
+    assertThat(actual1, is(expected1));
+
+    // Case 2: reset the source, rebuild the input format with isEmitDelete = true; the result must be unchanged.
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat();
+    ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+    List<RowData> result2 = readData(inputFormat);
+
+    final String actual2 = TestData.rowDataToString(result2, true);
+    final String expected2 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]";
+    assertThat(actual2, is(expected2));
+  }
+
+  @Test
   void testReadWithDeletesMOR() throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index d3e32e6..b51631c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -58,6 +58,7 @@ import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -256,6 +257,29 @@ public class TestData {
           TimestampData.fromEpochMillis(5), StringData.fromString("par1"))
   );
 
+  public static List<RowData> DATA_SET_SINGLE_INSERT = Collections.singletonList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+
+  public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
+      // Disordered updates: update-before/update-after rows for id1 listed out of timestamp order (ts 3, 2, 1, 2, 4, 3).
+      updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+      updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+      updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+      updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+      // Disordered delete: listed last but carries ts 2, older than the ts 4 update-after row above.
+      deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1"))
+  );
+
   /**
    * Returns string format of a list of RowData.
    */