Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/21 01:17:32 UTC

[hudi] branch master updated: [HUDI-5407][HUDI-5408] Fixing rollback in MDT to be eager (#7490)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f6652a84aa [HUDI-5407][HUDI-5408] Fixing rollback in MDT to be eager (#7490)
6f6652a84aa is described below

commit 6f6652a84aa5e738cdb6932f41880ae24415b06c
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Jan 20 17:17:26 2023 -0800

    [HUDI-5407][HUDI-5408] Fixing rollback in MDT to be eager (#7490)
    
    Under some rare conditions, the metadata table (MDT) could deem invalid data as valid. In particular, when there is a partially failed commit in the MDT and the failed commit refers to a compaction or clustering in the data table, we might see anomalies.
    
    Scenario where this could fail with inline compaction:
    
    Data table timeline
    t1.dc  t2.comp.req  |crash|  t3.dc  t2.comp.inflight  t2.commit
    
    MDT timeline
    t1.dc  t2.comp.inflight  |crash|  t3.dc  t4.rb(t2)  t2.dc
    
    The first attempt of t2 in the MDT should be rolled back since it crashed mid-way. In other words, any log blocks written by that first attempt of t2 in the MDT should be deemed invalid.
    
    However, here is how the log blocks end up being laid out:
    
    log1(t1)  log2(t2 first attempt)  |crash|  log3(t3)  log4(t4.rb rolling back t2)  log5(t2)
    
    So, when we read the log blocks via AbstractLogRecordReader, we ideally want to ignore log2. But when we encounter the rollback block log4, we only check the immediately preceding log block for a matching commit to roll back. Since that block (log3) does not match t2, we assume log4 is a duplicate rollback and still deem log2 a valid log block.
    
    Hence the MDT could serve extra data files that are not valid from a file-system-listing standpoint.
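    
    For illustration only, the following is a minimal, self-contained sketch of that flawed "check only the previous block" handling. The Block class and the scan loop are hypothetical simplifications, not Hudi's actual AbstractLogRecordReader code:
    
        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.List;
        
        public class RollbackScanSketch {
        
          // A data block records the instant that wrote it; a rollback block records its target instant.
          static final class Block {
            final String instant;
            final boolean isRollback;
            final String rollbackTarget;
        
            Block(String instant, boolean isRollback, String rollbackTarget) {
              this.instant = instant;
              this.isRollback = isRollback;
              this.rollbackTarget = rollbackTarget;
            }
          }
        
          public static void main(String[] args) {
            // Layout from above: log2 (first attempt of t2) should be invalidated by log4 (t4.rb of t2).
            List<Block> logBlocks = Arrays.asList(
                new Block("t1", false, null),   // log1
                new Block("t2", false, null),   // log2: partially failed first attempt of t2
                new Block("t3", false, null),   // log3
                new Block("t4", true, "t2"),    // log4: rollback targeting t2
                new Block("t2", false, null));  // log5: re-attempt of t2
        
            List<Block> valid = new ArrayList<>();
            for (Block block : logBlocks) {
              if (block.isRollback) {
                // Flawed logic: only inspect the block immediately before the rollback block.
                Block previous = valid.isEmpty() ? null : valid.get(valid.size() - 1);
                if (previous != null && previous.instant.equals(block.rollbackTarget)) {
                  valid.remove(valid.size() - 1);
                }
                // Otherwise the rollback is treated as a duplicate, so log2 quietly survives.
              } else {
                valid.add(block);
              }
            }
            // Prints t1, t2, t3, t2 -- the stale first attempt of t2 is still treated as valid.
            valid.forEach(b -> System.out.println(b.instant));
          }
        }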
    
    Fix: switching the failed-writes cleaning policy in the MDT to EAGER solves this issue. Prior to this patch, the policy was LAZY, so rollback happened only when clean was triggered at the end, after the delta commit succeeded in the MDT.
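    
    As a minimal sketch of what the EAGER policy means at the configuration level (the base path and table name are hypothetical, and the import paths for the two policy enums are assumed; the builder calls mirror the ones visible in the patch below):
    
        import org.apache.hudi.common.model.HoodieCleaningPolicy;
        import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
        import org.apache.hudi.config.HoodieCleanConfig;
        import org.apache.hudi.config.HoodieWriteConfig;
        
        public class EagerCleaningConfigSketch {
          public static void main(String[] args) {
            // Build a write config whose failed-writes cleaning policy is EAGER, so a partially
            // failed write is rolled back up front instead of lazily during a later clean.
            HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
                .withPath("/tmp/hudi_mdt_sketch")
                .forTable("sketch_table")
                .withCleanConfig(HoodieCleanConfig.newBuilder()
                    .withAutoClean(false)
                    .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
                    .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
                    .build())
                .build();
            System.out.println(writeConfig.getFailedWritesCleanPolicy());
          }
        }
    
    With EAGER in place, the partially failed delta commit in the MDT is rolled back before the next write proceeds, which is what the rollbackFailedWrites() call added in SparkHoodieBackedTableMetadataWriter below relies on.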
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |   4 +-
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   8 ++
 .../metadata/HoodieBackedTableMetadataWriter.java  |   9 +-
 .../rollback/BaseRollbackActionExecutor.java       |   4 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |   9 +-
 .../functional/TestHoodieBackedMetadata.java       | 107 +++++++++++++++++++++
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  34 +++++--
 .../apache/hudi/utilities/HoodieClusteringJob.java |   5 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  26 ++++-
 9 files changed, 189 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 7ea70f63998..ec1041e3ab0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -667,6 +667,7 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i
 
   /**
    * Rollback all failed writes.
+   * @return true if rollback was triggered. false otherwise.
    */
   protected Boolean rollbackFailedWrites() {
     return rollbackFailedWrites(false);
@@ -675,6 +676,7 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i
   /**
    * Rollback all failed writes.
    * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
+   * @return true if rollback was triggered. false otherwise.
    */
   protected Boolean rollbackFailedWrites(boolean skipLocking) {
     HoodieTable table = createTable(config, hadoopConf);
@@ -682,7 +684,7 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i
     Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
     instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
     rollbackFailedWrites(pendingRollbacks, skipLocking);
-    return true;
+    return !pendingRollbacks.isEmpty();
   }
 
   protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index b361f8918c4..818fa82e568 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1263,6 +1263,14 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     }
   }
 
+  /**
+   * Rollback failed writes if any.
+   * @return true if rollback happened. false otherwise.
+   */
+  public boolean rollbackFailedWrites() {
+    return tableServiceClient.rollbackFailedWrites();
+  }
+
   /**
    * add columns to table.
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index cc06b80aafd..fd83d24ea56 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -268,7 +268,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
             .withAutoClean(false)
             .withCleanerParallelism(parallelism)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
             .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
             .build())
         // we will trigger archive manually, to ensure only regular writer invokes it
@@ -1041,7 +1041,12 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     // delta commits synced over will not have an instant time lesser than the last completed instant on the
     // metadata table.
     final String compactionInstantTime = latestDeltaCommitTimeInMetadataTable + METADATA_COMPACTION_TIME_SUFFIX;
-    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+    // we need to avoid checking compaction w/ same instant again.
+    // lets say we trigger compaction after C5 in MDT and so compaction completes with C4001. but C5 crashed before completing in MDT.
+    // and again w/ C6, we will re-attempt compaction at which point latest delta commit is C4 in MDT.
+    // and so we try compaction w/ instant C4001. So, we can avoid compaction if we already have compaction w/ same instant time.
+    if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)
+        && writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
       writeClient.compact(compactionInstantTime);
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index b5ae9471e58..b88b332172b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -155,12 +156,13 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> extends BaseActionE
     // since with LAZY rollback we support parallel writing which can allow a new inflight while rollback is ongoing
     // Remove this once we support LAZY rollback of failed writes by default as parallel writing becomes the default
     // writer mode.
-    if (config.getFailedWritesCleanPolicy().isEager()) {
+    if (config.getFailedWritesCleanPolicy().isEager()  && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
       final String instantTimeToRollback = instantToRollback.getTimestamp();
       HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
       HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
       // Make sure only the last n commits are being rolled back
       // If there is a commit in-between or after that is not rolled back, then abort
+      // this condition may not hold good for metadata table. since the order of commits applied to MDT is data table commits and the ordering could be different.
       if ((instantTimeToRollback != null) && !commitTimeline.empty()
           && !commitTimeline.findInstantsAfter(instantTimeToRollback, Integer.MAX_VALUE).empty()) {
         // check if remnants are from a previous LAZY rollback config, if yes, let out of order rollback continue
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 272d3d47985..81526c25bcc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
@@ -132,7 +133,11 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     JavaRDD<HoodieRecord> preppedRecordRDD = HoodieJavaRDD.getJavaRDD(preppedRecords);
 
-    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
+    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
+      // rollback partially failed writes if any.
+      if (writeClient.rollbackFailedWrites()) {
+        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+      }
       if (canTriggerTableService) {
         // trigger compaction before doing the delta commit. this is to ensure, if this delta commit succeeds in metadata table, but failed in data table,
         // we would have compacted metadata table and so could have included uncommitted data which will never be ignored while reading from metadata
@@ -162,7 +167,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
         // clean plan is the same, so we don't need to delete the requested and inflight instant
         // files in the active timeline.
       }
-      
+
       List<WriteStatus> statuses = writeClient.upsertPreppedRecords(preppedRecordRDD, instantTime).collect();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 84e6d342883..d46e02cf39f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -906,6 +906,65 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
   }
 
+  @Test
+  public void testMetadataRollbackWithCompaction() throws Exception {
+    HoodieTableType tableType = COPY_ON_WRITE;
+    init(tableType, false);
+    writeConfig = getWriteConfigBuilder(false, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withPopulateMetaFields(true)
+            .build())
+        .build();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // Write 1 (Bulk insert)
+      String newCommitTime1 = "0000001";
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime1, 100);
+      client.startCommitWithTime(newCommitTime1);
+      JavaRDD writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime1);
+      client.commit(newCommitTime1, writeStatuses);
+
+      String newCommitTime2 = "0000002";
+      records = dataGen.generateUniqueUpdates(newCommitTime2, 20);
+      client.startCommitWithTime(newCommitTime2);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime2);
+      client.commit(newCommitTime2, writeStatuses);
+
+      String newCommitTime3 = "0000003";
+      records = dataGen.generateUniqueUpdates(newCommitTime3, 20);
+      client.startCommitWithTime(newCommitTime3);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime3);
+      client.commit(newCommitTime3, writeStatuses);
+
+      // rollback "3" so that there is no "3" in data table timeline, but there exists a DC "3" in metadata timeline.
+      client.rollback(newCommitTime3);
+
+      // mimicing crash or making an inflight in metadata table.
+      Path toDelete = new Path(metaClient.getMetaPath() + "/metadata/.hoodie/" + newCommitTime2 + "." + HoodieTimeline.DELTA_COMMIT_ACTION);
+      metaClient.getFs().delete(toDelete);
+
+      // re-ingest w/ same commit time.
+      records = dataGen.generateUniqueUpdates(newCommitTime3, 20);
+      client.startCommitWithTime(newCommitTime3);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime3);
+      client.commit(newCommitTime3, writeStatuses);
+
+      // collect all commit meta files from metadata table.
+      FileStatus[] metaFiles = metaClient.getFs().listStatus(new Path(metaClient.getMetaPath() + "/metadata/.hoodie"));
+      List<FileStatus> commit3Files = Arrays.stream(metaFiles).filter(fileStatus ->
+          fileStatus.getPath().getName().equals(newCommitTime3 + "." + HoodieTimeline.DELTA_COMMIT_ACTION)).collect(Collectors.toList());
+      List<FileStatus> rollbackFiles = Arrays.stream(metaFiles).filter(fileStatus ->
+          fileStatus.getPath().getName().endsWith("." + HoodieTimeline.ROLLBACK_ACTION)).collect(Collectors.toList());
+
+      // ensure commit2's delta commit in MDT has last mod time > the actual rollback for previous failed commit i.e. commit2.
+      // if rollback wasn't eager, rollback's last mod time will be lower than the commit3'd delta commit last mod time.
+      assertTrue(commit3Files.get(0).getModificationTime() > rollbackFiles.get(0).getModificationTime());
+    }
+  }
+
   /**
    * Test arguments - Table type, populate meta fields, exclude key from payload.
    */
@@ -1594,6 +1653,54 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
   }
 
+  @Test
+  public void testEagerRollbackinMDT() throws IOException {
+    tableType = MERGE_ON_READ;
+    initPath();
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig);
+    // Write 1 (Bulk insert)
+    String commit1 = HoodieActiveTimeline.createNewInstantTime();
+    List<HoodieRecord> records = dataGen.generateInserts(commit1, 20);
+    client.startCommitWithTime(commit1);
+    List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commit1).collect();
+    assertNoWriteErrors(writeStatuses);
+
+    // Write 2 (inserts)
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    client.startCommitWithTime(commit2);
+    records = dataGen.generateInserts(commit2, 20);
+    writeStatuses = client.insert(jsc.parallelize(records, 1), commit2).collect();
+    assertNoWriteErrors(writeStatuses);
+    // remove latest completed delta commit from MDT.
+    Path toDelete = new Path(metaClient.getMetaPath() + "/metadata/.hoodie/" + commit2 + "." + HoodieTimeline.DELTA_COMMIT_ACTION);
+    metaClient.getFs().delete(toDelete);
+
+    // Write 3 (updates)
+    client = new SparkRDDWriteClient(engineContext, writeConfig);
+    String commit3 = HoodieActiveTimeline.createNewInstantTime();
+    client.startCommitWithTime(commit3);
+    records = dataGen.generateUniqueUpdates(commit3, 10);
+    writeStatuses = client.upsert(jsc.parallelize(records, 1), commit3).collect();
+    assertNoWriteErrors(writeStatuses);
+
+    // ensure that 000003 is after rollback of the partially failed 2nd commit.
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setBasePath(metaClient.getMetaPath() + "/metadata/").setConf(metaClient.getHadoopConf()).build();
+    HoodieInstant rollbackInstant = metadataMetaClient.getActiveTimeline().getRollbackTimeline().getInstants().get(0);
+
+    // collect all commit meta files from metadata table.
+    FileStatus[] metaFiles = metaClient.getFs().listStatus(new Path(metaClient.getMetaPath() + "/metadata/.hoodie"));
+    List<FileStatus> commit3Files = Arrays.stream(metaFiles).filter(fileStatus ->
+        fileStatus.getPath().getName().equals(commit3 + "." + HoodieTimeline.DELTA_COMMIT_ACTION)).collect(Collectors.toList());
+    List<FileStatus> rollbackFiles = Arrays.stream(metaFiles).filter(fileStatus ->
+        fileStatus.getPath().getName().equals(rollbackInstant.getTimestamp() + "." + HoodieTimeline.ROLLBACK_ACTION)).collect(Collectors.toList());
+
+    // ensure commit3's delta commit in MDT has last mod time > the actual rollback for previous failed commit i.e. commit2.
+    // if rollback wasn't eager, rollback's last mod time will be lower than the commit3'd delta commit last mod time.
+    assertTrue(commit3Files.get(0).getModificationTime() > rollbackFiles.get(0).getModificationTime());
+  }
+
   /**
    * Test all major table operations with the given table, config and context.
    *
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 38033b9af35..d67e294e8b9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -28,10 +28,12 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
 import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.validator.SparkPreCommitValidator;
 import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
 import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -51,6 +53,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
@@ -81,6 +84,7 @@ import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
@@ -2415,9 +2419,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     testRollbackAfterConsistencyCheckFailureUsingFileList(true, enableOptimisticConsistencyGuard, populateMetCols);
   }
 
-  @ParameterizedTest
-  @MethodSource("rollbackFailedCommitsParams")
-  public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) throws Exception {
+  //@ParameterizedTest
+  //@MethodSource("rollbackFailedCommitsParams")
+  @Test
+  public void testRollbackFailedCommits() throws Exception {
+    // HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields
+    HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.NEVER;
+    boolean populateMetaFields = true;
     HoodieTestUtils.init(hadoopConf, basePath);
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
 
@@ -2476,11 +2484,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
               == 0);
       assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3);
     } else if (cleaningPolicy.isNever()) {
+      // never will get translated to Lazy if OCC is enabled.
       assertTrue(
               timeline
                       .getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
                       .countInstants()
-                      == 0);
+                      == 2);
       // There should be no clean or rollback action on the timeline
       assertTrue(
               timeline
@@ -2546,8 +2555,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     client.startCommit();
     timeline = metaClient.getActiveTimeline().reload();
+    // since OCC is enabled, hudi auto flips the cleaningPolicy to Lazy.
     assertTrue(timeline.getTimelineOfActions(
-            CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 5);
+            CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
     assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
   }
 
@@ -2780,6 +2790,14 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   private HoodieWriteConfig getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) {
+    Properties properties = new Properties();
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
+    if (!populateMetaFields) {
+      getPropertiesForKeyGen(populateMetaFields).entrySet().forEach(kv ->
+          properties.put(kv.getKey(), kv.getValue()));
+    }
     return getConfigBuilder()
         .withEmbeddedTimelineServerEnabled(false)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -2790,7 +2808,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withAutoCommit(false)
-        .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()).build();
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withProperties(properties).build();
   }
 
   public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 3c97b732eb6..634608e965a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -54,7 +54,7 @@ public class HoodieClusteringJob {
   private final Config cfg;
   private final TypedProperties props;
   private final JavaSparkContext jsc;
-  private final HoodieTableMetaClient metaClient;
+  private HoodieTableMetaClient metaClient;
 
   public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
     this.cfg = cfg;
@@ -180,6 +180,7 @@ public class HoodieClusteringJob {
   }
 
   private int doCluster(JavaSparkContext jsc) throws Exception {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
       if (StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
@@ -208,6 +209,7 @@ public class HoodieClusteringJob {
   }
 
   private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
       return doSchedule(client);
@@ -224,6 +226,7 @@ public class HoodieClusteringJob {
 
   private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
     LOG.info("Step 1: Do schedule");
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
       Option<String> instantTime = Option.empty();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 30ad43f894b..fc59d0ff9e0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -162,11 +163,17 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
 
   protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType recordType) throws IOException {
-    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+    return initialHoodieDeltaStreamer(tableBasePath, totalRecords, asyncCluster, recordType, WriteOperationType.INSERT);
+  }
+
+  protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType recordType,
+                                                             WriteOperationType writeOperationType) throws IOException {
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, writeOperationType);
     TestHelpers.addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", asyncCluster, ""));
+    cfg.configs.addAll(getAllMultiWriterConfigs());
     return new HoodieDeltaStreamer(cfg, jsc);
   }
 
@@ -179,10 +186,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   }
 
   protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute,
-      Boolean retryLastFailedClusteringJob, HoodieRecordType recordType) {
+                                                           Boolean retryLastFailedClusteringJob, HoodieRecordType recordType) {
     HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
         clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
     TestHelpers.addRecordMerger(recordType, scheduleClusteringConfig.configs);
+    scheduleClusteringConfig.configs.addAll(getAllMultiWriterConfigs());
     return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
   }
 
@@ -1099,6 +1107,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     return configs;
   }
 
+  private List<String> getAllMultiWriterConfigs() {
+    List<String> configs = new ArrayList<>();
+    configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getCanonicalName()));
+    configs.add(String.format("%s=%s", LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"));
+    configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+    configs.add(String.format("%s=%s", HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name()));
+    return configs;
+  }
+
   private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
                                                                      String clusteringInstantTime,
                                                                      Boolean runSchedule) {
@@ -1305,6 +1322,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg.continuousMode = false;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false", "0", "false", "0"));
+    cfg.configs.addAll(getAllMultiWriterConfigs());
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
 
@@ -1345,7 +1363,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   @CsvSource(value = {"execute, AVRO", "schedule, AVRO", "scheduleAndExecute, AVRO", "execute, SPARK", "schedule, SPARK", "scheduleAndExecute, SPARK"})
   public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode, HoodieRecordType recordType) throws Exception {
     String tableBasePath = basePath + "/asyncClustering2";
-    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false", recordType);
+    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false", recordType, WriteOperationType.BULK_INSERT);
     HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, runningMode, recordType);
 
     deltaStreamerTestRunner(ds, (r) -> {
@@ -1356,7 +1374,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
         if (result == 0) {
           LOG.info("Cluster success");
         } else {
-          LOG.warn("Import failed");
+          LOG.warn("Cluster failed");
           if (!runningMode.toLowerCase().equals(EXECUTE)) {
             return false;
           }