You are viewing a plain-text version of this content. The canonical link to it ("here") was not preserved in this copy.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/20 23:21:47 UTC
[hudi] branch master updated: [HUDI-2340] Merge the data set for
flink bounded source when changelog mode turns off (#3513)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c7c517f [HUDI-2340] Merge the data set for flink bounded source when changelog mode turns off (#3513)
c7c517f is described below
commit c7c517f14c3fb5260fa7f3abca5d50a299870b38
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sat Aug 21 07:21:35 2021 +0800
[HUDI-2340] Merge the data set for flink bounded source when changelog mode turns off (#3513)
---
.../org/apache/hudi/table/HoodieTableSource.java | 2 +-
.../org/apache/hudi/table/format/FormatUtils.java | 11 ++++++++
.../table/format/mor/MergeOnReadInputFormat.java | 10 +++++--
.../apache/hudi/table/format/TestInputFormat.java | 33 ++++++++++++++++++++++
4 files changed, 53 insertions(+), 3 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 91afd49..1d58e4e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -197,7 +197,7 @@ public class HoodieTableSource implements
@Override
public ChangelogMode getChangelogMode() {
- return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+ return conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
? ChangelogModes.FULL
// when all the changes are persisted or read as batch,
// use INSERT mode.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 14f7eb3..94fbe02 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -91,6 +91,17 @@ public class FormatUtils {
}
}
+ /**
+ * Returns the RowKind of the given record: RowKind.INSERT when {@code index} is -1
+ * (no operation field position available), otherwise the result of getRowKind(record, index).
+ */
+ public static RowKind getRowKindSafely(IndexedRecord record, int index) {
+ if (index == -1) {
+ return RowKind.INSERT;
+ }
+ return getRowKind(record, index);
+ }
+
public static GenericRecord buildAvroRecordBySchema(
IndexedRecord record,
Schema requiredSchema,
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index d0e1c33d..2042b96 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -177,7 +177,8 @@ public class MergeOnReadInputFormat
}
} else if (!split.getBasePath().isPresent()) {
// log files only
- if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
+ if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+ && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
} else {
this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
@@ -350,13 +351,18 @@ public class MergeOnReadInputFormat
// continue;
} else {
final IndexedRecord avroRecord = curAvroRecord.get();
+ final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, tableState.getOperationPos());
+ if (rowKind == RowKind.DELETE && !emitDelete) {
+ // skip the delete record
+ continue;
+ }
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
avroRecord,
requiredSchema,
requiredPos,
recordBuilder);
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
- FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos());
+ currentRecord.setRowKind(rowKind);
return true;
}
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index cc0699f..f83b2d9 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -326,9 +326,41 @@ public class TestInputFormat {
}
@Test
+ void testReadChangesMergedMOR() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write another commit (insert/update/delete records) to read again
+ TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> result1 = readData(inputFormat);
+
+ final String actual1 = TestData.rowDataToString(result1);
+ // bounded read: records are merged and the resulting DELETE is dropped (emitDelete is off).
+ final String expected1 = "[]";
+ assertThat(actual1, is(expected1));
+
+ // reset the source to get a fresh input format, then enable emitting DELETE records.
+ this.tableSource.reset();
+ inputFormat = this.tableSource.getInputFormat();
+ ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+ List<RowData> result2 = readData(inputFormat);
+
+ final String actual2 = TestData.rowDataToString(result2);
+ final String expected2 = "[-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]";
+ assertThat(actual2, is(expected2));
+ }
+
+ @Test
void testReadChangesUnMergedMOR() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+ options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
beforeEach(HoodieTableType.MERGE_ON_READ, options);
// write another commit to read again
@@ -340,6 +372,7 @@ public class TestInputFormat {
List<RowData> result = readData(inputFormat);
final String actual = TestData.rowDataToString(result);
+ // the data set is merged when the data source is bounded.
final String expected = "["
+ "+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "
+ "-U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "