Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/16 08:05:16 UTC

[hudi] branch master updated: [HUDI-5673] Support multi writer for bucket index with guarded lock (#7860)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new da80b8c8d52 [HUDI-5673] Support multi writer for bucket index with guarded lock (#7860)
da80b8c8d52 is described below

commit da80b8c8d529bb8ff549b89d87e841eb3c2462cc
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Feb 16 16:05:04 2023 +0800

    [HUDI-5673] Support multi writer for bucket index with guarded lock (#7860)
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  |   8 +-
 .../storage/HoodieConsistentBucketLayout.java      |   1 +
 .../table/storage/HoodieSimpleBucketLayout.java    |   1 +
 .../hudi/client/HoodieFlinkTableServiceClient.java |   8 +
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  44 ++---
 .../apache/hudi/index/FlinkHoodieIndexFactory.java |   4 +-
 .../apache/hudi/configuration/FlinkOptions.java    |   8 +
 .../apache/hudi/configuration/OptionsResolver.java |  37 ++++
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  12 +-
 .../hudi/sink/bootstrap/BootstrapOperator.java     |   9 +-
 .../sink/common/AbstractStreamWriteFunction.java   |   2 +-
 .../org/apache/hudi/sink/meta/CkpMetadata.java     |  21 +-
 .../org/apache/hudi/util/FlinkWriteClients.java    |  22 ++-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  15 ++
 .../apache/hudi/util/ViewStorageProperties.java    |  15 +-
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |   2 -
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  80 +++++++-
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |  10 +
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java |  11 ++
 .../org/apache/hudi/sink/meta/TestCkpMetadata.java |  21 +-
 .../utils/BucketStreamWriteFunctionWrapper.java    | 212 +++++++++++++++++++++
 .../org/apache/hudi/sink/utils/TestWriteBase.java  |  68 +++++--
 .../hudi/utils/TestViewStorageProperties.java      |  10 +-
 23 files changed, 521 insertions(+), 100 deletions(-)
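
For orientation, a minimal configuration sketch of how a Flink writer pipeline might opt into the new multi-writer mode for a bucket-index table; it relies only on options touched by this change, and the table path and client id values are placeholders:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.model.WriteConcurrencyMode;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.index.HoodieIndex;

    public class MultiWriterConfExample {
      public static Configuration writerConf(String tablePath, String clientId) {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, tablePath);
        // bucket index + optimistic concurrency control, as exercised by the new tests below
        conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
            WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
        // new in this patch: distinguishes concurrent writer pipelines, e.g. "1" and "2"
        conf.setString(FlinkOptions.WRITE_CLIENT_ID, clientId);
        return conf;
      }
    }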

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 207c30509d8..366e6aa4c04 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -49,6 +49,7 @@ import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKPORT;
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKQUORUM;
 import static org.apache.hudi.index.HoodieIndex.IndexType.BLOOM;
 import static org.apache.hudi.index.HoodieIndex.IndexType.BUCKET;
+import static org.apache.hudi.index.HoodieIndex.IndexType.FLINK_STATE;
 import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_BLOOM;
 import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
 import static org.apache.hudi.index.HoodieIndex.IndexType.HBASE;
@@ -73,7 +74,7 @@ public class HoodieIndexConfig extends HoodieConfig {
       // Builder#getDefaultIndexType has already set it according to engine type
       .noDefaultValue()
       .withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(),
-          SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name())
+          SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name(), FLINK_STATE.name())
       .withDocumentation("Type of index to use. Default is SIMPLE on Spark engine, "
           + "and INMEMORY on Flink and Java engines. "
           + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
@@ -651,6 +652,11 @@ public class HoodieIndexConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withRecordKeyField(String keyField) {
+      hoodieIndexConfig.setValue(KeyGeneratorOptions.RECORDKEY_FIELD_NAME, keyField);
+      return this;
+    }
+
     public HoodieIndexConfig build() {
       hoodieIndexConfig.setDefaultValue(INDEX_TYPE, getDefaultIndexType(engineType));
       hoodieIndexConfig.setDefaults(HoodieIndexConfig.class.getName());
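
For illustration, the new withRecordKeyField builder hook combines with the existing ones much like the StreamerUtil.getIndexConfig helper added later in this patch; a sketch assuming the imports used in that file, with the record key "uuid" and the bucket count as placeholders:

    HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.BUCKET)
        .withBucketNum("4")               // placeholder bucket count
        .withRecordKeyField("uuid")       // new builder method added above
        .withIndexKeyField("uuid")
        .withEngineType(EngineType.FLINK)
        .build();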
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
index acea61177a5..b6135b4c5d1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
@@ -37,6 +37,7 @@ public class HoodieConsistentBucketLayout extends HoodieStorageLayout {
       WriteOperationType.UPSERT,
       WriteOperationType.UPSERT_PREPPED,
       WriteOperationType.INSERT_OVERWRITE,
+      WriteOperationType.INSERT_OVERWRITE_TABLE,
       WriteOperationType.DELETE,
       WriteOperationType.COMPACT,
       WriteOperationType.DELETE_PARTITION,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java
index be048a23b05..53550bc6225 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java
@@ -37,6 +37,7 @@ public class HoodieSimpleBucketLayout extends HoodieStorageLayout {
       WriteOperationType.UPSERT,
       WriteOperationType.UPSERT_PREPPED,
       WriteOperationType.INSERT_OVERWRITE,
+      WriteOperationType.INSERT_OVERWRITE_TABLE,
       WriteOperationType.DELETE,
       WriteOperationType.COMPACT,
       WriteOperationType.DELETE_PARTITION
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 4b3eaaa1d42..faf27d2ce82 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -182,6 +182,14 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie
     }
   }
 
+  @Override
+  protected void preCommit(HoodieCommitMetadata metadata) {
+    // Create the Hoodie table after startTxn so that the commits and files it encapsulates are visible.
+    // Important to create it after the lock is acquired so the latest commits show up in the timeline without needing a reload
+    HoodieTable table = createTable(config, hadoopConf);
+    resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+  }
+
   /**
    * Initialize the table metadata writer, for e.g, bootstrap the metadata table
    * from the filesystem if it does not exist.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 7d0728b95eb..3d4f6a3873b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -32,6 +33,7 @@ import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -44,7 +46,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.util.WriteStatMerger;
@@ -274,6 +275,19 @@ public class HoodieFlinkWriteClient<T> extends
     // remove the async cleaning
   }
 
+  /**
+   * Refresh the last transaction metadata,
+   * should be called before the Driver starts a new transaction.
+   */
+  public void preTxn(HoodieTableMetaClient metaClient) {
+    if (txnManager.isOptimisticConcurrencyControlEnabled()) {
+      // refresh the meta client which is reused
+      metaClient.reloadActiveTimeline();
+      this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
+      this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
+    }
+  }
+
   @Override
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
     tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
@@ -322,30 +336,12 @@ public class HoodieFlinkWriteClient<T> extends
     return result.getWriteStatuses();
   }
 
-  /**
-   * Post commit is rewrite to be invoked after a successful commit.
-   *
-   * <p>The Flink write client is designed to write data set as buckets
-   * but cleaning action should trigger after all the write actions within a
-   * checkpoint finish.
-   *
-   * @param table         Table to commit on
-   * @param metadata      Commit Metadata corresponding to committed instant
-   * @param instantTime   Instant Time
-   * @param extraMetadata Additional Metadata passed by user
-   */
   @Override
-  protected void postCommit(HoodieTable table,
-                            HoodieCommitMetadata metadata,
-                            String instantTime,
-                            Option<Map<String, String>> extraMetadata) {
-    try {
-      // Delete the marker directory for the instant.
-      WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
-          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    } finally {
-      this.heartbeatClient.stop(instantTime);
-    }
+  protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
+    // Create the Hoodie table after startTxn so that the commits and files it encapsulates are visible.
+    // Important to create it after the lock is acquired so the latest commits show up in the timeline without needing a reload
+    HoodieTable table = createTable(config, hadoopConf);
+    resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
   }
 
   @Override
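
The intended call sequence on the driver side, pieced together from the coordinator changes further down, roughly looks like the following sketch; writeClient, metaClient and commitAction stand in for the coordinator's own fields:

    // refresh the last completed txn metadata and the pending/inflight instants (only under OCC)
    writeClient.preTxn(metaClient);
    // open the next instant; the write tasks then flush their buckets against it
    String instant = writeClient.startCommit(commitAction, metaClient);
    // on commit, preCommit() creates the Hoodie table after the lock is taken and
    // resolves write conflicts against the pending/inflight instants captured in preTxn()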
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
index b10014b9183..13302293d46 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
@@ -46,8 +46,10 @@ public final class FlinkHoodieIndexFactory {
       return (HoodieIndex) instance;
     }
 
-    // TODO more indexes to be added
     switch (config.getIndexType()) {
+      case FLINK_STATE:
+        // Flink state index stores the index mappings with a state-backend,
+        // instantiates an in-memory HoodieIndex component as a placeholder.
       case INMEMORY:
         return new FlinkInMemoryStateIndex(context, config);
       case BLOOM:
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 9cdeb963d53..f7203699683 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -197,6 +197,7 @@ public class FlinkOptions extends HoodieConfig {
       .key("index.type")
       .stringType()
       .defaultValue(HoodieIndex.IndexType.FLINK_STATE.name())
+      .withFallbackKeys(HoodieIndexConfig.INDEX_TYPE.key())
       .withDescription("Index type of Flink write job, default is using state backed index.");
 
   public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
@@ -562,6 +563,13 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(128)
       .withDescription("Sort memory in MB, default 128MB");
 
+  // this is only for internal use
+  public static final ConfigOption<String> WRITE_CLIENT_ID = ConfigOptions
+      .key("write.client.id")
+      .stringType()
+      .defaultValue("")
+      .withDescription("Unique identifier used to distinguish different writer pipelines for concurrent mode");
+
   // ------------------------------------------------------------------------
   //  Compaction Options
   // ------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 42b94b58351..51512244a71 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -18,11 +18,16 @@
 
 package org.apache.hudi.configuration;
 
+import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -227,6 +232,38 @@ public class OptionsResolver {
     return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent();
   }
 
+  /**
+   * Returns whether the writer txn should be guarded by lock.
+   */
+  public static boolean needsGuardByLock(Configuration conf) {
+    return conf.getBoolean(FlinkOptions.METADATA_ENABLED)
+        || conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+            .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+  }
+
+  /**
+   * Returns the index type.
+   */
+  public static HoodieIndex.IndexType getIndexType(Configuration conf) {
+    return HoodieIndex.IndexType.valueOf(conf.getString(FlinkOptions.INDEX_TYPE));
+  }
+
+  /**
+   * Returns the index key field.
+   */
+  public static String getIndexKeyField(Configuration conf) {
+    return conf.getString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+  }
+
+  /**
+   * Returns the conflict resolution strategy.
+   */
+  public static ConflictResolutionStrategy getConflictResolutionStrategy(Configuration conf) {
+    return isBucketIndexType(conf)
+        ? new BucketIndexConcurrentFileWritesConflictResolutionStrategy()
+        : new SimpleConcurrentFileWritesConflictResolutionStrategy();
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 738a0089ef9..9f31d6f59a7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -41,7 +41,6 @@ import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkWriteClients;
-import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -180,7 +179,7 @@ public class StreamWriteOperatorCoordinator
     this.gateways = new SubtaskGateway[this.parallelism];
     // init table, create if not exists.
     this.metaClient = initTableIfNotExists(this.conf);
-    this.ckpMetadata = initCkpMetadata(this.metaClient);
+    this.ckpMetadata = initCkpMetadata(this.metaClient, this.conf);
     // the write client must create after the table creation
     this.writeClient = FlinkWriteClients.createWriteClient(conf);
     initMetadataTable(this.writeClient);
@@ -342,8 +341,8 @@ public class StreamWriteOperatorCoordinator
     writeClient.initMetadataTable();
   }
 
-  private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
-    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
+  private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient, Configuration conf) throws IOException {
+    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient, conf.getString(FlinkOptions.WRITE_CLIENT_ID));
     ckpMetadata.bootstrap();
     return ckpMetadata;
   }
@@ -372,6 +371,8 @@ public class StreamWriteOperatorCoordinator
   }
 
   private void startInstant() {
+    // refresh the last txn metadata
+    this.writeClient.preTxn(this.metaClient);
     // put the assignment in front of metadata generation,
     // because the instant request from write task is asynchronous.
     this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient);
@@ -391,8 +392,7 @@ public class StreamWriteOperatorCoordinator
    * until it finds a new inflight instant on the timeline.
    */
   private void initInstant(String instant) {
-    HoodieTimeline completedTimeline =
-        StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants();
+    HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
     executor.execute(() -> {
       if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) {
         // the last instant committed successfully
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 3eaa47e3b62..42ade8bec66 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -127,7 +127,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
     this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
     this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
     this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
-    this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
+    this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient(), this.conf.getString(FlinkOptions.WRITE_CLIENT_ID));
     this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
 
     preLoadIndexRecords();
@@ -229,17 +229,14 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
             .filter(logFile -> isValidFile(logFile.getFileStatus()))
             .map(logFile -> logFile.getPath().toString())
             .collect(toList());
-        HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
-            writeConfig, hadoopConf);
 
-        try {
+        try (HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
+            writeConfig, hadoopConf)) {
           for (String recordKey : scanner.getRecords().keySet()) {
             output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))));
           }
         } catch (Exception e) {
           throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
-        } finally {
-          scanner.close();
         }
       }
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index fa4c3db86ea..23b580b86bc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -148,7 +148,7 @@ public abstract class AbstractStreamWriteFunction<I>
             TypeInformation.of(WriteMetadataEvent.class)
         ));
 
-    this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath());
+    this.ckpMetadata = CkpMetadata.getInstance(this.metaClient, this.config.getString(FlinkOptions.WRITE_CLIENT_ID));
     this.currentInstant = lastPendingInstant();
     if (context.isRestored()) {
       restoreWriteMetadata();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index addabcdc4c8..39ae1a1ef32 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.meta;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -75,12 +76,13 @@ public class CkpMetadata implements Serializable {
   private List<String> instantCache;
 
   private CkpMetadata(Configuration config) {
-    this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
+    this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)),
+        config.getString(FlinkOptions.PATH), config.getString(FlinkOptions.WRITE_CLIENT_ID));
   }
 
-  private CkpMetadata(FileSystem fs, String basePath) {
+  private CkpMetadata(FileSystem fs, String basePath, String uniqueId) {
     this.fs = fs;
-    this.path = new Path(ckpMetaPath(basePath));
+    this.path = new Path(ckpMetaPath(basePath, uniqueId));
   }
 
   public void close() {
@@ -208,12 +210,17 @@ public class CkpMetadata implements Serializable {
     return new CkpMetadata(config);
   }
 
-  public static CkpMetadata getInstance(FileSystem fs, String basePath) {
-    return new CkpMetadata(fs, basePath);
+  public static CkpMetadata getInstance(HoodieTableMetaClient metaClient, String uniqueId) {
+    return new CkpMetadata(metaClient.getFs(), metaClient.getBasePath(), uniqueId);
   }
 
-  protected static String ckpMetaPath(String basePath) {
-    return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
+  public static CkpMetadata getInstance(FileSystem fs, String basePath, String uniqueId) {
+    return new CkpMetadata(fs, basePath, uniqueId);
+  }
+
+  protected static String ckpMetaPath(String basePath, String uniqueId) {
+    String metaPath = basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
+    return StringUtils.isNullOrEmpty(uniqueId) ? metaPath : metaPath + "_" + uniqueId;
   }
 
   private Path fullPath(String fileName) {
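
With the unique id threaded through, each writer pipeline gets its own checkpoint metadata directory; a sketch of the resulting layout, assuming the auxiliary folder is .hoodie/.aux and CKP_META resolves to "ckp_meta":

    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient, conf.getString(FlinkOptions.WRITE_CLIENT_ID));
    // write.client.id = ""   -> <base>/.hoodie/.aux/ckp_meta     (single writer, as before)
    // write.client.id = "2"  -> <base>/.hoodie/.aux/ckp_meta_2   (isolated per writer pipeline)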
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 9f61580da0b..fa324952ed8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -216,6 +216,7 @@ public class FlinkWriteClients {
                 .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
                 .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
                 .build())
+            .withIndexConfig(StreamerUtil.getIndexConfig(conf))
             .withPayloadConfig(getPayloadConfig(conf))
             .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
             .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
@@ -226,15 +227,18 @@ public class FlinkWriteClients {
 
     if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
       builder.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
-      if (!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
-        builder.withLockConfig(HoodieLockConfig.newBuilder()
-            .withLockProvider(FileSystemBasedLockProvider.class)
-            .withLockWaitTimeInMillis(2000L) // 2s
-            .withFileSystemLockExpire(1) // 1 minute
-            .withClientNumRetries(30)
-            .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
-            .build());
-      }
+    }
+
+    if (!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()) && OptionsResolver.needsGuardByLock(conf)) {
+      // configure the fs lock provider by default
+      builder.withLockConfig(HoodieLockConfig.newBuilder()
+          .withConflictResolutionStrategy(OptionsResolver.getConflictResolutionStrategy(conf))
+          .withLockProvider(FileSystemBasedLockProvider.class)
+          .withLockWaitTimeInMillis(2000L) // 2s
+          .withFileSystemLockExpire(1) // 1 minute
+          .withClientNumRetries(30)
+          .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
+          .build());
     }
 
     // do not configure cleaning strategy as LAZY until multi-writers is supported.
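
The filesystem-based lock above is only a fallback default; a job that already brings its own provider keeps it, since the block is skipped once a lock provider class is configured. For example, assuming the ZooKeeper provider shipped with hudi-client-common:

    conf.setString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
        "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");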
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 1e5af896928..e8578ad82ba 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -20,6 +20,7 @@ package org.apache.hudi.util;
 
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -32,6 +33,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -138,6 +140,19 @@ public class StreamerUtil {
         .build();
   }
 
+  /**
+   * Returns the index config with given configuration.
+   */
+  public static HoodieIndexConfig getIndexConfig(Configuration conf) {
+    return HoodieIndexConfig.newBuilder()
+        .withIndexType(OptionsResolver.getIndexType(conf))
+        .withBucketNum(String.valueOf(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)))
+        .withRecordKeyField(conf.getString(FlinkOptions.RECORD_KEY_FIELD))
+        .withIndexKeyField(OptionsResolver.getIndexKeyField(conf))
+        .withEngineType(EngineType.FLINK)
+        .build();
+  }
+
   /**
    * Converts the give {@link Configuration} to {@link TypedProperties}.
    * The default values are also set up.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
index a5e9f311456..0f3de9127b7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
@@ -20,6 +20,8 @@ package org.apache.hudi.util;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -43,7 +45,9 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER
 public class ViewStorageProperties {
   private static final Logger LOG = LoggerFactory.getLogger(ViewStorageProperties.class);
 
-  private static final String FILE_NAME = "view_storage_conf.properties";
+  private static final String FILE_NAME = "view_storage_conf";
+
+  private static final String FILE_SUFFIX = ".properties";
 
   /**
    * Initialize the {@link #FILE_NAME} meta file.
@@ -52,7 +56,7 @@ public class ViewStorageProperties {
       String basePath,
       FileSystemViewStorageConfig config,
       Configuration flinkConf) throws IOException {
-    Path propertyPath = getPropertiesFilePath(basePath);
+    Path propertyPath = getPropertiesFilePath(basePath, flinkConf.getString(FlinkOptions.WRITE_CLIENT_ID));
     FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(flinkConf));
     fs.delete(propertyPath, false);
     try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
@@ -65,7 +69,7 @@ public class ViewStorageProperties {
    * Read the {@link FileSystemViewStorageConfig} with given table base path.
    */
   public static FileSystemViewStorageConfig loadFromProperties(String basePath, Configuration conf) {
-    Path propertyPath = getPropertiesFilePath(basePath);
+    Path propertyPath = getPropertiesFilePath(basePath, conf.getString(FlinkOptions.WRITE_CLIENT_ID));
     LOG.info("Loading filesystem view storage properties from " + propertyPath);
     FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(conf));
     Properties props = new Properties();
@@ -79,8 +83,9 @@ public class ViewStorageProperties {
     }
   }
 
-  private static Path getPropertiesFilePath(String basePath) {
+  private static Path getPropertiesFilePath(String basePath, String uniqueId) {
     String auxPath = basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
-    return new Path(auxPath, FILE_NAME);
+    String fileName = StringUtils.isNullOrEmpty(uniqueId) ? FILE_NAME : FILE_NAME + "_" + uniqueId;
+    return new Path(auxPath, fileName);
   }
 }
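
Since the properties file name now embeds the client id, the writer and the reader of this file must be handed the same Flink configuration (this is what the TestViewStorageProperties change at the bottom of this patch verifies); a minimal sketch:

    // both calls must see the same write.client.id, otherwise they resolve different files
    ViewStorageProperties.createProperties(basePath, viewStorageConfig, flinkConf);
    FileSystemViewStorageConfig loaded = ViewStorageProperties.loadFromProperties(basePath, flinkConf);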
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index e5d2b315ca7..e9574e61d92 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -113,7 +113,6 @@ public class ITTestDataStreamWrite extends TestLogger {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setString(FlinkOptions.INDEX_TYPE, indexType);
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
-    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
     conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
 
     testWriteToHoodie(conf, "cow_write", 2, EXPECTED);
@@ -159,7 +158,6 @@ public class ITTestDataStreamWrite extends TestLogger {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setString(FlinkOptions.INDEX_TYPE, indexType);
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
-    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 21dd6fd1d18..f705a63f1ff 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -21,9 +21,13 @@ package org.apache.hudi.sink;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.utils.TestWriteBase;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.utils.TestConfigurations;
@@ -239,7 +243,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
 
   @Test
   public void testInsertAppendMode() throws Exception {
-    prepareInsertPipeline()
+    conf.setString(FlinkOptions.OPERATION, "insert");
+    preparePipeline()
         // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
         .consume(TestData.DATA_SET_INSERT_SAME_KEY)
         .checkpoint(1)
@@ -294,7 +299,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
     conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
 
-    prepareInsertPipeline(conf)
+    preparePipeline()
         .consume(TestData.DATA_SET_INSERT_SAME_KEY)
         .checkpoint(1)
         .handleEvents(1)
@@ -422,6 +427,69 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .end();
   }
 
+  // case1: txn2's time range is involved in txn1
+  //      |----------- txn1 -----------|
+  //              | ----- txn2 ----- |
+  @Test
+  public void testWriteMultiWriterInvolved() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+    TestHarness pipeline1 = preparePipeline(conf)
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .assertEmptyDataFiles();
+    // now start pipeline2 and commit the txn
+    Configuration conf2 = conf.clone();
+    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+    preparePipeline(conf2)
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED3, 1);
+    // step to commit the 2nd txn, should throw exception
+    // for concurrent modification of same fileGroups
+    pipeline1.checkpoint(1)
+        .assertNextEvent()
+        .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+  }
+
+  // case2: txn2's time range has partial overlap with txn1
+  //      |----------- txn1 -----------|
+  //                       | ----- txn2 ----- |
+  @Test
+  public void testWriteMultiWriterPartialOverlapping() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+    TestHarness pipeline1 = preparePipeline(conf)
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .assertEmptyDataFiles();
+    // now start pipeline2 and suspend the txn commit
+    Configuration conf2 = conf.clone();
+    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+    TestHarness pipeline2 = preparePipeline(conf2)
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .assertEmptyDataFiles();
+
+    // step to commit the 1st txn, should succeed
+    pipeline1.checkpoint(1)
+        .assertNextEvent()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED3, 1);
+
+    // step to commit the 2nd txn, should throw exception
+    // for concurrent modification of same fileGroups
+    pipeline2.checkpoint(1)
+        .assertNextEvent()
+        .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+  }
+
   @Test
   public void testReuseEmbeddedServer() throws IOException {
     conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500);
@@ -449,14 +517,6 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
     return TestHarness.instance().preparePipeline(tempFile, conf);
   }
 
-  protected TestHarness prepareInsertPipeline() throws Exception {
-    return prepareInsertPipeline(conf);
-  }
-
-  protected TestHarness prepareInsertPipeline(Configuration conf) throws Exception {
-    return TestHarness.instance().preparePipeline(tempFile, conf, true);
-  }
-
   protected HoodieTableType getTableType() {
     return HoodieTableType.COPY_ON_WRITE;
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index df01fc9076f..4683955d8e0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -141,11 +141,21 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
         .end();
   }
 
+  @Test
+  public void testInsertAppendMode() {
+    // append mode is only valid for cow table.
+  }
+
   @Override
   public void testInsertClustering() {
     // insert clustering is only valid for cow table.
   }
 
+  @Test
+  public void testInsertAsyncClustering() {
+    // insert async clustering is only valid for cow table.
+  }
+
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 704d94caba3..f6ee44db254 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 
 import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -37,11 +38,21 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
   }
 
+  @Test
+  public void testInsertAppendMode() {
+    // append mode is only valid for cow table.
+  }
+
   @Override
   public void testInsertClustering() {
     // insert clustering is only valid for cow table.
   }
 
+  @Test
+  public void testInsertAsyncClustering() {
+    // insert async clustering is only valid for cow table.
+  }
+
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index fe7ce3f9478..b49441f3b9d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -26,8 +26,9 @@ import org.apache.hudi.utils.TestConfigurations;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.util.stream.IntStream;
@@ -41,24 +42,20 @@ import static org.hamcrest.MatcherAssert.assertThat;
  */
 public class TestCkpMetadata {
 
-  private CkpMetadata metadata;
-
   @TempDir
   File tempFile;
 
   @BeforeEach
   public void beforeEach() throws Exception {
     String basePath = tempFile.getAbsolutePath();
-    FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(new Configuration()));
-
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
     StreamerUtil.initTableIfNotExists(conf);
-
-    this.metadata = CkpMetadata.getInstance(fs, basePath);
   }
 
-  @Test
-  void testWriteAndReadMessage() {
+  @ParameterizedTest
+  @ValueSource(strings = {"", "1"})
+  void testWriteAndReadMessage(String uniqueId) {
+    CkpMetadata metadata = getCkpMetadata(uniqueId);
     // write and read 5 committed checkpoints
     IntStream.range(0, 3).forEach(i -> metadata.startInstant(i + ""));
 
@@ -74,4 +71,10 @@ public class TestCkpMetadata {
     metadata.abortInstant("7");
     assertThat(metadata.getMessages().size(), is(5));
   }
+
+  private CkpMetadata getCkpMetadata(String uniqueId) {
+    String basePath = tempFile.getAbsolutePath();
+    FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(new Configuration()));
+    return CkpMetadata.getInstance(fs, basePath, uniqueId);
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
new file mode 100644
index 00000000000..49d6b4b1e2b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteFunction;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bucket.BucketStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
+import org.apache.flink.streaming.util.MockStreamTask;
+import org.apache.flink.streaming.util.MockStreamTaskBuilder;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class to manipulate the {@link BucketStreamWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class BucketStreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
+  private final Configuration conf;
+
+  private final IOManager ioManager;
+  private final StreamingRuntimeContext runtimeContext;
+  private final MockOperatorEventGateway gateway;
+  private final MockOperatorCoordinatorContext coordinatorContext;
+  private final StreamWriteOperatorCoordinator coordinator;
+  private final MockStateInitializationContext stateInitializationContext;
+
+  /**
+   * Function that converts row data to HoodieRecord.
+   */
+  private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
+
+  /**
+   * Stream write function.
+   */
+  private StreamWriteFunction<HoodieRecord<?>> writeFunction;
+
+  private CompactFunctionWrapper compactFunctionWrapper;
+
+  private final MockStreamTask streamTask;
+
+  private final StreamConfig streamConfig;
+
+  private final boolean asyncCompaction;
+
+  public BucketStreamWriteFunctionWrapper(String tablePath) throws Exception {
+    this(tablePath, TestConfigurations.getDefaultConf(tablePath));
+  }
+
+  public BucketStreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception {
+    this.ioManager = new IOManagerAsync();
+    MockEnvironment environment = new MockEnvironmentBuilder()
+        .setTaskName("mockTask")
+        .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+        .setIOManager(ioManager)
+        .build();
+    this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+    this.gateway = new MockOperatorEventGateway();
+    this.conf = conf;
+    // one function
+    this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+    this.stateInitializationContext = new MockStateInitializationContext();
+    this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
+    this.streamConfig = new StreamConfig(conf);
+    streamConfig.setOperatorID(new OperatorID());
+    this.streamTask = new MockStreamTaskBuilder(environment)
+        .setConfig(new StreamConfig(conf))
+        .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
+        .build();
+    this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf, this.streamTask, this.streamConfig);
+  }
+
+  public void openFunction() throws Exception {
+    this.coordinator.start();
+    this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+    toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
+    toHoodieFunction.setRuntimeContext(runtimeContext);
+    toHoodieFunction.open(conf);
+
+    setupWriteFunction();
+
+    if (asyncCompaction) {
+      compactFunctionWrapper.openFunction();
+    }
+  }
+
+  public void invoke(I record) throws Exception {
+    HoodieRecord<?> hoodieRecord = toHoodieFunction.map((RowData) record);
+    writeFunction.processElement(hoodieRecord, null, null);
+  }
+
+  public WriteMetadataEvent[] getEventBuffer() {
+    return this.coordinator.getEventBuffer();
+  }
+
+  public OperatorEvent getNextEvent() {
+    return this.gateway.getNextEvent();
+  }
+
+  public Map<String, List<HoodieRecord>> getDataBuffer() {
+    return this.writeFunction.getDataBuffer();
+  }
+
+  public void checkpointFunction(long checkpointId) throws Exception {
+    // checkpoint the coordinator first
+    this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
+    writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+    stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+  }
+
+  public void endInput() {
+    writeFunction.endInput();
+  }
+
+  public void checkpointComplete(long checkpointId) {
+    stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+    coordinator.notifyCheckpointComplete(checkpointId);
+    if (asyncCompaction) {
+      try {
+        compactFunctionWrapper.compact(checkpointId);
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    }
+  }
+
+  public void checkpointFails(long checkpointId) {
+    coordinator.notifyCheckpointAborted(checkpointId);
+  }
+
+  public void subTaskFails(int taskID) throws Exception {
+    coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
+    setupWriteFunction();
+  }
+
+  public void close() throws Exception {
+    coordinator.close();
+    ioManager.close();
+    writeFunction.close();
+    if (compactFunctionWrapper != null) {
+      compactFunctionWrapper.close();
+    }
+  }
+
+  public StreamWriteOperatorCoordinator getCoordinator() {
+    return coordinator;
+  }
+
+  public MockOperatorCoordinatorContext getCoordinatorContext() {
+    return coordinatorContext;
+  }
+
+  public boolean isConforming() {
+    return this.writeFunction.isConfirming();
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void setupWriteFunction() throws Exception {
+    writeFunction = new BucketStreamWriteFunction<>(conf);
+    writeFunction.setRuntimeContext(runtimeContext);
+    writeFunction.setOperatorEventGateway(gateway);
+    writeFunction.initializeState(this.stateInitializationContext);
+    writeFunction.open(conf);
+
+    // handle the bootstrap event
+    coordinator.handleEventFromOperator(0, getNextEvent());
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b6ae0767d68..a6e7a19952c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
 
@@ -38,13 +39,16 @@ import org.hamcrest.MatcherAssert;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -121,23 +125,25 @@ public class TestWriteBase {
     private Configuration conf;
     private TestFunctionWrapper<RowData> pipeline;
 
+    private CkpMetadata ckpMetadata;
+
     private String lastPending;
     private String lastComplete;
 
     public TestHarness preparePipeline(File basePath, Configuration conf) throws Exception {
-      preparePipeline(basePath, conf, false);
-      return this;
-    }
-
-    public TestHarness preparePipeline(File basePath, Configuration conf, boolean append) throws Exception {
       this.baseFile = basePath;
       this.basePath = this.baseFile.getAbsolutePath();
       this.conf = conf;
-      this.pipeline = append
-          ? new InsertFunctionWrapper<>(this.basePath, conf)
-          : new StreamWriteFunctionWrapper<>(this.basePath, conf);
+      if (OptionsResolver.isAppendMode(conf)) {
+        this.pipeline = new InsertFunctionWrapper<>(this.basePath, conf);
+      } else if (OptionsResolver.isBucketIndexType(conf)) {
+        this.pipeline = new BucketStreamWriteFunctionWrapper<>(this.basePath, conf);
+      } else {
+        this.pipeline = new StreamWriteFunctionWrapper<>(this.basePath, conf);
+      }
       // open the function and ingest data
       this.pipeline.openFunction();
+      this.ckpMetadata = CkpMetadata.getInstance(conf);
       return this;
     }
 
@@ -258,10 +264,23 @@ public class TestWriteBase {
       this.lastPending = lastPendingInstant();
       this.pipeline.checkpointComplete(checkpointId);
       // started a new instant already
-      checkInflightInstant();
+      String newInflight = checkInflightInstant();
       checkInstantState(HoodieInstant.State.COMPLETED, lastPending);
       this.lastComplete = lastPending;
-      this.lastPending = lastPendingInstant(); // refresh last pending instant
+      this.lastPending = newInflight; // refresh last pending instant
+      return this;
+    }
+
+    /**
+     * Asserts that completing the checkpoint with id {@code checkpointId} fails with the given cause and message.
+     */
+    public TestHarness checkpointCompleteThrows(long checkpointId, Class<?> cause, String msg) {
+      this.pipeline.checkpointComplete(checkpointId);
+      assertTrue(this.pipeline.getCoordinatorContext().isJobFailed(), "Job should have been failed");
+      Throwable throwable = this.pipeline.getCoordinatorContext().getJobFailureReason().getCause();
+      assertThat(throwable, instanceOf(cause));
+      assertThat(throwable.getMessage(), containsString(msg));
+      // assertThrows(HoodieException.class, () -> , msg);
       return this;
     }
 
@@ -318,12 +337,28 @@ public class TestWriteBase {
      * Asserts the data files are empty.
      */
     public TestHarness assertEmptyDataFiles() {
-      File[] dataFiles = baseFile.listFiles(file -> !file.getName().startsWith("."));
-      assertNotNull(dataFiles);
-      assertThat(dataFiles.length, is(0));
+      assertFalse(fileExists(), "No data files should have been created");
       return this;
     }
 
+    private boolean fileExists() {
+      List<File> dirsToCheck = new ArrayList<>();
+      dirsToCheck.add(baseFile);
+      while (!dirsToCheck.isEmpty()) {
+        File dir = dirsToCheck.remove(0);
+        for (File file : Objects.requireNonNull(dir.listFiles())) {
+          if (!file.getName().startsWith(".")) {
+            if (file.isDirectory()) {
+              dirsToCheck.add(file);
+            } else {
+              return true;
+            }
+          }
+        }
+      }
+      return false;
+    }
+
     public TestHarness checkWrittenData(Map<String, String> expected) throws Exception {
       checkWrittenData(expected, 4);
       return this;
@@ -385,12 +420,13 @@ public class TestWriteBase {
     }
 
     private String lastPendingInstant() {
-      return TestUtils.getLastPendingInstant(basePath);
+      return this.ckpMetadata.lastPendingInstant();
     }
 
-    private void checkInflightInstant() {
-      final String instant = TestUtils.getLastPendingInstant(basePath);
+    private String checkInflightInstant() {
+      final String instant = this.ckpMetadata.lastPendingInstant();
       assertNotNull(instant);
+      return instant;
     }
 
     private void checkInstantState(HoodieInstant.State state, String instantStr) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java
index 084f211e660..a8b06c111cd 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java
@@ -27,6 +27,8 @@ import org.apache.hudi.util.ViewStorageProperties;
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -41,19 +43,21 @@ public class TestViewStorageProperties {
   @TempDir
   File tempFile;
 
-  @Test
-  void testReadWriteProperties() throws IOException {
+  @ParameterizedTest
+  @ValueSource(strings = {"", "1"})
+  void testReadWriteProperties(String uniqueId) throws IOException {
     String basePath = tempFile.getAbsolutePath();
     FileSystemViewStorageConfig config = FileSystemViewStorageConfig.newBuilder()
         .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
         .withRemoteServerHost("host1")
         .withRemoteServerPort(1234).build();
     Configuration flinkConfig = new Configuration();
+    flinkConfig.setString(FlinkOptions.WRITE_CLIENT_ID, uniqueId);
     ViewStorageProperties.createProperties(basePath, config, flinkConfig);
     ViewStorageProperties.createProperties(basePath, config, flinkConfig);
     ViewStorageProperties.createProperties(basePath, config, flinkConfig);
 
-    FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath, new Configuration());
+    FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath, flinkConfig);
     assertThat(readConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK));
     assertThat(readConfig.getRemoteViewServerHost(), is("host1"));
     assertThat(readConfig.getRemoteViewServerPort(), is(1234));