You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/16 10:14:21 UTC

[hudi] branch master updated: [HUDI-2191] Bump flink version to 1.13.1 (#3291)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 66f9513  [HUDI-2191] Bump flink version to 1.13.1 (#3291)
66f9513 is described below

commit 66f951322a3872073b86896fa5c10b51a0f6e4ab
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Mon Aug 16 18:14:05 2021 +0800

    [HUDI-2191] Bump flink version to 1.13.1 (#3291)
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  21 ++--
 .../hudi/sink/bulk/sort/SortOperatorGen.java       |  14 +--
 .../apache/hudi/sink/event/WriteMetadataEvent.java |   2 +-
 .../sink/partitioner/BucketAssignFunction.java     |   5 +-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |   6 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  |  16 ++-
 .../org/apache/hudi/table/HoodieTableSink.java     |  35 +++---
 .../org/apache/hudi/table/HoodieTableSource.java   |  22 ++--
 .../org/apache/hudi/sink/StreamWriteITCase.java    |   2 +-
 .../sink/utils/StreamWriteFunctionWrapper.java     |   2 +-
 .../apache/hudi/source/TestStreamReadOperator.java |   8 +-
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  80 ++++++------
 .../apache/hudi/table/TestHoodieTableFactory.java  |  42 ++++---
 .../apache/hudi/table/format/TestInputFormat.java  | 136 +++++++++++----------
 .../java/org/apache/hudi/utils/SchemaBuilder.java  |  71 +++++++++++
 .../org/apache/hudi/utils/TestConfigurations.java  |   7 +-
 .../test/java/org/apache/hudi/utils/TestData.java  |  63 +++++-----
 .../hudi/utils/source/ContinuousFileSource.java    |   2 +-
 pom.xml                                            |   2 +-
 19 files changed, 304 insertions(+), 232 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 53122c3..1a89400 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -41,7 +41,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,6 +86,11 @@ public class StreamWriteOperatorCoordinator
   private final Context context;
 
   /**
+   * Gateways for sending events to sub tasks.
+   */
+  private transient SubtaskGateway[] gateways;
+
+  /**
    * Write client.
    */
   private transient HoodieFlinkWriteClient writeClient;
@@ -150,6 +154,7 @@ public class StreamWriteOperatorCoordinator
   public void start() throws Exception {
     // initialize event buffer
     reset();
+    this.gateways = new SubtaskGateway[this.parallelism];
     this.writeClient = StreamerUtil.createWriteClient(conf);
     this.tableState = TableState.create(conf);
     // init table, create it if not exists.
@@ -257,6 +262,11 @@ public class StreamWriteOperatorCoordinator
     // no operation
   }
 
+  @Override
+  public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
+    this.gateways[i] = subtaskGateway;
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
@@ -397,13 +407,8 @@ public class StreamWriteOperatorCoordinator
    */
   private void sendCommitAckEvents() {
     CompletableFuture<?>[] futures = IntStream.range(0, this.parallelism)
-        .mapToObj(taskID -> {
-          try {
-            return this.context.sendEvent(CommitAckEvent.getInstance(), taskID);
-          } catch (TaskNotRunningException e) {
-            throw new HoodieException("Error while sending commit ack event to task [" + taskID + "]", e);
-          }
-        }).toArray(CompletableFuture<?>[]::new);
+        .mapToObj(taskID -> this.gateways[taskID].sendEvent(CommitAckEvent.getInstance()))
+        .toArray(CompletableFuture<?>[]::new);
     try {
       CompletableFuture.allOf(futures).get();
     } catch (Exception e) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
index d178661..4d3fc08 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
@@ -22,11 +22,10 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
-import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Arrays;
-import java.util.List;
 import java.util.stream.IntStream;
 
 /**
@@ -34,13 +33,12 @@ import java.util.stream.IntStream;
  */
 public class SortOperatorGen {
   private final int[] sortIndices;
-  private final LogicalType[] sortTypes;
+  private final RowType rowType;
   private final TableConfig tableConfig = new TableConfig();
 
   public SortOperatorGen(RowType rowType, String[] sortFields) {
     this.sortIndices = Arrays.stream(sortFields).mapToInt(rowType::getFieldIndex).toArray();
-    List<RowType.RowField> fields = rowType.getFields();
-    this.sortTypes = Arrays.stream(sortIndices).mapToObj(idx -> fields.get(idx).getType()).toArray(LogicalType[]::new);
+    this.rowType = rowType;
   }
 
   public OneInputStreamOperator<RowData, RowData> createSortOperator() {
@@ -51,8 +49,8 @@ public class SortOperatorGen {
   }
 
   private SortCodeGenerator createSortCodeGenerator() {
-    boolean[] padBooleans = new boolean[sortIndices.length];
-    IntStream.range(0, sortIndices.length).forEach(i -> padBooleans[i] = true);
-    return new SortCodeGenerator(tableConfig, sortIndices, sortTypes, padBooleans, padBooleans);
+    SortSpec.SortSpecBuilder builder = SortSpec.builder();
+    IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, true, true));
+    return new SortCodeGenerator(tableConfig, rowType, builder.build());
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
index 7aec6be..eb89b5a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
@@ -149,7 +149,7 @@ public class WriteMetadataEvent implements OperatorEvent {
     ValidationUtils.checkArgument(this.taskID == other.taskID);
     // the instant time could be monotonically increasing
     this.instantTime = other.instantTime;
-    this.lastBatch |= other.lastBatch; // true if one of the event isLastBatch true.
+    this.lastBatch |= other.lastBatch; // true if one of the event lastBatch is true
     List<WriteStatus> statusList = new ArrayList<>();
     statusList.addAll(this.writeStatuses);
     statusList.addAll(other.writeStatuses);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index e6c59b1..5c9d26f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -37,15 +37,16 @@ import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
 import org.apache.flink.util.Collector;
 
 import java.util.Objects;
@@ -147,7 +148,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
             TypeInformation.of(HoodieRecordGlobalLocation.class));
     double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
     if (ttl > 0) {
-      indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
+      indexStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.milliseconds((long) ttl)).build());
     }
     indexState = context.getKeyedStateStore().getState(indexStateDesc);
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 5b0ba4f..7fde34e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -39,8 +39,8 @@ import org.apache.hudi.util.StreamerUtil;
 import com.beust.jcommander.JCommander;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -54,9 +54,7 @@ import java.util.Properties;
 
 /**
  * An Utility which can incrementally consume data from Kafka and apply it to the target table.
- * currently, it only support COW table and insert, upsert operation.
- * <p>
- * note: HoodieFlinkStreamer is not suitable to initialize on large tables when we have no checkpoint to restore from.
+ * currently, it only supports COW table and insert, upsert operation.
  */
 public class HoodieFlinkStreamer {
   public static void main(String[] args) throws Exception {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 413f395..c753dde 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -27,17 +27,16 @@ import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +45,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Hoodie data source/sink factory.
@@ -62,7 +60,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
     helper.validate();
 
     Configuration conf = (Configuration) helper.getOptions();
-    TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
 
@@ -79,7 +77,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
     Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
-    TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
     return new HoodieTableSink(conf, schema);
@@ -110,8 +108,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
    * @param conf The table options
    * @param schema The table schema
    */
-  private void sanityCheck(Configuration conf, TableSchema schema) {
-    List<String> fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList());
+  private void sanityCheck(Configuration conf, ResolvedSchema schema) {
+    List<String> fields = schema.getColumnNames();
 
     // validate record key in pk absence.
     if (!schema.getPrimaryKey().isPresent()) {
@@ -144,7 +142,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       Configuration conf,
       String tableName,
       CatalogTable table,
-      TableSchema schema) {
+      ResolvedSchema schema) {
     // table name
     conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
     // hoodie key about options
@@ -154,7 +152,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
     // hive options
     setupHiveOptions(conf);
     // infer avro schema from physical DDL schema
-    inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
+    inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType());
   }
 
   /**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 8a16c6d..ca6d33a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -34,9 +34,9 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
-import org.apache.hudi.util.ChangelogModes;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
 import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.util.ChangelogModes;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -44,14 +44,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Map;
@@ -62,15 +62,15 @@ import java.util.Map;
 public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
 
   private final Configuration conf;
-  private final TableSchema schema;
+  private final ResolvedSchema schema;
   private boolean overwrite = false;
 
-  public HoodieTableSink(Configuration conf, TableSchema schema) {
+  public HoodieTableSink(Configuration conf, ResolvedSchema schema) {
     this.conf = conf;
     this.schema = schema;
   }
 
-  public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite) {
+  public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwrite) {
     this.conf = conf;
     this.schema = schema;
     this.overwrite = overwrite;
@@ -85,7 +85,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
           .getCheckpointConfig().getCheckpointTimeout();
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
-      RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
+      RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType();
 
       // bulk_insert mode
       final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
@@ -108,7 +108,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
                     TypeInformation.of(RowData.class),
                     sortOperatorGen.createSortOperator())
                 .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
-            ExecNode$.MODULE$.setManagedMemoryWeight(dataStream.getTransformation(),
+            ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
                 conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
           }
         }
@@ -203,21 +203,18 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
   }
 
   @Override
-  public void applyStaticPartition(Map<String, String> partition) {
+  public void applyStaticPartition(Map<String, String> partitions) {
     // #applyOverwrite should have been invoked.
-    if (this.overwrite) {
-      final String operationType;
-      if (partition.size() > 0) {
-        operationType = WriteOperationType.INSERT_OVERWRITE.value();
-      } else {
-        operationType = WriteOperationType.INSERT_OVERWRITE_TABLE.value();
-      }
-      this.conf.setString(FlinkOptions.OPERATION, operationType);
+    if (this.overwrite && partitions.size() > 0) {
+      this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value());
     }
   }
 
   @Override
-  public void applyOverwrite(boolean b) {
-    this.overwrite = b;
+  public void applyOverwrite(boolean overwrite) {
+    this.overwrite = overwrite;
+    // set up the operation as INSERT_OVERWRITE_TABLE first,
+    // if there are explicit partitions, #applyStaticPartition would overwrite the option.
+    this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value());
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index e1eec28..91afd49 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -109,7 +109,7 @@ public class HoodieTableSource implements
   private final transient HoodieTableMetaClient metaClient;
   private final long maxCompactionMemoryInBytes;
 
-  private final TableSchema schema;
+  private final ResolvedSchema schema;
   private final Path path;
   private final List<String> partitionKeys;
   private final String defaultPartName;
@@ -122,7 +122,7 @@ public class HoodieTableSource implements
   private List<Map<String, String>> requiredPartitions;
 
   public HoodieTableSource(
-      TableSchema schema,
+      ResolvedSchema schema,
       Path path,
       List<String> partitionKeys,
       String defaultPartName,
@@ -131,7 +131,7 @@ public class HoodieTableSource implements
   }
 
   public HoodieTableSource(
-      TableSchema schema,
+      ResolvedSchema schema,
       Path path,
       List<String> partitionKeys,
       String defaultPartName,
@@ -147,7 +147,7 @@ public class HoodieTableSource implements
     this.conf = conf;
     this.requiredPartitions = requiredPartitions;
     this.requiredPos = requiredPos == null
-        ? IntStream.range(0, schema.getFieldCount()).toArray()
+        ? IntStream.range(0, schema.getColumnCount()).toArray()
         : requiredPos;
     this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
     this.filters = filters == null ? Collections.emptyList() : filters;
@@ -250,8 +250,8 @@ public class HoodieTableSource implements
   }
 
   private DataType getProducedDataType() {
-    String[] schemaFieldNames = this.schema.getFieldNames();
-    DataType[] schemaTypes = this.schema.getFieldDataTypes();
+    String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
+    DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]);
 
     return DataTypes.ROW(Arrays.stream(this.requiredPos)
         .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
@@ -383,8 +383,8 @@ public class HoodieTableSource implements
             }
             FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
                 FilePathUtils.toFlinkPaths(paths),
-                this.schema.getFieldNames(),
-                this.schema.getFieldDataTypes(),
+                this.schema.getColumnNames().toArray(new String[0]),
+                this.schema.getColumnDataTypes().toArray(new DataType[0]),
                 this.requiredPos,
                 this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
                 this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
@@ -399,8 +399,8 @@ public class HoodieTableSource implements
       case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED:
         FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
             FilePathUtils.toFlinkPaths(paths),
-            this.schema.getFieldNames(),
-            this.schema.getFieldDataTypes(),
+            this.schema.getColumnNames().toArray(new String[0]),
+            this.schema.getColumnDataTypes().toArray(new DataType[0]),
             this.requiredPos,
             "default",
             this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index ade43c5..6aea153 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -57,8 +57,8 @@ import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index d1475bb..6f87ef7 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -110,7 +110,7 @@ public class StreamWriteFunctionWrapper<I> {
     this.gateway = new MockOperatorEventGateway();
     this.conf = conf;
     // one function
-    this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1, false);
+    this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
     this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
     this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 4abc79a..8096d5e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -69,10 +69,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestStreamReadOperator {
   private static final Map<String, String> EXPECTED = new HashMap<>();
   static {
-    EXPECTED.put("par1", "id1,Danny,23,1970-01-01T00:00:00.001,par1, id2,Stephen,33,1970-01-01T00:00:00.002,par1");
-    EXPECTED.put("par2", "id3,Julian,53,1970-01-01T00:00:00.003,par2, id4,Fabian,31,1970-01-01T00:00:00.004,par2");
-    EXPECTED.put("par3", "id5,Sophia,18,1970-01-01T00:00:00.005,par3, id6,Emma,20,1970-01-01T00:00:00.006,par3");
-    EXPECTED.put("par4", "id7,Bob,44,1970-01-01T00:00:00.007,par4, id8,Han,56,1970-01-01T00:00:00.008,par4");
+    EXPECTED.put("par1", "+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]");
+    EXPECTED.put("par2", "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2]");
+    EXPECTED.put("par3", "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], +I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3]");
+    EXPECTED.put("par4", "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], +I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]");
   }
 
   private Configuration conf;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index f8dc018..0764f55 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -68,8 +68,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * IT cases for Hoodie table source and sink.
- * <p>
- * Note: should add more SQL cases when batch write is supported.
  */
 public class HoodieDataSourceITCase extends AbstractTestBase {
   private TableEnvironment streamTableEnv;
@@ -289,7 +287,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
         + ")";
     List<Row> result = execSelectSql(streamTableEnv,
         "select name, sum(age) from t1 group by name", sinkDDL, 10);
-    final String expected = "[+I(Danny,24), +I(Stephen,34)]";
+    final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]";
     assertRowsEquals(result, expected, true);
   }
 
@@ -314,9 +312,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     List<Row> result2 = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1 where uuid > 'id5'").execute().collect());
     assertRowsEquals(result2, "["
-        + "id6,Emma,20,1970-01-01T00:00:06,par3, "
-        + "id7,Bob,44,1970-01-01T00:00:07,par4, "
-        + "id8,Han,56,1970-01-01T00:00:08,par4]");
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
   @ParameterizedTest
@@ -350,14 +348,14 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     execInsertSql(streamTableEnv, insertInto);
 
     final String expected = "["
-        + "id1,Danny,23,par1,1970-01-01T00:00:01, "
-        + "id2,Stephen,33,par1,1970-01-01T00:00:02, "
-        + "id3,Julian,53,par2,1970-01-01T00:00:03, "
-        + "id4,Fabian,31,par2,1970-01-01T00:00:04, "
-        + "id5,Sophia,18,par3,1970-01-01T00:00:05, "
-        + "id6,Emma,20,par3,1970-01-01T00:00:06, "
-        + "id7,Bob,44,par4,1970-01-01T00:00:07, "
-        + "id8,Han,56,par4,1970-01-01T00:00:08]";
+        + "+I[id1, Danny, 23, par1, 1970-01-01T00:00:01], "
+        + "+I[id2, Stephen, 33, par1, 1970-01-01T00:00:02], "
+        + "+I[id3, Julian, 53, par2, 1970-01-01T00:00:03], "
+        + "+I[id4, Fabian, 31, par2, 1970-01-01T00:00:04], "
+        + "+I[id5, Sophia, 18, par3, 1970-01-01T00:00:05], "
+        + "+I[id6, Emma, 20, par3, 1970-01-01T00:00:06], "
+        + "+I[id7, Bob, 44, par4, 1970-01-01T00:00:07], "
+        + "+I[id8, Han, 56, par4, 1970-01-01T00:00:08]]";
 
     List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
 
@@ -401,8 +399,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     List<Row> result2 = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
     final String expected = "["
-        + "id1,Danny,24,1970-01-01T00:00:01,par1, "
-        + "id2,Stephen,34,1970-01-01T00:00:02,par2]";
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]";
     assertRowsEquals(result2, expected);
   }
 
@@ -431,7 +429,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
     List<Row> result = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
-    assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par1]");
+    assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
   }
 
   @ParameterizedTest
