Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/13 06:24:07 UTC

[hudi] 03/05: [HUDI-5764] Rollback delta commits from `HoodieIndexer` lazily in metadata table (#7921)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d4106f35b4aee53ea5cb1430288f397b37c81183
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Sun Feb 12 03:30:10 2023 -0800

    [HUDI-5764] Rollback delta commits from `HoodieIndexer` lazily in metadata table (#7921)
    
    Fixes two issues:
    - Makes the rollback of the indexing delta commit in the metadata table lazy; otherwise, it would be cleaned up eagerly by other regular writers.
    - Appends a suffix ("004") to the up-to-instant used by the async indexer, to avoid a collision with an existing completed delta commit of the same instant time (see the sketch after this list).
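    
    For illustration, a minimal self-contained sketch of the suffix scheme; this is
    a hedged example, not code from this commit. It assumes Hudi's 17-character
    millisecond instant format ("yyyyMMddHHmmssSSS"); the class name and sample
    timestamp are hypothetical.
    
        public class IndexerSuffixSketch {
          static final String METADATA_INDEXER_TIME_SUFFIX = "004";
          static final int MILLIS_INSTANT_ID_LENGTH = 17; // "yyyyMMddHHmmssSSS"
    
          // Mirrors HoodieTableMetadataUtil.isIndexingCommit added in this commit.
          static boolean isIndexingCommit(String instantTime) {
            return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + METADATA_INDEXER_TIME_SUFFIX.length()
                && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
          }
    
          public static void main(String[] args) {
            // The async indexer appends "004" to the instant it indexes up to, so its
            // metadata-table delta commit cannot collide with a completed delta commit
            // at the same instant time.
            String indexUptoInstant = "20230212033010123";               // hypothetical sample
            String mdtCommitTime = indexUptoInstant + METADATA_INDEXER_TIME_SUFFIX;
            System.out.println(mdtCommitTime);                           // 20230212033010123004
            System.out.println(isIndexingCommit(mdtCommitTime));         // true
            System.out.println(isIndexingCommit(indexUptoInstant));      // false
          }
        }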
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  48 +++++++++
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  13 +++
 .../metadata/HoodieBackedTableMetadataWriter.java  |  34 ++++---
 .../java/org/apache/hudi/table/HoodieTable.java    |  38 +++++++-
 .../table/action/index/RunIndexActionExecutor.java |   5 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |  21 +++-
 .../org/apache/hudi/table/HoodieFlinkTable.java    |  12 ++-
 .../SparkHoodieBackedTableMetadataWriter.java      |  20 +++-
 .../org/apache/hudi/table/HoodieSparkTable.java    |  10 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  12 ++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  20 ++++
 .../apache/hudi/utilities/TestHoodieIndexer.java   | 108 +++++++++++++++++++--
 12 files changed, 298 insertions(+), 43 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 390bc4b9714..301ed61bf4e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -48,6 +48,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -71,6 +72,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
 
 public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
 
@@ -659,8 +661,41 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i
     return infoMap;
   }
 
+  /**
+   * Rolls back the failed delta commits corresponding to the indexing action.
+   * Such delta commits are identified based on the suffix `METADATA_INDEXER_TIME_SUFFIX` ("004").
+   * <p>
+   * TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks
+   *  in the metadata table is landed.
+   *
+   * @return {@code true} if rollback happens; {@code false} otherwise.
+   */
+  protected boolean rollbackFailedIndexingCommits() {
+    HoodieTable table = createTable(config, hadoopConf);
+    List<String> instantsToRollback = getFailedIndexingCommitsToRollback(table.getMetaClient());
+    Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
+    instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
+    rollbackFailedWrites(pendingRollbacks);
+    return !pendingRollbacks.isEmpty();
+  }
+
+  protected List<String> getFailedIndexingCommitsToRollback(HoodieTableMetaClient metaClient) {
+    Stream<HoodieInstant> inflightInstantsStream = metaClient.getCommitsTimeline()
+        .filter(instant -> !instant.isCompleted()
+            && isIndexingCommit(instant.getTimestamp()))
+        .getInstantsAsStream();
+    return inflightInstantsStream.filter(instant -> {
+      try {
+        return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+      } catch (IOException io) {
+        throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+      }
+    }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+  }
+
   /**
    * Rollback all failed writes.
+   *
    * @return true if rollback was triggered. false otherwise.
    */
   protected Boolean rollbackFailedWrites() {
@@ -699,6 +734,19 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i
     Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient)
         .getReverseOrderedInstants();
     if (cleaningPolicy.isEager()) {
+      // Metadata table uses eager cleaning policy, but we need to exclude inflight delta commits
+      // from the async indexer (`HoodieIndexer`).
+      // TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks in the
+      //  metadata table is landed.
+      if (HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString())) {
+        return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
+          if (curInstantTime.isPresent()) {
+            return !entry.equals(curInstantTime.get());
+          } else {
+            return !isIndexingCommit(entry);
+          }
+        }).collect(Collectors.toList());
+      }
       return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
         if (curInstantTime.isPresent()) {
           return !entry.equals(curInstantTime.get());
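To make the division of labor concrete, here is a hedged, self-contained sketch (not code from this commit) of how the two paths above now split inflight metadata-table delta commits: the eager path used by regular writers skips indexer commits, while the lazy path rolls them back only once their heartbeat expires. `RollbackSplitSketch` and the timestamps are hypothetical, the predicate stands in for `heartbeatClient.isHeartbeatExpired`, and the `curInstantTime` handling is omitted for brevity.

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    public class RollbackSplitSketch {
      // Same check as HoodieTableMetadataUtil.isIndexingCommit ("004" suffix).
      static boolean isIndexingCommit(String ts) {
        return ts.length() == 17 + 3 && ts.endsWith("004");
      }

      public static void main(String[] args) {
        List<String> inflight = Arrays.asList("20230212033010123", "20230212033010123004");
        Predicate<String> heartbeatExpired = ts -> true; // stand-in for heartbeat check

        // Eager path (regular writer on the metadata table): exclude indexer commits.
        List<String> eager = inflight.stream()
            .filter(ts -> !isIndexingCommit(ts))
            .collect(Collectors.toList());

        // Lazy path (rollbackFailedIndexingCommits): only expired indexer commits.
        List<String> lazy = inflight.stream()
            .filter(ts -> isIndexingCommit(ts) && heartbeatExpired.test(ts))
            .collect(Collectors.toList());

        System.out.println(eager); // [20230212033010123]
        System.out.println(lazy);  // [20230212033010123004]
      }
    }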
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 17956479762..10c0db97151 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1279,8 +1279,21 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     }
   }
 
+  /**
+   * Rolls back the failed delta commits corresponding to the indexing action.
+   * <p>
+   * TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks
+   *  in the metadata table is landed.
+   *
+   * @return {@code true} if rollback happens; {@code false} otherwise.
+   */
+  public boolean lazyRollbackFailedIndexing() {
+    return tableServiceClient.rollbackFailedIndexingCommits();
+  }
+
   /**
    * Rollback failed writes if any.
+   *
    * @return true if rollback happened. false otherwise.
    */
   public boolean rollbackFailedWrites() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 5e8367e2095..3338872efbb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -93,6 +93,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
@@ -100,6 +101,7 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deseri
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.METADATA_INDEXER_TIME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
 
@@ -134,15 +136,17 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   /**
    * Hudi backed table metadata writer.
    *
-   * @param hadoopConf               - Hadoop configuration to use for the metadata writer
-   * @param writeConfig              - Writer config
-   * @param engineContext            - Engine context
-   * @param actionMetadata           - Optional action metadata to help decide initialize operations
-   * @param <T>                      - Action metadata types extending Avro generated SpecificRecordBase
-   * @param inflightInstantTimestamp - Timestamp of any instant in progress
+   * @param hadoopConf                 Hadoop configuration to use for the metadata writer
+   * @param writeConfig                Writer config
+   * @param failedWritesCleaningPolicy Cleaning policy on failed writes
+   * @param engineContext              Engine context
+   * @param actionMetadata             Optional action metadata to help decide initialize operations
+   * @param <T>                        Action metadata types extending Avro generated SpecificRecordBase
+   * @param inflightInstantTimestamp   Timestamp of any instant in progress
    */
   protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configuration hadoopConf,
                                                                            HoodieWriteConfig writeConfig,
+                                                                           HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                                            HoodieEngineContext engineContext,
                                                                            Option<T> actionMetadata,
                                                                            Option<String> inflightInstantTimestamp) {
@@ -154,7 +158,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
     if (writeConfig.isMetadataTableEnabled()) {
       this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
-      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig);
+      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);
       enabled = true;
 
       // Inline compaction and auto clean is required as we don't expose this table outside
@@ -181,7 +185,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
   public HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig,
                                          HoodieEngineContext engineContext) {
-    this(hadoopConf, writeConfig, engineContext, Option.empty(), Option.empty());
+    this(hadoopConf, writeConfig, EAGER, engineContext, Option.empty(), Option.empty());
   }
 
   /**
@@ -232,11 +236,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   protected abstract void initRegistry();
 
   /**
-   * Create a {@code HoodieWriteConfig} to use for the Metadata Table.
+   * Create a {@code HoodieWriteConfig} to use for the Metadata Table. This is used by the async
+   * indexer only.
    *
-   * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
+   * @param writeConfig                {@code HoodieWriteConfig} of the main dataset writer
+   * @param failedWritesCleaningPolicy Cleaning policy on failed writes
    */
-  private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
+  private HoodieWriteConfig createMetadataWriteConfig(
+      HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) {
     int parallelism = writeConfig.getMetadataInsertParallelism();
 
     int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
@@ -268,7 +275,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
             .withAutoClean(false)
             .withCleanerParallelism(parallelism)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
             .retainCommits(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED.defaultValue())
             .build())
         // we will trigger archive manually, to ensure only regular writer invokes it
@@ -875,7 +882,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     inflightIndexes.addAll(indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
     dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightIndexes));
     HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
-    initialCommit(indexUptoInstantTime, partitionTypes);
+    initialCommit(indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX, partitionTypes);
   }
 
   /**
@@ -1069,6 +1076,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     // delta commits synced over will not have an instant time lesser than the last completed instant on the
     // metadata table.
     writeClient.clean(instantTime + "002");
+    writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 591ebc430dc..4102515ae01 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
 import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -100,6 +101,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.isSchemaCompatible;
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
@@ -872,7 +875,8 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
-    return getMetadataWriter(triggeringInstantTimestamp, Option.empty());
+    return getMetadataWriter(
+        triggeringInstantTimestamp, EAGER, Option.empty());
   }
 
   /**
@@ -895,6 +899,29 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
     }
   }
 
+  /**
+   * Gets the metadata writer for async indexer.
+   *
+   * @param triggeringInstantTimestamp The instant that is triggering this metadata write.
+   * @return An instance of {@link HoodieTableMetadataWriter}.
+   */
+  public Option<HoodieTableMetadataWriter> getIndexingMetadataWriter(String triggeringInstantTimestamp) {
+    return getMetadataWriter(triggeringInstantTimestamp, LAZY, Option.empty());
+  }
+
+  /**
+   * Gets the metadata writer for regular writes.
+   *
+   * @param triggeringInstantTimestamp The instant that is triggering this metadata write.
+   * @param actionMetadata             Optional action metadata.
+   * @param <R>                        Action metadata type.
+   * @return An instance of {@link HoodieTableMetadataWriter}.
+   */
+  public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(
+      String triggeringInstantTimestamp, Option<R> actionMetadata) {
+    return getMetadataWriter(triggeringInstantTimestamp, EAGER, actionMetadata);
+  }
+
   /**
    * Get Table metadata writer.
    * <p>
@@ -905,11 +932,14 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    * are blocked from doing the similar initial metadata table creation and
    * the bootstrapping.
    *
-   * @param triggeringInstantTimestamp - The instant that is triggering this metadata write
+   * @param triggeringInstantTimestamp The instant that is triggering this metadata write
+   * @param failedWritesCleaningPolicy Cleaning policy on failed writes
    * @return instance of {@link HoodieTableMetadataWriter}
    */
-  public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
-                                                                                            Option<R> actionMetadata) {
+  protected <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(
+      String triggeringInstantTimestamp,
+      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+      Option<R> actionMetadata) {
     // Each engine is expected to override this and
     // provide the actual metadata writer, if enabled.
     return Option.empty();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 2fcbfb2b2e5..19aab3629d5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -138,8 +138,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
       List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = null;
       if (!firstTimeInitializingMetadataTable) {
         // start indexing for each partition
-        HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
-            .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+        HoodieTableMetadataWriter metadataWriter = table.getIndexingMetadataWriter(instantTime)
+            .orElseThrow(() -> new HoodieIndexException(String.format(
+                "Could not get metadata writer to run index action for instant: %s", instantTime)));
        // this will only build the index up to the base instant generated by the plan; we will be doing catchup later
         String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
         LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index aa70f5835c8..cd45685e13e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -42,6 +43,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
+
 /**
  * Flink hoodie backed table metadata writer.
  */
@@ -58,23 +61,35 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
                                                                                 HoodieWriteConfig writeConfig,
                                                                                 HoodieEngineContext context,
                                                                                 Option<T> actionMetadata) {
-    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, Option.empty());
+    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, EAGER, context, actionMetadata, Option.empty());
+  }
+
+  public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
+                                                                                HoodieWriteConfig writeConfig,
+                                                                                HoodieEngineContext context,
+                                                                                Option<T> actionMetadata,
+                                                                                Option<String> inFlightInstantTimestamp) {
+    return new FlinkHoodieBackedTableMetadataWriter(
+        conf, writeConfig, EAGER, context, actionMetadata, inFlightInstantTimestamp);
   }
 
   public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
                                                                                 HoodieWriteConfig writeConfig,
+                                                                                HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                                                 HoodieEngineContext context,
                                                                                 Option<T> actionMetadata,
                                                                                 Option<String> inFlightInstantTimestamp) {
-    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, inFlightInstantTimestamp);
+    return new FlinkHoodieBackedTableMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, actionMetadata, inFlightInstantTimestamp);
   }
 
   <T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
                                                                       HoodieWriteConfig writeConfig,
+                                                                      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                                       HoodieEngineContext engineContext,
                                                                       Option<T> actionMetadata,
                                                                       Option<String> inFlightInstantTimestamp) {
-    super(hadoopConf, writeConfig, engineContext, actionMetadata, inFlightInstantTimestamp);
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, actionMetadata, inFlightInstantTimestamp);
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 3d77844df6f..422fe310b0c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -98,11 +99,14 @@ public abstract class HoodieFlinkTable<T>
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   @Override
-  public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
-                                                                                            Option<T> actionMetadata) {
+  protected <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(
+      String triggeringInstantTimestamp,
+      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+      Option<T> actionMetadata) {
     if (config.isMetadataTableEnabled()) {
-      return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
-          context, actionMetadata, Option.of(triggeringInstantTimestamp)));
+      return Option.of(FlinkHoodieBackedTableMetadataWriter.create(
+          context.getHadoopConf().get(), config, failedWritesCleaningPolicy, context,
+          actionMetadata, Option.of(triggeringInstantTimestamp)));
     } else {
       return Option.empty();
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 23537f6f798..37222c8266a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -49,6 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
+
 public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
 
   private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
@@ -73,10 +76,20 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
                                                                                 HoodieEngineContext context,
                                                                                 Option<T> actionMetadata,
                                                                                 Option<String> inflightInstantTimestamp) {
-    return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata,
-                                                    inflightInstantTimestamp);
+    return new SparkHoodieBackedTableMetadataWriter(
+        conf, writeConfig, EAGER, context, actionMetadata, inflightInstantTimestamp);
   }
 
+  public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
+                                                                                HoodieWriteConfig writeConfig,
+                                                                                HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                                                HoodieEngineContext context,
+                                                                                Option<T> actionMetadata,
+                                                                                Option<String> inflightInstantTimestamp) {
+    return new SparkHoodieBackedTableMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, actionMetadata, inflightInstantTimestamp);
+  }
+  
   public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
                                                  HoodieEngineContext context) {
     return create(conf, writeConfig, context, Option.empty(), Option.empty());
@@ -84,10 +97,11 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
 
   <T extends SpecificRecordBase> SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
                                                                       HoodieWriteConfig writeConfig,
+                                                                      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                                       HoodieEngineContext engineContext,
                                                                       Option<T> actionMetadata,
                                                                       Option<String> inflightInstantTimestamp) {
-    super(hadoopConf, writeConfig, engineContext, actionMetadata, inflightInstantTimestamp);
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, actionMetadata, inflightInstantTimestamp);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 112deccf8df..d6796a7a4d4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -89,14 +90,17 @@ public abstract class HoodieSparkTable<T>
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   @Override
-  public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
-                                                                                            Option<R> actionMetadata) {
+  protected <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(
+      String triggeringInstantTimestamp,
+      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+      Option<R> actionMetadata) {
     if (config.isMetadataTableEnabled()) {
       // Create the metadata table writer. First time after the upgrade this creation might trigger
       // metadata table bootstrapping. Bootstrapping process could fail and checking the table
       // existence after the creation is needed.
       final HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
-          context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp));
+          context.getHadoopConf().get(), config, failedWritesCleaningPolicy, context,
+          actionMetadata, Option.of(triggeringInstantTimestamp));
       // even with metadata enabled, some index could have been disabled
       // delete metadata partitions corresponding to such indexes
       deleteMetadataIndexIfNecessary();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 1ccc14176a1..f3538127955 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -53,9 +53,9 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.util.Transient;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.util.Transient;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -83,6 +83,7 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BL
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemView;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
 
 /**
  * Table metadata provided by an internal DFS backed Hudi metadata table.
@@ -482,6 +483,15 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     Set<String> validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream()
         .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
 
+    // We should also add completed indexing delta commits in the metadata table, as they do not
+    // have a corresponding completed instant in the data table
+    validInstantTimestamps.addAll(
+        metadataMetaClient.getActiveTimeline()
+            .filter(instant -> instant.isCompleted() && isIndexingCommit(instant.getTimestamp()))
+            .getInstants().stream()
+            .map(HoodieInstant::getTimestamp)
+            .collect(Collectors.toList()));
+
     // For any rollbacks and restores, we cannot neglect the instants that they are rolling back.
     // The rollback instant should be more recent than the start of the timeline for it to have rolled back any
     // instant which we have a log block for.
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index d37dbab3d82..81ba4f2a66b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -94,6 +94,7 @@ import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
 import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
@@ -111,6 +112,11 @@ public class HoodieTableMetadataUtil {
   public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
   public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
 
+  // This suffix is used by the delta commits from the async indexer (`HoodieIndexer`),
+  // when the `indexUptoInstantTime` already exists in the metadata table,
+  // to avoid collision.
+  public static final String METADATA_INDEXER_TIME_SUFFIX = "004";
+
   /**
    * Returns whether the files partition of metadata table is ready for read.
    *
@@ -1380,4 +1386,18 @@ public class HoodieTableMetadataUtil {
     inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions());
     return inflightAndCompletedPartitions;
   }
+
+  /**
+   * Checks if a delta commit in metadata table is written by async indexer.
+   * <p>
+   * TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks in the
+   *  metadata table is landed.
+   *
+   * @param instantTime Instant time to check.
+   * @return {@code true} if from async indexer; {@code false} otherwise.
+   */
+  public static boolean isIndexingCommit(String instantTime) {
+    return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + METADATA_INDEXER_TIME_SUFFIX.length()
+        && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index ac7b86f4cfa..67504cb957d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -21,18 +21,25 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -56,6 +63,9 @@ import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.config.HoodieWriteConfig.CLIENT_HEARTBEAT_INTERVAL_IN_MS;
+import static org.apache.hudi.config.HoodieWriteConfig.CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.METADATA_INDEXER_TIME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemView;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
@@ -123,7 +133,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
-    initializeWriteClient(metadataConfigBuilder.build(), tableName);
+    upsertToTable(metadataConfigBuilder.build(), tableName);
 
     // validate table config
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -138,7 +148,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
-    initializeWriteClient(metadataConfigBuilder.build(), tableName);
+    upsertToTable(metadataConfigBuilder.build(), tableName);
 
     // validate table config
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -147,6 +157,78 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}), tableName);
   }
 
+  @Test
+  public void testIndexerWithWriter() throws IOException {
+    // Test the case where the indexer is running, i.e., the delta commit in the metadata table
+    // is inflight, while the regular writer is updating metadata table.
+    // The delta commit from the indexer should not be rolled back.
+    String tableName = "indexer_with_writer";
+    // Enable files and bloom_filters on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder =
+        getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
+    HoodieMetadataConfig metadataConfig = metadataConfigBuilder.build();
+    upsertToTable(metadataConfig, tableName);
+
+    // Validate table config
+    assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
+    assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
+
+    // Run async indexer, creating a new indexing instant in the data table and a new delta commit
+    // in the metadata table, with the suffix "004"
+    scheduleAndExecuteIndexing(COLUMN_STATS, tableName);
+
+    HoodieInstant indexingInstant = metaClient.getActiveTimeline()
+        .filter(i -> HoodieTimeline.INDEXING_ACTION.equals(i.getAction()))
+        .getInstants().get(0);
+    HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+        metaClient.getActiveTimeline().readIndexPlanAsBytes(indexingInstant).get());
+    String indexUptoInstantTime = indexPlan.getIndexPartitionInfos().get(0).getIndexUptoInstant();
+    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
+        context(), metadataConfig, metaClient.getBasePathV2().toString(),
+        getWriteConfigBuilder(basePath(), tableName).build().getSpillableMapBasePath());
+    HoodieTableMetaClient metadataMetaClient = metadata.getMetadataMetaClient();
+    String mdtCommitTime = indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX;
+    assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(mdtCommitTime));
+
+    // Reverts both instants to inflight state, to simulate inflight indexing instants
+    metaClient.getActiveTimeline().revertToInflight(indexingInstant);
+    metaClient = reload(metaClient);
+
+    HoodieInstant mdtIndexingCommit = metadataMetaClient.getActiveTimeline()
+        .filter(i -> i.getTimestamp().equals(mdtCommitTime))
+        .getInstants().get(0);
+    metadataMetaClient.getActiveTimeline().revertToInflight(mdtIndexingCommit);
+    metadataMetaClient = reload(metadataMetaClient);
+    // Simulate heartbeats for ongoing write from async indexer in the metadata table
+    HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+        metadataMetaClient.getFs(), metadataMetaClient.getBasePathV2().toString(),
+        CLIENT_HEARTBEAT_INTERVAL_IN_MS.defaultValue().longValue(),
+        CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.defaultValue());
+    heartbeatClient.start(mdtCommitTime);
+
+    upsertToTable(metadataConfig, tableName);
+    metaClient = reload(metaClient);
+    metadataMetaClient = reload(metadataMetaClient);
+    // The delta commit from async indexer in metadata table should not be rolled back
+    assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(mdtIndexingCommit.getTimestamp()));
+    assertTrue(metadataMetaClient.getActiveTimeline().getRollbackTimeline().empty());
+
+    // Simulate heartbeat timeout
+    heartbeatClient.stop(mdtCommitTime);
+    upsertToTable(metadataConfig, tableName);
+    metaClient = reload(metaClient);
+    metadataMetaClient = reload(metadataMetaClient);
+    // The delta commit from async indexer in metadata table should be rolled back now
+    assertFalse(metadataMetaClient.getActiveTimeline().containsInstant(mdtIndexingCommit.getTimestamp()));
+    assertEquals(1, metadataMetaClient.getActiveTimeline().getRollbackTimeline().countInstants());
+    HoodieInstant rollbackInstant = metadataMetaClient.getActiveTimeline()
+        .getRollbackTimeline().firstInstant().get();
+    HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+        metadataMetaClient.getActiveTimeline().readRollbackInfoAsBytes(rollbackInstant).get());
+    assertEquals(mdtCommitTime, rollbackMetadata.getInstantsRollback()
+        .stream().findFirst().get().getCommitTime());
+  }
+
   private static Stream<Arguments> colStatsFileGroupCountParams() {
     return Stream.of(
         Arguments.of(1),
@@ -163,7 +245,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
-    initializeWriteClient(metadataConfigBuilder.build(), tableName);
+    upsertToTable(metadataConfigBuilder.build(), tableName);
 
     // validate table config
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -190,7 +272,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false);
-    initializeWriteClient(metadataConfigBuilder.build(), tableName);
+    upsertToTable(metadataConfigBuilder.build(), tableName);
     // validate table config
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
 
@@ -227,12 +309,12 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     assertEquals(partitionFileSlices.size(), HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue());
   }
 
-  private void initializeWriteClient(HoodieMetadataConfig metadataConfig, String tableName) {
+  private void upsertToTable(HoodieMetadataConfig metadataConfig, String tableName) {
     HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName);
     HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfig).build();
     // do one upsert with synchronous metadata update
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
-    String instant = "0001";
+    String instant = HoodieActiveTimeline.createNewInstantTime();
     writeClient.startCommitWithTime(instant);
     List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);
@@ -240,8 +322,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     assertNoWriteErrors(statuses);
   }
 
-  private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List<MetadataPartitionType> alreadyCompletedPartitions, List<MetadataPartitionType> nonExistantPartitions,
-                                                String tableName) {
+  private void scheduleAndExecuteIndexing(MetadataPartitionType partitionTypeToIndex, String tableName) {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
     config.basePath = basePath();
@@ -258,6 +339,13 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
 
     // validate table config
     metaClient = reload(metaClient);
+  }
+
+  private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List<MetadataPartitionType> alreadyCompletedPartitions, List<MetadataPartitionType> nonExistantPartitions,
+                                                String tableName) {
+    scheduleAndExecuteIndexing(partitionTypeToIndex, tableName);
+
+    // validate table config
     Set<String> completedPartitions = metaClient.getTableConfig().getMetadataPartitions();
     assertTrue(completedPartitions.contains(partitionTypeToIndex.getPartitionPath()));
     alreadyCompletedPartitions.forEach(entry -> assertTrue(completedPartitions.contains(entry.getPartitionPath())));
@@ -277,7 +365,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
     // do one upsert with synchronous metadata update
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
-    String instant = "0001";
+    String instant = HoodieActiveTimeline.createNewInstantTime();
     writeClient.startCommitWithTime(instant);
     List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);
@@ -331,7 +419,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
     // do one upsert with synchronous metadata update
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
-    String instant = "0001";
+    String instant = HoodieActiveTimeline.createNewInstantTime();
     writeClient.startCommitWithTime(instant);
     List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);