You are viewing a plain-text version of this content. The canonical link to it was not preserved in this text copy.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/11 14:55:57 UTC
[hudi] branch master updated: [HUDI-2298] The HoodieMergedLogRecordScanner should set up the operation of the chosen record (#3456)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2933249 [HUDI-2298] The HoodieMergedLogRecordScanner should set up the operation of the chosen record (#3456)
2933249 is described below
commit 29332498af420596f42c3b0ff4a2fe804d7d7dfc
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Aug 11 22:55:43 2021 +0800
[HUDI-2298] The HoodieMergedLogRecordScanner should set up the operation of the chosen record (#3456)
---
.../table/log/HoodieMergedLogRecordScanner.java | 10 ++++--
.../apache/hudi/table/format/TestInputFormat.java | 37 ++++++++++++++++++++++
.../test/java/org/apache/hudi/utils/TestData.java | 24 ++++++++++++++
3 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index bc08fe7..a68c8f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.DefaultSizeEstimator;
@@ -134,9 +135,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
// Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
// done when a delete (empty payload) is encountered before or after an insert/update.
- // Always use the natural order now.
- HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
- records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, hoodieRecord.getOperation()));
+ HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+ HoodieRecordPayload oldValue = oldRecord.getData();
+ HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
+ boolean choosePrev = combinedValue.equals(oldValue);
+ HoodieOperation operation = choosePrev ? oldRecord.getOperation() : hoodieRecord.getOperation();
+ records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
} else {
// Put the record as is
records.put(key, hoodieRecord);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 5535945..54848bd 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -221,6 +221,43 @@ public class TestInputFormat {
}
@Test
+ void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write base first with compaction.
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
+
+ // write another commit using logs and read again.
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+ // when isEmitDelete is false.
+ List<RowData> result1 = readData(inputFormat);
+
+ final String actual1 = TestData.rowDataToString(result1, true);
+ final String expected1 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]";
+ assertThat(actual1, is(expected1));
+
+ // refresh the input format and set isEmitDelete to true.
+ this.tableSource.reset();
+ inputFormat = this.tableSource.getInputFormat();
+ ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+ List<RowData> result2 = readData(inputFormat);
+
+ final String actual2 = TestData.rowDataToString(result2, true);
+ final String expected2 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]";
+ assertThat(actual2, is(expected2));
+ }
+
+ @Test
void testReadWithDeletesMOR() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index d3e32e6..b51631c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -58,6 +58,7 @@ import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -256,6 +257,29 @@ public class TestData {
TimestampData.fromEpochMillis(5), StringData.fromString("par1"))
);
+ public static List<RowData> DATA_SET_SINGLE_INSERT = Collections.singletonList(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+
+ public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
+ // DISORDER UPDATE
+ updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+ updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+ // DISORDER DELETE
+ deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1"))
+ );
+
/**
* Returns string format of a list of RowData.
*/