@@ -467,7 +465,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
     List<Row> result = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
-    assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
+    assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]");
   }
 
   @Test
@@ -490,7 +488,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
     List<Row> result = CollectionUtil.iterableToList(
         () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
-    assertRowsEquals(result, "[id1,Phoebe,52,1970-01-01T00:00:08,par4]");
+    assertRowsEquals(result, "[+I[id1, Phoebe, 52, 1970-01-01T00:00:08, par4]]");
   }
 
   @Test
@@ -514,10 +512,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     List<Row> result = CollectionUtil.iterableToList(
         () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
     final String expected = "["
-        + "id1,Stephen,34,1970-01-01T00:00:02,par1, "
-        + "id1,Fabian,32,1970-01-01T00:00:04,par2, "
-        + "id1,Jane,19,1970-01-01T00:00:06,par3, "
-        + "id1,Phoebe,52,1970-01-01T00:00:08,par4]";
+        + "+I[id1, Stephen, 34, 1970-01-01T00:00:02, par1], "
+        + "+I[id1, Fabian, 32, 1970-01-01T00:00:04, par2], "
+        + "+I[id1, Jane, 19, 1970-01-01T00:00:06, par3], "
+        + "+I[id1, Phoebe, 52, 1970-01-01T00:00:08, par4]]";
     assertRowsEquals(result, expected, 3);
   }
 
