You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/19 09:15:40 UTC

[hudi] branch master updated: [HUDI-2316] Support Flink batch upsert (#3494)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fed44a  [HUDI-2316] Support Flink batch upsert (#3494)
1fed44a is described below

commit 1fed44af84b4726d40c57f1dad012c8e4a510f91
Author: swuferhong <33...@qq.com>
AuthorDate: Thu Aug 19 17:15:26 2021 +0800

    [HUDI-2316] Support Flink batch upsert (#3494)
---
 .../hudi/sink/bootstrap/BootstrapFunction.java     |  10 +-
 .../bootstrap/batch/BatchBootstrapFunction.java    |  65 ++++++++
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 165 +++++++++++++++++++++
 .../org/apache/hudi/table/HoodieTableSink.java     | 107 ++-----------
 .../java/org/apache/hudi/util/StreamerUtil.java    |   4 +
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  99 +++++++++----
 .../org/apache/hudi/utils/TestConfigurations.java  |  18 ++-
 .../test/java/org/apache/hudi/utils/TestSQL.java   |  10 ++
 8 files changed, 344 insertions(+), 134 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index 4923362..fc3c5f4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -72,12 +72,12 @@ public class BootstrapFunction<I, O extends HoodieRecord>
 
   private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
 
-  private HoodieTable<?, ?, ?, ?> hoodieTable;
+  protected HoodieTable<?, ?, ?, ?> hoodieTable;
 
-  private final Configuration conf;
+  protected final Configuration conf;
 
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
-  private transient HoodieWriteConfig writeConfig;
+  protected transient org.apache.hadoop.conf.Configuration hadoopConf;
+  protected transient HoodieWriteConfig writeConfig;
 
   private GlobalAggregateManager aggregateManager;
 
@@ -153,7 +153,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
    * @param partitionPath The partition path
    */
   @SuppressWarnings("unchecked")
-  private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
+  protected void loadRecords(String partitionPath, Collector<O> out) throws Exception {
     long start = System.currentTimeMillis();
 
     BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
new file mode 100644
index 0000000..8b136b4
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap.batch;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The function to load specify partition index from existing hoodieTable.
+ */
+public class BatchBootstrapFunction<I, O extends HoodieRecord>
+    extends BootstrapFunction<I, O> {
+
+  private Set<String> partitionPathSet;
+  private boolean haveSuccessfulCommits;
+
+  public BatchBootstrapFunction(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.partitionPathSet = new HashSet<>();
+    this.haveSuccessfulCommits = StreamerUtil.haveSuccessfulCommits(hoodieTable.getMetaClient());
+  }
+
+  @Override
+  public void processElement(I value, Context context, Collector<O> out) throws Exception {
+    final HoodieRecord record = (HoodieRecord<?>) value;
+    final String partitionPath = record.getKey().getPartitionPath();
+
+    if (haveSuccessfulCommits && !partitionPathSet.contains(partitionPath)) {
+      loadRecords(partitionPath, out);
+      partitionPathSet.add(partitionPath);
+    }
+
+    // send the trigger record
+    out.collect((O) value);
+  }
+
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
new file mode 100644
index 0000000..4808eb7
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.CleanFunction;
+import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapFunction;
+import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionCommitSink;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
+import org.apache.hudi.sink.partitioner.BucketAssignFunction;
+import org.apache.hudi.sink.partitioner.BucketAssignOperator;
+import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
+import org.apache.hudi.table.format.FilePathUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Utilities to generate all kinds of sub-pipelines.
+ */
+public class Pipelines {
+
+  public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
+    BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
+
+    final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
+    if (partitionFields.length > 0) {
+      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
+
+        // shuffle by partition keys
+        dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
+      }
+      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
+        SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
+        // sort by partition keys
+        dataStream = dataStream
+            .transform("partition_key_sorter",
+                TypeInformation.of(RowData.class),
+                sortOperatorGen.createSortOperator())
+            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+        ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+            conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+      }
+    }
+    return dataStream
+        .transform("hoodie_bulk_insert_write",
+            TypeInformation.of(Object.class),
+            operatorFactory)
+        // follow the parallelism of upstream operators to avoid shuffle
+        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+        .addSink(new CleanFunction<>(conf))
+        .setParallelism(1)
+        .name("clean_commits");
+  }
+
+  public static DataStream<HoodieRecord> bootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) {
+    DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
+
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      dataStream1 = dataStream1.rebalance()
+          .transform(
+              "index_bootstrap",
+              TypeInformation.of(HoodieRecord.class),
+              new ProcessOperator<>(new BootstrapFunction<>(conf)))
+          .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+    }
+
+    return dataStream1;
+  }
+
+  public static DataStream<HoodieRecord> batchBootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) {
+    // shuffle and sort by partition keys
+    final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
+    if (partitionFields.length > 0) {
+      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+      // shuffle by partition keys
+      dataStream = dataStream
+          .keyBy(rowDataKeyGen::getPartitionPath);
+    }
+
+    return rowDataToHoodieRecord(conf, rowType, dataStream)
+        .transform(
+            "batch_index_bootstrap",
+            TypeInformation.of(HoodieRecord.class),
+            new ProcessOperator<>(new BatchBootstrapFunction<>(conf)))
+        .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+        .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+  }
+
+  public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
+    return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
+  }
+
+  public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
+    StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
+    return dataStream
+        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
+        .keyBy(HoodieRecord::getRecordKey)
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+        .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
+        // shuffle by fileId(bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
+        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
+        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+  }
+
+  public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
+    return dataStream.transform("compact_plan_generate",
+        TypeInformation.of(CompactionPlanEvent.class),
+        new CompactionPlanOperator(conf))
+        .setParallelism(1) // plan generate must be singleton
+        .rebalance()
+        .transform("compact_task",
+            TypeInformation.of(CompactionCommitEvent.class),
+            new ProcessOperator<>(new CompactFunction(conf)))
+        .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+        .addSink(new CompactionCommitSink(conf))
+        .name("compact_commit")
+        .setParallelism(1); // compaction commit should be singleton
+  }
+
+  public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
+    return dataStream.addSink(new CleanFunction<>(conf))
+        .setParallelism(1)
+        .name("clean_commits");
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index ca6d33a..d49fdf1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -21,37 +21,19 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.sink.StreamWriteOperatorFactory;
-import org.apache.hudi.sink.bootstrap.BootstrapFunction;
-import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
-import org.apache.hudi.sink.bulk.RowDataKeyGen;
-import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
-import org.apache.hudi.sink.compact.CompactFunction;
-import org.apache.hudi.sink.compact.CompactionCommitEvent;
-import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
-import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.ChangelogModes;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Map;
@@ -90,90 +72,23 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // bulk_insert mode
       final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
       if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
