You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/07 13:20:18 UTC
[hudi] branch master updated: [HUDI-6032] Fix read meta field '_hoodie_commit_time' multiple times (#8398)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9729415aaef [HUDI-6032] Fix read meta field '_hoodie_commit_time' multiple times (#8398)
9729415aaef is described below
commit 9729415aaef49ca1f15a93a601052dd1c94b2a81
Author: HunterXHunter <13...@qq.com>
AuthorDate: Fri Apr 7 21:20:06 2023 +0800
[HUDI-6032] Fix read meta field '_hoodie_commit_time' multiple times (#8398)
---
.../table/format/mor/MergeOnReadInputFormat.java | 29 ++++-
.../apache/hudi/table/ITTestHoodieDataSource.java | 142 +++++++++++++++++++++
2 files changed, 168 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 2a8b2e66878..767968629d1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -193,6 +193,7 @@ public class MergeOnReadInputFormat
return new BaseFileOnlyFilteringIterator(
split.getInstantRange().get(),
this.tableState.getRequiredRowType(),
+ this.requiredPos,
getBaseFileIterator(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
} else {
// base file only
@@ -548,21 +549,31 @@ public class MergeOnReadInputFormat
private RowData currentRecord;
+ private int commitTimePos;
+
BaseFileOnlyFilteringIterator(
InstantRange instantRange,
RowType requiredRowType,
+ int[] requiredPos,
ClosableIterator<RowData> nested) {
this.nested = nested;
this.instantRange = instantRange;
- int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray();
- projection = RowDataProjection.instance(requiredRowType, positions);
+ this.commitTimePos = getCommitTimePos(requiredPos);
+ int[] positions;
+ if (commitTimePos < 0) {
+ commitTimePos = 0;
+ positions = IntStream.range(1, 1 + requiredPos.length).toArray();
+ } else {
+ positions = IntStream.range(0, requiredPos.length).toArray();
+ }
+ this.projection = RowDataProjection.instance(requiredRowType, positions);
}
@Override
public boolean hasNext() {
while (this.nested.hasNext()) {
currentRecord = this.nested.next();
- boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+ boolean isInRange = instantRange.isInRange(currentRecord.getString(commitTimePos).toString());
if (isInRange) {
return true;
}
@@ -898,12 +909,24 @@ public class MergeOnReadInputFormat
// -------------------------------------------------------------------------
private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
+ if (getCommitTimePos(requiredPos) >= 0) {
+ return requiredPos;
+ }
int[] requiredPos2 = new int[requiredPos.length + 1];
requiredPos2[0] = HOODIE_COMMIT_TIME_COL_POS;
System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
return requiredPos2;
}
+ private static int getCommitTimePos(int[] requiredPos) {
+ for (int i = 0; i < requiredPos.length; i++) {
+ if (requiredPos[i] == HOODIE_COMMIT_TIME_COL_POS) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
@VisibleForTesting
public void isEmitDelete(boolean emitDelete) {
this.emitDelete = emitDelete;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 849210e9bee..838be82e62a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1763,6 +1763,135 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result, expected);
}
+ @ParameterizedTest
+ @MethodSource("tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams")
+ void testReadMetaFields(HoodieTableType tableType, String queryType, int numInsertBatches, int compactionDeltaCommits) throws Exception {
+ String path = tempFile.getAbsolutePath();
+ String hoodieTableDDL = sql("t1")
+ .field("id int")
+ .field("name varchar(10)")
+ .field("ts timestamp(6)")
+ .field("`partition` varchar(10)")
+ .pkField("id")
+ .partitionField("partition")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.QUERY_TYPE, queryType)
+ .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+ .option(FlinkOptions.PATH, path)
+ .end();
+ batchTableEnv.executeSql(hoodieTableDDL);
+
+ final String[] insertInto = new String[] {
+ "insert into t1 values(1,'Danny',TIMESTAMP '2021-12-01 01:02:01.100001', 'par1')",
+ "insert into t1 values(2,'Stephen',TIMESTAMP '2021-12-02 03:04:02.200002', 'par2')",
+ "insert into t1 values(3,'Julian',TIMESTAMP '2021-12-03 13:14:03.300003', 'par3')"};
+
+ // Queries without meta fields.
+ String[] template1 = new String[] {
+ "+I[1, Danny, 2021-12-01T01:02:01.100001, par1]",
+ ", +I[2, Stephen, 2021-12-02T03:04:02.200002, par2]",
+ ", +I[3, Julian, 2021-12-03T13:14:03.300003, par3]"
+ };
+
+ // Meta field '_hoodie_commit_time' in the first position.
+ String[] template2 = new String[] {
+ "+I[%s, 1, par1, 1, Danny, 2021-12-01T01:02:01.100001, par1]",
+ ", +I[%s, 2, par2, 2, Stephen, 2021-12-02T03:04:02.200002, par2]",
+ ", +I[%s, 3, par3, 3, Julian, 2021-12-03T13:14:03.300003, par3]"
+ };
+
+ // Meta fields at random positions.
+ String[] template3 = new String[] {
+ "+I[1, %s, Danny, 1, 2021-12-01T01:02:01.100001, par1, par1]",
+ ", +I[2, %s, Stephen, 2, 2021-12-02T03:04:02.200002, par2, par2]",
+ ", +I[3, %s, Julian, 3, 2021-12-03T13:14:03.300003, par3, par3]"
+ };
+
+ StringBuilder expected1 = new StringBuilder();
+ StringBuilder expected2 = new StringBuilder();
+ StringBuilder expected3 = new StringBuilder();
+
+ expected1.append("[");
+ expected2.append("[");
+ expected3.append("[");
+ for (int i = 0; i < numInsertBatches; i++) {
+ execInsertSql(batchTableEnv, insertInto[i]);
+ String commitTime = tableType.equals(HoodieTableType.MERGE_ON_READ)
+ ? TestUtils.getLastDeltaCompleteInstant(path) : TestUtils.getLastCompleteInstant(path);
+ expected1.append(template1[i]);
+ expected2.append(String.format(template2[i], commitTime));
+ expected3.append(String.format(template3[i], commitTime));
+ }
+ expected1.append("]");
+ expected2.append("]");
+ expected3.append("]");
+ String readHoodieTableDDL;
+ batchTableEnv.executeSql("drop table t1");
+ readHoodieTableDDL = sql("t1")
+ .field("id int")
+ .field("name varchar(10)")
+ .field("ts timestamp(6)")
+ .field("`partition` varchar(10)")
+ .pkField("id")
+ .partitionField("partition")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.QUERY_TYPE, queryType)
+ .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+ .option(FlinkOptions.PATH, path)
+ .end();
+ batchTableEnv.executeSql(readHoodieTableDDL);
+
+ List<Row> result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+ assertRowsEquals(result, expected1.toString());
+
+ batchTableEnv.executeSql("drop table t1");
+ readHoodieTableDDL = sql("t1")
+ .field("_hoodie_commit_time string")
+ .field("_hoodie_record_key string")
+ .field("_hoodie_partition_path string")
+ .field("id int")
+ .field("name varchar(10)")
+ .field("ts timestamp(6)")
+ .field("`partition` varchar(10)")
+ .pkField("id")
+ .partitionField("partition")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.QUERY_TYPE, queryType)
+ .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+ .option(FlinkOptions.PATH, path)
+ .end();
+ batchTableEnv.executeSql(readHoodieTableDDL);
+
+ result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+ assertRowsEquals(result, expected2.toString());
+
+ batchTableEnv.executeSql("drop table t1");
+ readHoodieTableDDL = sql("t1")
+ .field("id int")
+ .field("_hoodie_commit_time string")
+ .field("name varchar(10)")
+ .field("_hoodie_record_key string")
+ .field("ts timestamp(6)")
+ .field("_hoodie_partition_path string")
+ .field("`partition` varchar(10)")
+ .pkField("id")
+ .partitionField("partition")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.QUERY_TYPE, queryType)
+ .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+ .option(FlinkOptions.PATH, path)
+ .end();
+ batchTableEnv.executeSql(readHoodieTableDDL);
+
+ result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+ assertRowsEquals(result, expected3.toString());
+
+ }
+
@ParameterizedTest
@MethodSource("tableTypeAndPartitioningParams")
void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
@@ -1839,6 +1968,19 @@ public class ITTestHoodieDataSource {
return Stream.of(data).map(Arguments::of);
}
+ /**
+ * Return test params => (HoodieTableType, query type, num insert batches, num compaction delta commits).
+ */
+ private static Stream<Arguments> tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams() {
+ return Arrays.stream(new Object[][] {
+ {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_INCREMENTAL, 1, 1},
+ {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1},
+ {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 1},
+ {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 3},
+ {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 3, 2}
+ }).map(Arguments::of);
+ }
+
/**
* Return test params => (index type, hive style partitioning).
*/