@@ -577,16 +575,16 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     execInsertSql(streamTableEnv, insertInto);
 
     final String expected = "["
-        + "101,1000,scooter,3.140000104904175, "
-        + "102,2000,car battery,8.100000381469727, "
-        + "103,3000,12-pack drill bits,0.800000011920929, "
-        + "104,4000,hammer,0.75, "
-        + "105,5000,hammer,0.875, "
-        + "106,10000,hammer,1.0, "
-        + "107,11000,rocks,5.099999904632568, "
-        + "108,8000,jacket,0.10000000149011612, "
-        + "109,9000,spare tire,22.200000762939453, "
-        + "110,14000,jacket,0.5]";
+        + "+I[101, 1000, scooter, 3.140000104904175], "
+        + "+I[102, 2000, car battery, 8.100000381469727], "
+        + "+I[103, 3000, 12-pack drill bits, 0.800000011920929], "
+        + "+I[104, 4000, hammer, 0.75], "
+        + "+I[105, 5000, hammer, 0.875], "
+        + "+I[106, 10000, hammer, 1.0], "
+        + "+I[107, 11000, rocks, 5.099999904632568], "
+        + "+I[108, 8000, jacket, 0.10000000149011612], "
+        + "+I[109, 9000, spare tire, 22.200000762939453], "
+        + "+I[110, 14000, jacket, 0.5]]";
 
     List<Row> result = execSelectSql(streamTableEnv, "select * from hoodie_sink", execMode);
 