-        BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);
-
-        final String[] partitionFields = FilePathUtils.extractPartitionKeys(this.conf);
-        if (partitionFields.length > 0) {
-          RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
-          if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
-
-            // shuffle by partition keys
-            dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
-          }
-          if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
-            SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
-            // sort by partition keys
-            dataStream = dataStream
-                .transform("partition_key_sorter",
-                    TypeInformation.of(RowData.class),
-                    sortOperatorGen.createSortOperator())
-                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
-            ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
-                conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
-          }
-        }
-        return dataStream
-            .transform("hoodie_bulk_insert_write",
-                TypeInformation.of(Object.class),
-                operatorFactory)
-            // follow the parallelism of upstream operators to avoid shuffle
-            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
-            .addSink(new CleanFunction<>(conf))
-            .setParallelism(1)
-            .name("clean_commits");
+        return Pipelines.bulkInsert(conf, rowType, dataStream);
       }
 
-      // stream write
+      // default parallelism
       int parallelism = dataStream.getExecutionConfig().getParallelism();
-      StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
-
-      DataStream<HoodieRecord> dataStream1 = dataStream
-          .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
-      // bootstrap index
-      // TODO: This is a very time-consuming operation, will optimization
-      if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-        dataStream1 = dataStream1.rebalance()
-            .transform(
-                "index_bootstrap",
-                TypeInformation.of(HoodieRecord.class),
-                new ProcessOperator<>(new BootstrapFunction<>(conf)))
-            .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
-            .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
-      }
+      final DataStream<HoodieRecord> dataStream1 = context.isBounded()
+          ? Pipelines.batchBootstrap(conf, rowType, parallelism, dataStream)
+          : Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
+
+      // write pipeline
+      DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1);
 
-      DataStream<Object> pipeline = dataStream1
-          // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-          .keyBy(HoodieRecord::getRecordKey)
-          .transform(
-              "bucket_assigner",
-              TypeInformation.of(HoodieRecord.class),
-              new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
-          .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
-          // shuffle by fileId(bucket id)
-          .keyBy(record -> record.getCurrentLocation().getFileId())
-          .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
       // compaction
       if (StreamerUtil.needsAsyncCompaction(conf)) {
-        return pipeline.transform("compact_plan_generate",
-            TypeInformation.of(CompactionPlanEvent.class),
-            new CompactionPlanOperator(conf))
-            .setParallelism(1) // plan generate must be singleton
-            .rebalance()
-            .transform("compact_task",
-                TypeInformation.of(CompactionCommitEvent.class),
-                new ProcessOperator<>(new CompactFunction(conf)))
-            .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
-            .addSink(new CompactionCommitSink(conf))
-            .name("compact_commit")
-            .setParallelism(1); // compaction commit should be singleton
+        return Pipelines.compact(conf, pipeline);
       } else {
-        return pipeline.addSink(new CleanFunction<>(conf))
-            .setParallelism(1)
-            .name("clean_commits");
+        return Pipelines.clean(conf, pipeline);
       }
     };
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 0fac2cc..f89d089 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -379,4 +379,8 @@ public class StreamerUtil {
     WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
     return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
   }
