Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/07 13:20:18 UTC

[hudi] branch master updated: [HUDI-6032] Fix read meta field '_hoodie_commit_time' multiple times (#8398)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9729415aaef [HUDI-6032] Fix read meta field '_hoodie_commit_time' multiple times (#8398)
9729415aaef is described below

commit 9729415aaef49ca1f15a93a601052dd1c94b2a81
Author: HunterXHunter <13...@qq.com>
AuthorDate: Fri Apr 7 21:20:06 2023 +0800

    [HUDI-6032] Fix read meta field '_hoodie_commit_time' multiple times (#8398)
---
 .../table/format/mor/MergeOnReadInputFormat.java   |  29 ++++-
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 142 +++++++++++++++++++++
 2 files changed, 168 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 2a8b2e66878..767968629d1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -193,6 +193,7 @@ public class MergeOnReadInputFormat
         return new BaseFileOnlyFilteringIterator(
             split.getInstantRange().get(),
             this.tableState.getRequiredRowType(),
+            this.requiredPos,
             getBaseFileIterator(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
       } else {
         // base file only
@@ -548,21 +549,31 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
+    private int commitTimePos;
+
     BaseFileOnlyFilteringIterator(
         InstantRange instantRange,
         RowType requiredRowType,
+        int[] requiredPos,
         ClosableIterator<RowData> nested) {
       this.nested = nested;
       this.instantRange = instantRange;
-      int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray();
-      projection = RowDataProjection.instance(requiredRowType, positions);
+      this.commitTimePos = getCommitTimePos(requiredPos);
+      int[] positions;
+      if (commitTimePos < 0) {
+        commitTimePos = 0;
+        positions = IntStream.range(1, 1 + requiredPos.length).toArray();
+      } else {
+        positions = IntStream.range(0, requiredPos.length).toArray();
+      }
+      this.projection = RowDataProjection.instance(requiredRowType, positions);
     }
 
     @Override
     public boolean hasNext() {
       while (this.nested.hasNext()) {
         currentRecord = this.nested.next();
-        boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+        boolean isInRange = instantRange.isInRange(currentRecord.getString(commitTimePos).toString());
         if (isInRange) {
           return true;
         }
@@ -898,12 +909,24 @@ public class MergeOnReadInputFormat
   // -------------------------------------------------------------------------
 
   private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
+    if (getCommitTimePos(requiredPos) >= 0) {
+      return requiredPos;
+    }
     int[] requiredPos2 = new int[requiredPos.length + 1];
     requiredPos2[0] = HOODIE_COMMIT_TIME_COL_POS;
     System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
     return requiredPos2;
   }
 
+  private static int getCommitTimePos(int[] requiredPos) {
+    for (int i = 0; i < requiredPos.length; i++) {
+      if (requiredPos[i] == HOODIE_COMMIT_TIME_COL_POS) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
   @VisibleForTesting
   public void isEmitDelete(boolean emitDelete) {
     this.emitDelete = emitDelete;
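
[Editor's note, not part of the patch] The crux of the fix above is that the commit-time column is prepended to the read schema only when the query has not already requested it, and the filtering iterator then looks the column up at whatever position it actually occupies instead of always reading position 0. A minimal standalone sketch of that helper logic, assuming HOODIE_COMMIT_TIME_COL_POS is 0 (the first Hudi meta column); the class and sample position arrays are illustrative only:

    import java.util.Arrays;

    public class CommitTimePosSketch {
      // Assumed value: in Hudi, '_hoodie_commit_time' is the first meta column.
      private static final int HOODIE_COMMIT_TIME_COL_POS = 0;

      // Index of the commit-time column inside the requested positions, or -1 if absent.
      static int getCommitTimePos(int[] requiredPos) {
        for (int i = 0; i < requiredPos.length; i++) {
          if (requiredPos[i] == HOODIE_COMMIT_TIME_COL_POS) {
            return i;
          }
        }
        return -1;
      }

      // Prepend the commit-time column only when the query did not ask for it,
      // so the column is never read twice.
      static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
        if (getCommitTimePos(requiredPos) >= 0) {
          return requiredPos; // already requested: reuse it at its existing position
        }
        int[] withCommitTime = new int[requiredPos.length + 1];
        withCommitTime[0] = HOODIE_COMMIT_TIME_COL_POS;
        System.arraycopy(requiredPos, 0, withCommitTime, 1, requiredPos.length);
        return withCommitTime;
      }

      public static void main(String[] args) {
        int[] withoutMeta = {4, 5};    // only data columns requested
        int[] withMeta = {4, 0, 5};    // '_hoodie_commit_time' requested explicitly
        System.out.println(Arrays.toString(getRequiredPosWithCommitTime(withoutMeta))); // [0, 4, 5]
        System.out.println(Arrays.toString(getRequiredPosWithCommitTime(withMeta)));    // [4, 0, 5]
      }
    }

In the patched BaseFileOnlyFilteringIterator, the same getCommitTimePos result drives both the instant-range filter (which position to read the commit time from) and the projection (whether to drop the prepended column or keep the schema as-is).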
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 849210e9bee..838be82e62a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1763,6 +1763,135 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result, expected);
   }
 
+  @ParameterizedTest
+  @MethodSource("tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams")
+  void testReadMetaFields(HoodieTableType tableType, String queryType, int numInsertBatches, int compactionDeltaCommits) throws Exception {
+    String path = tempFile.getAbsolutePath();
+    String hoodieTableDDL = sql("t1")
+            .field("id int")
+            .field("name varchar(10)")
+            .field("ts timestamp(6)")
+            .field("`partition` varchar(10)")
+            .pkField("id")
+            .partitionField("partition")
+            .option(FlinkOptions.TABLE_TYPE, tableType)
+            .option(FlinkOptions.QUERY_TYPE, queryType)
+            .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+            .option(FlinkOptions.PATH, path)
+            .end();
+    batchTableEnv.executeSql(hoodieTableDDL);
+
+    final String[] insertInto = new String[] {
+        "insert into t1 values(1,'Danny',TIMESTAMP '2021-12-01 01:02:01.100001', 'par1')",
+        "insert into t1 values(2,'Stephen',TIMESTAMP '2021-12-02 03:04:02.200002', 'par2')",
+        "insert into t1 values(3,'Julian',TIMESTAMP '2021-12-03 13:14:03.300003', 'par3')"};
+
+    // Queries without meta fields.
+    String[] template1 = new String[] {
+        "+I[1, Danny, 2021-12-01T01:02:01.100001, par1]",
+        ", +I[2, Stephen, 2021-12-02T03:04:02.200002, par2]",
+        ", +I[3, Julian, 2021-12-03T13:14:03.300003, par3]"
+    };
+
+    // Meta field '_hoodie_commit_time' in the first position.
+    String[] template2 = new String[] {
+        "+I[%s, 1, par1, 1, Danny, 2021-12-01T01:02:01.100001, par1]",
+        ", +I[%s, 2, par2, 2, Stephen, 2021-12-02T03:04:02.200002, par2]",
+        ", +I[%s, 3, par3, 3, Julian, 2021-12-03T13:14:03.300003, par3]"
+    };
+
+    // Meta fields at random positions.
+    String[] template3 = new String[] {
+        "+I[1, %s, Danny, 1, 2021-12-01T01:02:01.100001, par1, par1]",
+        ", +I[2, %s, Stephen, 2, 2021-12-02T03:04:02.200002, par2, par2]",
+        ", +I[3, %s, Julian, 3, 2021-12-03T13:14:03.300003, par3, par3]"
+    };
+
+    StringBuilder expected1 = new StringBuilder();
+    StringBuilder expected2 = new StringBuilder();
+    StringBuilder expected3 = new StringBuilder();
+
+    expected1.append("[");
+    expected2.append("[");
+    expected3.append("[");
+    for (int i = 0; i < numInsertBatches; i++) {
+      execInsertSql(batchTableEnv, insertInto[i]);
+      String commitTime = tableType.equals(HoodieTableType.MERGE_ON_READ)
+              ? TestUtils.getLastDeltaCompleteInstant(path) : TestUtils.getLastCompleteInstant(path);
+      expected1.append(template1[i]);
+      expected2.append(String.format(template2[i], commitTime));
+      expected3.append(String.format(template3[i], commitTime));
+    }
+    expected1.append("]");
+    expected2.append("]");
+    expected3.append("]");
+    String readHoodieTableDDL;
+    batchTableEnv.executeSql("drop table t1");
+    readHoodieTableDDL = sql("t1")
+            .field("id int")
+            .field("name varchar(10)")
+            .field("ts timestamp(6)")
+            .field("`partition` varchar(10)")
+            .pkField("id")
+            .partitionField("partition")
+            .option(FlinkOptions.TABLE_TYPE, tableType)
+            .option(FlinkOptions.QUERY_TYPE, queryType)
+            .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+            .option(FlinkOptions.PATH, path)
+            .end();
+    batchTableEnv.executeSql(readHoodieTableDDL);
+
+    List<Row> result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+    assertRowsEquals(result, expected1.toString());
+
+    batchTableEnv.executeSql("drop table t1");
+    readHoodieTableDDL = sql("t1")
+            .field("_hoodie_commit_time string")
+            .field("_hoodie_record_key string")
+            .field("_hoodie_partition_path string")
+            .field("id int")
+            .field("name varchar(10)")
+            .field("ts timestamp(6)")
+            .field("`partition` varchar(10)")
+            .pkField("id")
+            .partitionField("partition")
+            .option(FlinkOptions.TABLE_TYPE, tableType)
+            .option(FlinkOptions.QUERY_TYPE, queryType)
+            .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+            .option(FlinkOptions.PATH, path)
+            .end();
+    batchTableEnv.executeSql(readHoodieTableDDL);
+
+    result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+    assertRowsEquals(result, expected2.toString());
+
+    batchTableEnv.executeSql("drop table t1");
+    readHoodieTableDDL = sql("t1")
+            .field("id int")
+            .field("_hoodie_commit_time string")
+            .field("name varchar(10)")
+            .field("_hoodie_record_key string")
+            .field("ts timestamp(6)")
+            .field("_hoodie_partition_path string")
+            .field("`partition` varchar(10)")
+            .pkField("id")
+            .partitionField("partition")
+            .option(FlinkOptions.TABLE_TYPE, tableType)
+            .option(FlinkOptions.QUERY_TYPE, queryType)
+            .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+            .option(FlinkOptions.PATH, path)
+            .end();
+    batchTableEnv.executeSql(readHoodieTableDDL);
+
+    result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+    assertRowsEquals(result, expected3.toString());
+
+  }
+
   @ParameterizedTest
   @MethodSource("tableTypeAndPartitioningParams")
   void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
@@ -1839,6 +1968,19 @@ public class ITTestHoodieDataSource {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (HoodieTableType, query type, num insert batches, num compaction delta commits).
+   */
+  private static Stream<Arguments> tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams() {
+    return Arrays.stream(new Object[][] {
+            {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_INCREMENTAL, 1, 1},
+            {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1},
+            {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 1},
+            {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 3},
+            {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 3, 2}
+    }).map(Arguments::of);
+  }
+
   /**
    * Return test params => (index type, hive style partitioning).
    */