@@ -621,9 +619,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     List<Row> result2 = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect());
     assertRowsEquals(result2, "["
-        + "id6,Emma,20,1970-01-01T00:00:06,par3, "
-        + "id7,Bob,44,1970-01-01T00:00:07,par4, "
-        + "id8,Han,56,1970-01-01T00:00:08,par4]");
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
   @Test
@@ -660,11 +658,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     List<Row> result = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
     assertRowsEquals(result, "["
-        + "id1,Danny,23,1970-01-01T00:00:01,par1, "
-        + "id1,Stephen,33,1970-01-01T00:00:02,par2, "
-        + "id1,Julian,53,1970-01-01T00:00:03,par3, "
-        + "id1,Fabian,31,1970-01-01T00:00:04,par4, "
-        + "id1,Sophia,18,1970-01-01T00:00:05,par5]", 3);
+        + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
+        + "+I[id1, Stephen, 33, 1970-01-01T00:00:02, par2], "
+        + "+I[id1, Julian, 53, 1970-01-01T00:00:03, par3], "
+        + "+I[id1, Fabian, 31, 1970-01-01T00:00:04, par4], "
+        + "+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3);
   }
 
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 799739c..0439c4d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -24,16 +24,18 @@ import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.SchemaBuilder;
 import org.apache.hudi.utils.TestConfigurations;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -82,7 +84,7 @@ public class TestHoodieTableFactory {
   @Test
   void testRequiredOptionsForSource() {
     // miss pk and pre combine key will throw exception
-    TableSchema schema1 = TableSchema.builder()
+    ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -92,7 +94,7 @@ public class TestHoodieTableFactory {
     assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
 
     // given the pk and miss the pre combine key will throw exception
-    TableSchema schema2 = TableSchema.builder()
+    ResolvedSchema schema2 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -103,7 +105,7 @@ public class TestHoodieTableFactory {
     assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
 
     // given pk and pre combine key will be ok
-    TableSchema schema3 = TableSchema.builder()
+    ResolvedSchema schema3 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -137,7 +139,7 @@ public class TestHoodieTableFactory {
     this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
     this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass");
     // definition with simple primary key and partition path
-    TableSchema schema1 = TableSchema.builder()
+    ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -152,7 +154,7 @@ public class TestHoodieTableFactory {
 
     // definition with complex primary keys and partition paths
     this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
-    TableSchema schema2 = TableSchema.builder()
+    ResolvedSchema schema2 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20).notNull())
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -177,7 +179,7 @@ public class TestHoodieTableFactory {
   @Test
   void testSetupHiveOptionsForSource() {
     // definition with simple primary key and partition path
-    TableSchema schema1 = TableSchema.builder()
+    ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -202,7 +204,7 @@ public class TestHoodieTableFactory {
   @Test
   void testSetupCleaningOptionsForSource() {
     // definition with simple primary key and partition path
-    TableSchema schema1 = TableSchema.builder()
+    ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -249,7 +251,7 @@ public class TestHoodieTableFactory {
     this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
     this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass");
     // definition with simple primary key and partition path
-    TableSchema schema1 = TableSchema.builder()
+    ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -264,7 +266,7 @@ public class TestHoodieTableFactory {
 
     // definition with complex primary keys and partition paths
     this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
-    TableSchema schema2 = TableSchema.builder()
+    ResolvedSchema schema2 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20).notNull())
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -289,7 +291,7 @@ public class TestHoodieTableFactory {
   @Test
   void testSetupHiveOptionsForSink() {
     // definition with simple primary key and partition path
-    TableSchema schema1 = TableSchema.builder()
+    ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -314,7 +316,7 @@ public class TestHoodieTableFactory {
   @Test
   void testSetupCleaningOptionsForSink() {
     // definition with simple primary key and partition path
-    TableSchema schema1 = TableSchema.builder()
+    ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
@@ -349,10 +351,10 @@ public class TestHoodieTableFactory {
    */
   private static class MockContext implements DynamicTableFactory.Context {
     private final Configuration conf;
-    private final TableSchema schema;
+    private final ResolvedSchema schema;
     private final List<String> partitions;
 
-    private MockContext(Configuration conf, TableSchema schema, List<String> partitions) {
+    private MockContext(Configuration conf, ResolvedSchema schema, List<String> partitions) {
       this.conf = conf;
       this.schema = schema;
       this.partitions = partitions;
@@ -362,11 +364,11 @@ public class TestHoodieTableFactory {
       return getInstance(conf, TestConfigurations.TABLE_SCHEMA, Collections.singletonList("partition"));
     }
 
-    static MockContext getInstance(Configuration conf, TableSchema schema, String partition) {
+    static MockContext getInstance(Configuration conf, ResolvedSchema schema, String partition) {
       return getInstance(conf, schema, Collections.singletonList(partition));
     }
 
-    static MockContext getInstance(Configuration conf, TableSchema schema, List<String> partitions) {
+    static MockContext getInstance(Configuration conf, ResolvedSchema schema, List<String> partitions) {
       return new MockContext(conf, schema, partitions);
     }
 
@@ -376,8 +378,10 @@ public class TestHoodieTableFactory {
     }
 
     @Override
-    public CatalogTable getCatalogTable() {
-      return new CatalogTableImpl(schema, partitions, conf.toMap(), "mock source table");
+    public ResolvedCatalogTable getCatalogTable() {
+      CatalogTable catalogTable = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(schema).build(),
+          "mock source table", partitions, conf.toMap());
+      return new ResolvedCatalogTable(catalogTable, schema);
     }
 
     @Override
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 54848bd..cc0699f 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -104,17 +104,18 @@ public class TestInputFormat {
     result = readData(inputFormat);
 
     actual = TestData.rowDataToString(result);
-    expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
-        + "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
-        + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
-        + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
-        + "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
-        + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
-        + "id5,Sophia,18,1970-01-01T00:00:00.005,par3, "
-        + "id6,Emma,20,1970-01-01T00:00:00.006,par3, "
-        + "id7,Bob,44,1970-01-01T00:00:00.007,par4, "
-        + "id8,Han,56,1970-01-01T00:00:00.008,par4, "
-        + "id9,Jane,19,1970-01-01T00:00:00.006,par3]";
+    expected = "["
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+        + "+I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], "
+        + "+I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], "
+        + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], "
+        + "+I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], "
+        + "+I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], "
+        + "+I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4]]";
     assertThat(actual, is(expected));
   }
 
@@ -150,21 +151,22 @@ public class TestInputFormat {
     result = readData(inputFormat);
 
     actual = TestData.rowDataToString(result);
-    expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
-        + "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
-        + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
-        + "id12,Monica,27,1970-01-01T00:00:00.009,par5, "
-        + "id13,Phoebe,31,1970-01-01T00:00:00.010,par5, "
-        + "id14,Rachel,52,1970-01-01T00:00:00.011,par6, "
-        + "id15,Ross,29,1970-01-01T00:00:00.012,par6, "
-        + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
-        + "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
-        + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
-        + "id5,Sophia,18,1970-01-01T00:00:00.005,par3, "
-        + "id6,Emma,20,1970-01-01T00:00:00.006,par3, "
-        + "id7,Bob,44,1970-01-01T00:00:00.007,par4, "
-        + "id8,Han,56,1970-01-01T00:00:00.008,par4, "
-        + "id9,Jane,19,1970-01-01T00:00:00.006,par3]";
+    expected = "["
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+        + "+I[id3, Julian, 54, 1970-01-01T00:00:00.003, par2], "
+        + "+I[id4, Fabian, 32, 1970-01-01T00:00:00.004, par2], "
+        + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], "
+        + "+I[id9, Jane, 19, 1970-01-01T00:00:00.006, par3], "
+        + "+I[id10, Ella, 38, 1970-01-01T00:00:00.007, par4], "
+        + "+I[id11, Phoebe, 52, 1970-01-01T00:00:00.008, par4], "
+        + "+I[id12, Monica, 27, 1970-01-01T00:00:00.009, par5], "
+        + "+I[id13, Phoebe, 31, 1970-01-01T00:00:00.010, par5], "
+        + "+I[id14, Rachel, 52, 1970-01-01T00:00:00.011, par6], "
+        + "+I[id15, Ross, 29, 1970-01-01T00:00:00.012, par6]]";
     assertThat(actual, is(expected));
   }
 
@@ -189,14 +191,14 @@ public class TestInputFormat {
     // when isEmitDelete is false.
     List<RowData> result1 = readData(inputFormat);
 
-    final String actual1 = TestData.rowDataToString(result1, true);
+    final String actual1 = TestData.rowDataToString(result1);
     final String expected1 = "["
-        + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
-        + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
-        + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), "
-        + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), "
-        + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), "
-        + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4)]";
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+        + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]";
     assertThat(actual1, is(expected1));
 
     // refresh the input format and set isEmitDelete to true.
@@ -206,17 +208,17 @@ public class TestInputFormat {
 
     List<RowData> result2 = readData(inputFormat);
 
-    final String actual2 = TestData.rowDataToString(result2, true);
+    final String actual2 = TestData.rowDataToString(result2);
     final String expected2 = "["
-        + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
-        + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
-        + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), "
-        + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), "
-        + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), "
-        + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), "
-        + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), "
-        + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4), "
-        + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]";
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+        + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
+        + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
+        + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4], "
+        + "-D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]";
     assertThat(actual2, is(expected2));
   }
 
@@ -241,8 +243,8 @@ public class TestInputFormat {
     // when isEmitDelete is false.
     List<RowData> result1 = readData(inputFormat);
 
-    final String actual1 = TestData.rowDataToString(result1, true);
-    final String expected1 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]";
+    final String actual1 = TestData.rowDataToString(result1);
+    final String expected1 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
     assertThat(actual1, is(expected1));
 
     // refresh the input format and set isEmitDelete to true.
@@ -252,8 +254,8 @@ public class TestInputFormat {
 
     List<RowData> result2 = readData(inputFormat);
 
-    final String actual2 = TestData.rowDataToString(result2, true);
-    final String expected2 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]";
+    final String actual2 = TestData.rowDataToString(result2);
+    final String expected2 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
     assertThat(actual2, is(expected2));
   }
 
@@ -272,13 +274,13 @@ public class TestInputFormat {
 
     List<RowData> result = readData(inputFormat);
 
-    final String actual = TestData.rowDataToString(result, true);
+    final String actual = TestData.rowDataToString(result);
     final String expected = "["
-        + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
-        + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
-        + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), "
-        + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), "
-        + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]";
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+        + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
+        + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
+        + "-D[id9, Jane, 19, 1970-01-01T00:00:00.006, par3]]";
     assertThat(actual, is(expected));
   }
 