+
+  public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
+    return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
+  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 0764f55..99effba 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -292,7 +292,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   }
 
   @ParameterizedTest
-  @MethodSource("configParams")
+  @MethodSource("executionModeAndPartitioningParams")
   void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     Map<String, String> options = new HashMap<>();
@@ -318,6 +318,56 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   }
 
   @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
+    TableEnvironment tableEnv = batchTableEnv;
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_NAME.key(), tableType.name());
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false);
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+    // batchMode update
+    execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1);
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED);
+  }
+
+  @ParameterizedTest
+  @MethodSource("tableTypeAndPartitioningParams")
+  void testBatchModeUpsert(HoodieTableType tableType, boolean hiveStylePartitioning) {
+    TableEnvironment tableEnv = batchTableEnv;
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_NAME.key(), tableType.name());
+    if (hiveStylePartitioning) {
+      options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
+    }
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+    // batchMode update
+    execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1);
+
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED);
+  }
+
+  @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
     boolean streaming = execMode == ExecMode.STREAM;
@@ -436,18 +486,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @EnumSource(value = ExecMode.class)
   void testWriteNonPartitionedTable(ExecMode execMode) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
-    String hoodieTableDDL = "create table t1(\n"
-        + "  uuid varchar(20),\n"
-        + "  name varchar(10),\n"
-        + "  age int,\n"
-        + "  ts timestamp(3),\n"
-        + "  `partition` varchar(20),\n"
-        + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
-        + ")\n"
-        + "with (\n"
-        + "  'connector' = 'hudi',\n"
-        + "  'path' = '" + tempFile.getAbsolutePath() + "'\n"
-        + ")";
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false);
     tableEnv.executeSql(hoodieTableDDL);
 
     final String insertInto1 = "insert into t1 values\n"
@@ -627,19 +668,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @Test
   void testBulkInsertNonPartitionedTable() {
     TableEnvironment tableEnv = batchTableEnv;
-    String hoodieTableDDL = "create table t1(\n"
-        + "  uuid varchar(20),\n"
-        + "  name varchar(10),\n"
-        + "  age int,\n"
-        + "  ts timestamp(3),\n"
-        + "  `partition` varchar(20),\n"
-        + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
-        + ")\n"
-        + "with (\n"
-        + "  'connector' = 'hudi',\n"
-        + "  'path' = '" + tempFile.getAbsolutePath() + "',\n"
-        + "  'write.operation' = 'bulk_insert'\n"
-        + ")";
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.OPERATION.key(), "bulk_insert");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false);
     tableEnv.executeSql(hoodieTableDDL);
 
     final String insertInto1 = "insert into t1 values\n"
@@ -675,7 +707,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   /**
    * Return test params => (execution mode, hive style partitioning).
    */
-  private static Stream<Arguments> configParams() {
+  private static Stream<Arguments> executionModeAndPartitioningParams() {
     Object[][] data =
         new Object[][] {
             {ExecMode.BATCH, false},
@@ -685,6 +717,19 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (HoodieTableType, hive style partitioning).
+   */
+  private static Stream<Arguments> tableTypeAndPartitioningParams() {
+    Object[][] data =
+        new Object[][] {
+            {HoodieTableType.COPY_ON_WRITE, false},
+            {HoodieTableType.COPY_ON_WRITE, true},
+            {HoodieTableType.MERGE_ON_READ, false},
+            {HoodieTableType.MERGE_ON_READ, true}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index a7e38c0..f9db04c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -56,18 +56,24 @@ public class TestConfigurations {
       .build();
 
   public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
-    String createTable = "create table " + tableName + "(\n"
+    return getCreateHoodieTableDDL(tableName, options, true);
+  }
+
+  public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options, boolean havePartition) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("create table " + tableName + "(\n"
         + "  uuid varchar(20),\n"
         + "  name varchar(10),\n"
         + "  age int,\n"
         + "  ts timestamp(3),\n"
         + "  `partition` varchar(20),\n"
         + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
-        + ")\n"
-        + "PARTITIONED BY (`partition`)\n"
-        + "with (\n"
-        + "  'connector' = 'hudi'";
-    StringBuilder builder = new StringBuilder(createTable);
+        + ")\n");
+    if (havePartition) {
+      builder.append("PARTITIONED BY (`partition`)\n");
+    }
+    builder.append("with (\n"
+        + "  'connector' = 'hudi'");
     options.forEach((k, v) -> builder.append(",\n")
         .append("  '").append(k).append("' = '").append(v).append("'"));
     builder.append("\n)");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 8fb15ed..6b0b71c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -33,4 +33,14 @@ public class TestSQL {
       + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
       + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
       + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+
+  public static final String UPDATE_INSERT_T1 = "insert into t1 values\n"
+      + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+      + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+      + "('id3','Julian',54,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+      + "('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+      + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+      + "('id9','Jane',19,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+      + "('id10','Ella',38,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+      + "('id11','Phoebe',52,TIMESTAMP '1970-01-01 00:00:08','par4')";
 }