You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/17 13:12:16 UTC

[hudi] branch master updated: [HUDI-6052] Standardise TIMESTAMP(6) format when writing to Parquet files (#8418)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 64bf871cfc3 [HUDI-6052] Standardise TIMESTAMP(6) format when writing to Parquet files (#8418)
64bf871cfc3 is described below

commit 64bf871cfc3cfc08478cf04e02d2f7086f72548e
Author: voonhous <vo...@gmail.com>
AuthorDate: Mon Apr 17 21:12:06 2023 +0800

    [HUDI-6052] Standardise TIMESTAMP(6) format when writing to Parquet files (#8418)
    
    Co-authored-by: hbgstc123 <hb...@gmail.com>
---
 .../row/HoodieRowDataParquetWriteSupport.java      |   2 +-
 .../storage/row/parquet/ParquetRowDataWriter.java  |  63 ++++++--
 .../row/parquet/ParquetSchemaConverter.java        |  10 +-
 .../row/parquet/TestParquetSchemaConverter.java    |   2 +-
 .../org/apache/hudi/util/AvroSchemaConverter.java  |   4 +-
 .../apache/hudi/util/AvroToRowDataConverters.java  |   2 +-
 .../sink/cluster/ITTestHoodieFlinkClustering.java  | 180 +++++++++++++++++++++
 .../vector/reader/Int64TimestampColumnReader.java  |   2 +-
 .../vector/reader/Int64TimestampColumnReader.java  |   2 +-
 .../vector/reader/Int64TimestampColumnReader.java  |   2 +-
 .../vector/reader/Int64TimestampColumnReader.java  |   2 +-
 11 files changed, 244 insertions(+), 27 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
index b939498c3e2..4a3109db60a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
@@ -20,11 +20,11 @@ package org.apache.hudi.io.storage.row;
 
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.util.Option;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hudi.common.util.Option;
 import org.apache.parquet.hadoop.api.WriteSupport;
 
 import java.nio.charset.StandardCharsets;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
index 3d9524eaa30..e5b9509d879 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.io.storage.row.parquet;
 
+import org.apache.hudi.common.util.ValidationUtils;
+
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalDataUtils;
 import org.apache.flink.table.data.MapData;
@@ -124,17 +126,19 @@ public class ParquetRowDataWriter {
         return new DoubleWriter();
       case TIMESTAMP_WITHOUT_TIME_ZONE:
         TimestampType timestampType = (TimestampType) t;
-        if (timestampType.getPrecision() == 3) {
-          return new Timestamp64Writer();
+        final int tsPrecision = timestampType.getPrecision();
+        if (tsPrecision == 3 || tsPrecision == 6) {
+          return new Timestamp64Writer(tsPrecision);
         } else {
-          return new Timestamp96Writer(timestampType.getPrecision());
+          return new Timestamp96Writer(tsPrecision);
         }
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) t;
-        if (localZonedTimestampType.getPrecision() == 3) {
-          return new Timestamp64Writer();
+        final int tsLtzPrecision = localZonedTimestampType.getPrecision();
+        if (tsLtzPrecision == 3 || tsLtzPrecision == 6) {
+          return new Timestamp64Writer(tsLtzPrecision);
         } else {
-          return new Timestamp96Writer(localZonedTimestampType.getPrecision());
+          return new Timestamp96Writer(tsLtzPrecision);
         }
       case ARRAY:
         ArrayType arrayType = (ArrayType) t;
@@ -284,33 +288,64 @@ public class ParquetRowDataWriter {
   }
 
   /**
-   * Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
+   * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedTypes of TIMESTAMP with the MILLIS and MICROS
+   * precision respectively. See
    * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
-   * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
    */
   private class Timestamp64Writer implements FieldWriter {
-    private Timestamp64Writer() {
+    private final int precision;
+    private Timestamp64Writer(int precision) {
+      ValidationUtils.checkArgument(precision == 3 || precision == 6,
+          "Timestamp64Writer is only able to support precisions of {3, 6}");
+      this.precision = precision;
     }
 
     @Override
     public void write(RowData row, int ordinal) {
-      recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3)));
+      TimestampData timestampData = row.getTimestamp(ordinal, precision);
+      recordConsumer.addLong(timestampToInt64(timestampData, precision));
     }
 
     @Override
     public void write(ArrayData array, int ordinal) {
-      recordConsumer.addLong(timestampToInt64(array.getTimestamp(ordinal, 3)));
+      TimestampData timestampData = array.getTimestamp(ordinal, precision);
+      recordConsumer.addLong(timestampToInt64(timestampData, precision));
     }
   }
 
-  private long timestampToInt64(TimestampData timestampData) {
-    return utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime();
+  /**
+   * Converts a {@code TimestampData} to its corresponding int64 value. This function only accepts TimestampData of
+   * precision 3 or 6. Special attention will need to be given to a TimestampData of precision = 6.
+   * <p>
+   * For example representing `1970-01-01T00:00:03.100001` of precision 6 will have:
+   * <ul>
+   *   <li>millisecond = 3100</li>
+   *   <li>nanoOfMillisecond = 1000</li>
+   * </ul>
+   * As such, the int64 value (in microseconds) will be:
+   * <p>
+   * millisecond * 1000 + nanoOfMillisecond / 1000
+   *
+   * @param timestampData TimestampData to be converted to int64 format
+   * @param precision the precision of the TimestampData
+   * @return int64 value of the TimestampData: milliseconds for precision 3, microseconds for precision 6
+   */
+  private long timestampToInt64(TimestampData timestampData, int precision) {
+    if (precision == 3) {
+      return utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime();
+    } else {
+      // precision is validated to be {3, 6} by the Timestamp64Writer constructor, so this branch is precision 6;
+      // the sub-millisecond micros must be added for both the UTC and the toTimestamp()-based millisecond base
+      final long millis = utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime();
+      return millis * 1000 + timestampData.getNanoOfMillisecond() / 1000;
+    }
   }
 
   /**
    * Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
    * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
-   * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
+   * <p>
+   * TODO: Leaving this here as there might be a requirement to support TIMESTAMP(9) in the future
    */
   private class Timestamp96Writer implements FieldWriter {
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 5fb76f9418c..7cd7c300670 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -600,9 +600,10 @@ public class ParquetSchemaConverter {
             .named(name);
       case TIMESTAMP_WITHOUT_TIME_ZONE:
         TimestampType timestampType = (TimestampType) type;
-        if (timestampType.getPrecision() == 3) {
+        if (timestampType.getPrecision() == 3 || timestampType.getPrecision() == 6) {
+          TimeUnit timeunit = timestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
-              .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
+              .as(LogicalTypeAnnotation.timestampType(true, timeunit))
               .named(name);
         } else {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
@@ -610,9 +611,10 @@ public class ParquetSchemaConverter {
         }
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
-        if (localZonedTimestampType.getPrecision() == 3) {
+        if (localZonedTimestampType.getPrecision() == 3 || localZonedTimestampType.getPrecision() == 6) {
+          TimeUnit timeunit = localZonedTimestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
-              .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS))
+              .as(LogicalTypeAnnotation.timestampType(false, timeunit))
               .named(name);
         } else {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index a1a07a65f99..3d5012b73b3 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -83,7 +83,7 @@ public class TestParquetSchemaConverter {
     assertThat(messageType.getColumns().size(), is(3));
     final String expected = "message converted {\n"
         + "  optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n"
-        + "  optional int96 ts_6;\n"
+        + "  optional int64 ts_6 (TIMESTAMP(MICROS,true));\n"
         + "  optional int96 ts_9;\n"
         + "}\n";
     assertThat(messageType.toString(), is(expected));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
index 44253e37329..6e4c3ec6137 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -256,7 +256,7 @@ public class AvroSchemaConverter {
           throw new IllegalArgumentException(
               "Avro does not support TIMESTAMP type with precision: "
                   + precision
-                  + ", it only supports precision less than 6.");
+                  + ", it only support precisions <= 6.");
         }
         Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
         return nullable ? nullableSchema(timestamp) : timestamp;
@@ -273,7 +273,7 @@ public class AvroSchemaConverter {
           throw new IllegalArgumentException(
               "Avro does not support LOCAL TIMESTAMP type with precision: "
                   + precision
-                  + ", it only supports precision less than 6.");
+                  + ", it only support precisions <= 6.");
         }
         Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
         return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index 5c9988dc0b2..38633b8ad9e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -215,7 +215,7 @@ public class AvroToRowDataConverters {
       throw new IllegalArgumentException(
           "Avro does not support TIMESTAMP type with precision: "
               + precision
-              + ", it only supports precision less than 6.");
+              + ", it only support precisions <= 6.");
     }
     return avroObject -> {
       final Instant instant;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index 9fdc6fc8b10..095fe1d369e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -52,8 +52,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -68,8 +71,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -419,4 +424,179 @@ public class ITTestHoodieFlinkClustering {
         .stream().anyMatch(fg -> fg.getSlices()
             .stream().anyMatch(s -> s.getDataFilePath().contains(firstClusteringInstant))));
   }
+
+  /**
+   * Test to ensure that inserting into a table with a TIMESTAMP(9) column throws a {@link ValidationException}.
+   * NOTE(review): the last assertThrows argument is only the failure message, it is not matched against the exception.
+   */
+  @Test
+  public void testHoodieFlinkClusteringWithTimestampNanos() {
+    // create hoodie table and insert into data
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode
+    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+    // row schema
+    final DataType dataType = DataTypes.ROW(
+            DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
+            DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+            DataTypes.FIELD("age", DataTypes.INT()),
+            DataTypes.FIELD("ts", DataTypes.TIMESTAMP(9)), // precombine field
+            DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+        .notNull();
+
+    final RowType rowType = (RowType) dataType.getLogicalType();
+    final List<String> fields = rowType.getFields().stream()
+        .map(RowType.RowField::asSummaryString).collect(Collectors.toList());
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL(
+        "t1", fields, options, true, "uuid", "partition");
+    TableResult tableResult = tableEnv.executeSql(hoodieTableDDL);
+
+    // insert rows with timestamp of nanoseconds precision; timestamp(9)
+    final String insertSql = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001001','par1'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001001','par1'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001001','par2'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001001','par2'),\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001001','par3'),\n"
+        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001001','par3'),\n"
+        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001001','par4'),\n"
+        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001001','par4')";
+
+    assertThrows(ValidationException.class, () -> tableEnv.executeSql(insertSql),
+        "Avro does not support TIMESTAMP type with precision: 9, it only support precisions <= 6.");
+  }
+
+  @Test
+  public void testHoodieFlinkClusteringWithTimestampMicros() throws Exception {
+    // create the hoodie table and insert data into it
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode
+    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+
+    // row schema
+    final DataType dataType = DataTypes.ROW(
+            DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
+            DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+            DataTypes.FIELD("age", DataTypes.INT()),
+            DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)), // precombine field
+            DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+        .notNull();
+    final RowType rowType = (RowType) dataType.getLogicalType();
+    final List<String> fields = rowType.getFields().stream()
+        .map(RowType.RowField::asSummaryString).collect(Collectors.toList());
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL(
+        "t1", fields, options, true, "uuid", "partition");
+    tableEnv.executeSql(hoodieTableDDL);
+
+    // insert rows with timestamp of microseconds precision; timestamp(6)
+    final String insertSql = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001','par1'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001','par1'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001','par2'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001','par2'),\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001','par3'),\n"
+        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001','par3'),\n"
+        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001','par4'),\n"
+        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001','par4')";
+    tableEnv.executeSql(insertSql).await();
+
+    // wait for the asynchronous commit to finish (fixed 3 second pause)
+    TimeUnit.SECONDS.sleep(3);
+
+    // build the clustering configuration; the Avro schema is set further below
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    cfg.targetPartitions = 4;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+    // set partition field
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); // NOTE(review): overwrites the value set from the table config above
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // compute the clustering instant time, then schedule a clustering plan at that instant;
+    // the test asserts below that the plan was actually scheduled before executing it.
+    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+    assertTrue(scheduled, "The clustering plan should be scheduled");
+
+    // fetch the instant based on the configured execution sequence
+    table.getMetaClient().reloadActiveTimeline();
+    HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+
+    // generate clustering plan
+    // should support configurable commit metadata
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), timeline.lastInstant().get());
+
+    HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+    // Mark instant as clustering inflight
+    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
+
+    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
+        .name("clustering_source")
+        .uid("uid_clustering_source")
+        .rebalance()
+        .transform("clustering_task",
+            TypeInformation.of(ClusteringCommitEvent.class),
+            new ClusteringOperator(conf, rowType))
+        .setParallelism(clusteringPlan.getInputGroups().size());
+
+    ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+    dataStream
+        .addSink(new ClusteringCommitSink(conf))
+        .name("clustering_commit")
+        .uid("uid_clustering_commit")
+        .setParallelism(1);
+
+    env.execute("flink_hudi_clustering");
+
+    // test output: the expected ts values keep the microsecond digit (e.g. 00:00:01.100001 -> 1100001)
+    final Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, id2,par1,id2,Stephen,33,2100001,par1]");
+    expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, id4,par2,id4,Fabian,31,4100001,par2]");
+    expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, id6,par3,id6,Emma,20,6100001,par3]");
+    expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]");
+    TestData.checkWrittenData(tempFile, expected, 4);
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
index 555853bda6b..70638a9c432 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
@@ -59,7 +59,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
       throw new IllegalArgumentException(
           "Avro does not support TIMESTAMP type with precision: "
               + precision
-              + ", it only supports precision less than 6.");
+              + ", it only support precisions <= 6.");
     }
     checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
   }
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
index 555853bda6b..70638a9c432 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
@@ -59,7 +59,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
       throw new IllegalArgumentException(
           "Avro does not support TIMESTAMP type with precision: "
               + precision
-              + ", it only supports precision less than 6.");
+              + ", it only support precisions <= 6.");
     }
     checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
   }
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
index 417b1155bbd..b44273b57ca 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
@@ -59,7 +59,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
       throw new IllegalArgumentException(
           "Avro does not support TIMESTAMP type with precision: "
               + precision
-              + ", it only supports precision less than 6.");
+              + ", it only support precisions <= 6.");
     }
     checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
   }
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
index 417b1155bbd..b44273b57ca 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
@@ -59,7 +59,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
       throw new IllegalArgumentException(
           "Avro does not support TIMESTAMP type with precision: "
               + precision
-              + ", it only supports precision less than 6.");
+              + ", it only support precisions <= 6.");
     }
     checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
   }