@@ -294,10 +296,10 @@ public class TestInputFormat {
 
     List<RowData> result = readData(inputFormat);
 
-    final String actual = TestData.rowDataToString(result, true);
+    final String actual = TestData.rowDataToString(result);
     final String expected = "["
-        + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
-        + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1)]";
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1]]";
     assertThat(actual, is(expected));
   }
 
@@ -317,7 +319,9 @@ public class TestInputFormat {
     List<RowData> result = readData(inputFormat);
 
     String actual = TestData.rowDataToString(result);
-    String expected = "[id1,Danny,23,1970-01-01T00:00:00.001,par1, id2,Stephen,33,1970-01-01T00:00:00.002,par1]";
+    String expected = "["
+        + "+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]]";
     assertThat(actual, is(expected));
   }
 
@@ -335,16 +339,16 @@ public class TestInputFormat {
 
     List<RowData> result = readData(inputFormat);
 
-    final String actual = TestData.rowDataToString(result, true);
+    final String actual = TestData.rowDataToString(result);
     final String expected = "["
-        + "+I(id1,Danny,19,1970-01-01T00:00:00.001,par1), "
-        + "-U(id1,Danny,19,1970-01-01T00:00:00.001,par1), "
-        + "+U(id1,Danny,20,1970-01-01T00:00:00.002,par1), "
-        + "-U(id1,Danny,20,1970-01-01T00:00:00.002,par1), "
-        + "+U(id1,Danny,21,1970-01-01T00:00:00.003,par1), "
-        + "-U(id1,Danny,21,1970-01-01T00:00:00.003,par1), "
-        + "+U(id1,Danny,22,1970-01-01T00:00:00.004,par1), "
-        + "-D(id1,Danny,22,1970-01-01T00:00:00.005,par1)]";
+        + "+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "
+        + "-U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "
+        + "+U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], "
+        + "-U[id1, Danny, 20, 1970-01-01T00:00:00.002, par1], "
+        + "+U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], "
+        + "-U[id1, Danny, 21, 1970-01-01T00:00:00.003, par1], "
+        + "+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1], "
+        + "-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]";
     assertThat(actual, is(expected));
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/SchemaBuilder.java b/hudi-flink/src/test/java/org/apache/hudi/utils/SchemaBuilder.java
new file mode 100644
index 0000000..39dd6d6
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/SchemaBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Builder for {@link ResolvedSchema}.
+ */
+public class SchemaBuilder {
+  private List<Column> columns;
+  private List<WatermarkSpec> watermarkSpecs;
+  private UniqueConstraint constraint;
+
+  public static SchemaBuilder instance() {
+    return new SchemaBuilder();
+  }
+
+  private SchemaBuilder() {
+    this.columns = new ArrayList<>();
+    this.watermarkSpecs = new ArrayList<>();
+  }
+
+  public SchemaBuilder field(String name, DataType type) {
+    this.columns.add(Column.physical(name, type));
+    return this;
+  }
+
+  public SchemaBuilder fields(List<String> names, List<DataType> types) {
+    List<Column> columns = IntStream.range(0, names.size())
+        .mapToObj(idx -> Column.physical(names.get(idx), types.get(idx)))
+        .collect(Collectors.toList());
+    this.columns.addAll(columns);
+    return this;
+  }
+
+  public SchemaBuilder primaryKey(String... columns) {
+    this.constraint = UniqueConstraint.primaryKey("pk", Arrays.asList(columns));
+    return this;
+  }
+
+  public ResolvedSchema build() {
+    return new ResolvedSchema(columns, watermarkSpecs, constraint);
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 2be2aca..a7e38c0 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -26,6 +26,7 @@ import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -50,10 +51,8 @@ public class TestConfigurations {
 
   public static final RowType ROW_TYPE = (RowType) ROW_DATA_TYPE.getLogicalType();
 
-  public static final TableSchema TABLE_SCHEMA = TableSchema.builder()
-      .fields(
-          ROW_TYPE.getFieldNames().toArray(new String[0]),
-          ROW_DATA_TYPE.getChildren().toArray(new DataType[0]))
+  public static final ResolvedSchema TABLE_SCHEMA = SchemaBuilder.instance()
+      .fields(ROW_TYPE.getFieldNames(), ROW_DATA_TYPE.getChildren())
       .build();
 
   public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b51631c..b6b738a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -73,7 +73,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** Data set for testing, also some utilities to check the results. */
+/**
+ * Data set for testing, also some utilities to check the results.
+ */
 public class TestData {
   public static List<RowData> DATA_SET_INSERT = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
@@ -128,6 +130,7 @@ public class TestData {
   );
 
   public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
+
   static {
     IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(
         insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
@@ -135,6 +138,7 @@ public class TestData {
   }
 
   public static List<RowData> DATA_SET_INSERT_SAME_KEY = new ArrayList<>();
+
   static {
     IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add(
         insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
@@ -280,39 +284,34 @@ public class TestData {
           TimestampData.fromEpochMillis(2), StringData.fromString("par1"))
   );
 
-  /**
-   * Returns string format of a list of RowData.
-   */
-  public static String rowDataToString(List<RowData> rows) {
-    return rowDataToString(rows, false);
+  private static Integer toIdSafely(Object id) {
+    if (id == null) {
+      return -1;
+    }
+    final String idStr = id.toString();
+    if (idStr.startsWith("id")) {
+      return Integer.parseInt(idStr.substring(2));
+    }
+    return -1;
   }
 
   /**
    * Returns string format of a list of RowData.
-   *
-   * @param withChangeFlag whether to print the change flag
    */
-  public static String rowDataToString(List<RowData> rows, boolean withChangeFlag) {
+  public static String rowDataToString(List<RowData> rows) {
     DataStructureConverter<Object, Object> converter =
         DataStructureConverters.getConverter(TestConfigurations.ROW_DATA_TYPE);
     return rows.stream()
-        .sorted(Comparator.comparing(o -> toStringSafely(o.getString(0))))
-        .map(row -> {
-          final String rowStr = converter.toExternal(row).toString();
-          if (withChangeFlag) {
-            return row.getRowKind().shortString() + "(" + rowStr + ")";
-          } else {
-            return rowStr;
-          }
-        })
+        .sorted(Comparator.comparing(o -> toIdSafely(o.getString(0))))
+        .map(row -> converter.toExternal(row).toString())
         .collect(Collectors.toList()).toString();
   }
 
   /**
    * Write a list of row data with Hoodie format base on the given configuration.
    *
-   * @param dataBuffer  The data buffer to write
-   * @param conf        The flink configuration
+   * @param dataBuffer The data buffer to write
+   * @param conf       The flink configuration
    * @throws Exception if error occurs
    */
   public static void writeData(
@@ -379,8 +378,8 @@ public class TestData {
    * Sort the {@code rows} using field at index {@code orderingPos} and asserts
    * it equals with the expected string {@code expected}.
    *
-   * @param rows     Actual result rows
-   * @param expected Expected string of the sorted rows
+   * @param rows        Actual result rows
+   * @param expected    Expected string of the sorted rows
    * @param orderingPos Field position for ordering
    */
   public static void assertRowsEquals(List<Row> rows, String expected, int orderingPos) {
@@ -399,9 +398,9 @@ public class TestData {
    */
   public static void assertRowsEquals(List<Row> rows, List<RowData> expected) {
     String rowsString = rows.stream()
-        .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
+        .sorted(Comparator.comparing(o -> toIdSafely(o.getField(0))))
         .collect(Collectors.toList()).toString();
-    assertThat(rowsString, is(rowDataToString(expected)));
+    assertThat(rowDataToString(expected), is(rowsString));
   }
 
   /**
@@ -425,7 +424,7 @@ public class TestData {
    */
   public static void assertRowDataEquals(List<RowData> rows, List<RowData> expected) {
     String rowsString = rowDataToString(rows);
-    assertThat(rowsString, is(rowDataToString(expected)));
+    assertThat(rowDataToString(expected), is(rowsString));
   }
 
   /**
@@ -526,8 +525,8 @@ public class TestData {
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
-   * @param basePath   The file base to check, should be a directory
-   * @param expected   The expected results mapping, the key should be the partition path
+   * @param basePath The file base to check, should be a directory
+   * @param expected The expected results mapping, the key should be the partition path
    */
   public static void checkWrittenFullData(
       File basePath,
@@ -571,12 +570,12 @@ public class TestData {
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
-   * @param fs         The file system
+   * @param fs            The file system
    * @param latestInstant The latest committed instant of current table
-   * @param baseFile   The file base to check, should be a directory
-   * @param expected   The expected results mapping, the key should be the partition path
-   * @param partitions The expected partition number
-   * @param schema     The read schema
+   * @param baseFile      The file base to check, should be a directory
+   * @param expected      The expected results mapping, the key should be the partition path
+   * @param partitions    The expected partition number
+   * @param schema        The read schema
    */
   public static void checkWrittenDataMOR(
       FileSystem fs,
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
index 19b23f5..a440610 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
@@ -20,8 +20,8 @@ package org.apache.hudi.utils.source;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
diff --git a/pom.xml b/pom.xml
index d843510..0fd756e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,7 +110,7 @@
     <http.version>4.4.1</http.version>
     <spark.version>${spark2.version}</spark.version>
     <sparkbundle.version>${spark2bundle.version}</sparkbundle.version>
-    <flink.version>1.12.2</flink.version>
+    <flink.version>1.13.1</flink.version>
     <spark2.version>2.4.4</spark2.version>
     <spark3.version>3.0.0</spark3.version>
     <spark2bundle.version></spark2bundle.version>