Posted to commits@hudi.apache.org by xu...@apache.org on 2023/01/17 15:24:25 UTC

[hudi] branch master updated: [HUDI-4148] Add client for Hudi table service manager (TSM) (#6732)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c9bc03ed868 [HUDI-4148] Add client for Hudi table service manager (TSM) (#6732)
c9bc03ed868 is described below

commit c9bc03ed8681ab64eea2520f5511464915389c51
Author: Zhaojing Yu <yu...@bytedance.com>
AuthorDate: Tue Jan 17 23:24:16 2023 +0800

    [HUDI-4148] Add client for Hudi table service manager (TSM) (#6732)
    
    Also refactor `Hoodie*Client` classes to separate table service APIs from write ones.
    
    Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
 .../org/apache/hudi/client/BaseHoodieClient.java   |  68 +-
 .../hudi/client/BaseHoodieTableServiceClient.java  | 832 +++++++++++++++++++++
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 554 ++------------
 .../client/HoodieTableServiceManagerClient.java    | 169 +++++
 .../org/apache/hudi/client/RunsTableService.java   |   5 +
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  20 +-
 .../action/clean/CleanPlanActionExecutor.java      |  12 +-
 .../cluster/ClusteringPlanActionExecutor.java      |   1 +
 .../compact/ScheduleCompactionActionExecutor.java  |  14 +-
 .../BaseHoodieCompactionPlanGenerator.java         |   3 +
 .../hudi/client/HoodieFlinkTableServiceClient.java | 214 ++++++
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 138 +---
 .../hudi/client/HoodieJavaTableServiceClient.java  |  65 ++
 .../apache/hudi/client/HoodieJavaWriteClient.java  |   6 +-
 .../hudi/client/HoodieSparkClusteringClient.java   |   2 +-
 .../hudi/client/SparkRDDTableServiceClient.java    | 295 ++++++++
 .../apache/hudi/client/SparkRDDWriteClient.java    | 211 +-----
 .../org/apache/hudi/client/TestClientRollback.java |   2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   6 +-
 .../config/HoodieTableServiceManagerConfig.java    | 172 +++++
 .../sink/clustering/HoodieFlinkClusteringJob.java  |   2 +-
 .../java/org/apache/hudi/util/ClusteringUtil.java  |   4 +-
 .../functional/SparkRDDWriteClientOverride.java    |  38 -
 .../org/apache/hudi/functional/TestBootstrap.java  |  13 +-
 .../apache/hudi/functional/TestOrcBootstrap.java   |  20 +-
 .../apache/hudi/utilities/HoodieClusteringJob.java |   4 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   2 +-
 27 files changed, 1957 insertions(+), 915 deletions(-)
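
As orientation for the diffs below, here is a minimal usage sketch (not part of this commit) of the refactored API split: the write client now hands compaction/clustering/clean work to a dedicated table service client reachable via getTableServiceClient(). The helper class, its name, and the Spark-side construction details are assumptions for illustration; only the client methods shown appear in this diff.

    import org.apache.hudi.client.BaseHoodieTableServiceClient;
    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class TableServiceClientSketch {
      // Runs one clustering round through the table service client owned by the write client.
      public static void clusterOnce(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
        try (SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(context, writeConfig)) {
          BaseHoodieTableServiceClient<?> tableServices = writeClient.getTableServiceClient();
          // Present only if a clustering plan was actually generated.
          Option<String> clusteringInstant = tableServices.scheduleClustering(Option.empty());
          // Execute the plan; 'true' asks the client to complete (commit) the replacecommit.
          clusteringInstant.ifPresent(instant -> tableServices.cluster(instant, true));
        }
      }
    }
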

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index b41747d83a8..d076184e24a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -20,14 +20,24 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.TransactionUtils;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.HoodieTable;
 
+import com.codahale.metrics.Timer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
@@ -35,6 +45,8 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Abstract class taking care of holding common member variables (FileSystem, SparkContext, HoodieConfigs) Also, manages
@@ -47,9 +59,11 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
   protected final transient FileSystem fs;
   protected final transient HoodieEngineContext context;
   protected final transient Configuration hadoopConf;
+  protected final transient HoodieMetrics metrics;
   protected final HoodieWriteConfig config;
   protected final String basePath;
   protected final HoodieHeartbeatClient heartbeatClient;
+  protected final TransactionManager txnManager;
 
   /**
    * Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be
@@ -74,6 +88,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
     shouldStopTimelineServer = !timelineServer.isPresent();
     this.heartbeatClient = new HoodieHeartbeatClient(this.fs, this.basePath,
         clientConfig.getHoodieClientHeartbeatIntervalInMs(), clientConfig.getHoodieClientHeartbeatTolerableMisses());
+    this.metrics = new HoodieMetrics(config);
+    this.txnManager = new TransactionManager(config, fs);
     startEmbeddedServerView();
     initWrapperFSMetrics();
   }
@@ -85,6 +101,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
   public void close() {
     stopEmbeddedServerView(true);
     this.context.setJobStatus("", "");
+    this.heartbeatClient.stop();
+    this.txnManager.close();
   }
 
   private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
@@ -146,4 +164,52 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
   public HoodieHeartbeatClient getHeartbeatClient() {
     return heartbeatClient;
   }
+
+  /**
+   * Resolve write conflicts before commit.
+   *
+   * @param table A hoodie table instance created after transaction starts so that the latest commits and files are captured.
+   * @param metadata Current committing instant's metadata
+   * @param pendingInflightAndRequestedInstants Pending, inflight, and requested instants on the timeline, excluding the current one.
+   * @see {@link BaseHoodieWriteClient#preCommit}
+   * @see {@link BaseHoodieTableServiceClient#preCommit}
+   */
+  protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata metadata, Set<String> pendingInflightAndRequestedInstants) {
+    Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
+    try {
+      TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
+          Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants);
+      metrics.emitConflictResolutionSuccessful();
+    } catch (HoodieWriteConflictException e) {
+      metrics.emitConflictResolutionFailed();
+      throw e;
+    } finally {
+      if (conflictResolutionTimer != null) {
+        conflictResolutionTimer.stop();
+      }
+    }
+  }
+
+  /**
+   * Finalize Write operation.
+   *
+   * @param table HoodieTable
+   * @param instantTime Instant Time
+   * @param stats Hoodie Write Stat
+   */
+  protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
+    try {
+      final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+      table.finalizeWrite(context, instantTime, stats);
+      if (finalizeCtx != null) {
+        Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
+        durationInMs.ifPresent(duration -> {
+          LOG.info("Finalize write elapsed time (milliseconds): " + duration);
+          metrics.updateFinalizeWriteMetrics(duration, stats.size());
+        });
+      }
+    } catch (HoodieIOException ioe) {
+      throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+    }
+  }
 }
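
The two protected helpers added above, resolveWriteConflict and finalizeWrite, are pulled into BaseHoodieClient so that both the write client and the new table service client can share them. Below is a minimal fragment sketching the intended call order from inside a hypothetical engine-specific subclass; the method name and surrounding flow are illustrative only and rely on the imports added in this file.

    // Inside a subclass of BaseHoodieClient (illustrative only):
    protected void commitWithSharedHelpers(HoodieTable table, String instantTime,
                                           HoodieCommitMetadata metadata, List<HoodieWriteStat> stats,
                                           Set<String> pendingInflightAndRequestedInstants) {
      // Resolve conflicts against concurrent writers; emits conflict-resolution metrics
      // and rethrows HoodieWriteConflictException if resolution fails.
      resolveWriteConflict(table, metadata, pendingInflightAndRequestedInstants);
      // Finalize the write via table.finalizeWrite and record finalize-write metrics.
      finalizeWrite(table, instantTime, stats);
    }
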
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
new file mode 100644
index 00000000000..7ea70f63998
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
+
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieTableServiceClient.class);
+
+  protected transient Timer.Context compactionTimer;
+  protected transient Timer.Context clusteringTimer;
+  protected transient Timer.Context logCompactionTimer;
+
+  protected transient AsyncCleanerService asyncCleanerService;
+  protected transient AsyncArchiveService asyncArchiveService;
+
+  protected Set<String> pendingInflightAndRequestedInstants;
+
+  protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig, Option.empty());
+  }
+
+  protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncCleanerService == null) {
+      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
+    } else {
+      this.asyncCleanerService.start(null);
+    }
+  }
+
+  protected void startAsyncArchiveService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncArchiveService == null) {
+      this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
+    } else {
+      this.asyncArchiveService.start(null);
+    }
+  }
+
+  protected void asyncClean() {
+    AsyncCleanerService.waitForCompletion(asyncCleanerService);
+  }
+
+  protected void asyncArchive() {
+    AsyncArchiveService.waitForCompletion(asyncArchiveService);
+  }
+
+  protected void setTableServiceTimer(WriteOperationType operationType) {
+    switch (operationType) {
+      case CLUSTER:
+        clusteringTimer = metrics.getClusteringCtx();
+        break;
+      case COMPACT:
+        compactionTimer = metrics.getCompactionCtx();
+        break;
+      case LOG_COMPACT:
+        logCompactionTimer = metrics.getLogCompactionCtx();
+        break;
+      default:
+    }
+  }
+
+  protected void setPendingInflightAndRequestedInstants(Set<String> pendingInflightAndRequestedInstants) {
+    this.pendingInflightAndRequestedInstants = pendingInflightAndRequestedInstants;
+  }
+
+  /**
+   * Any pre-commit actions, like conflict resolution, go here.
+   * @param metadata commit metadata for which pre commit is being invoked.
+   */
+  protected void preCommit(HoodieCommitMetadata metadata) {
+    // To be overridden by specific engines to perform conflict resolution if any.
+  }
+
+  /**
+   * Performs a compaction operation on a table, serially before or after an insert/upsert action.
+   * Scheduling and execution are done inline.
+   */
+  protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
+    Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
+    compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
+      // inline compaction should auto commit as the user is never given control
+      compact(compactInstantTime, true);
+    });
+    return compactionInstantTimeOpt;
+  }
+
+  private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
+      scheduleCompaction(extraMetadata);
+    } else {
+      runAnyPendingCompactions(table);
+      inlineCompaction(extraMetadata);
+    }
+  }
+
+  /**
+   * Ensures the log compaction instant is in the expected state and performs log compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+  /**
+   * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
+   */
+  protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
+    Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
+    logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
+      // inline log compaction should auto commit as the user is never given control
+      logCompact(logCompactInstantTime, true);
+    });
+    return logCompactionInstantTimeOpt;
+  }
+
+  protected void runAnyPendingCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight compaction at instant " + instant);
+          compact(instant.getTimestamp(), true);
+        });
+  }
+
+  protected void runAnyPendingLogCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight log compaction at instant " + instant);
+          logCompact(instant.getTimestamp(), true);
+        });
+  }
+
+  /**
+   * Schedules compaction inline.
+   * @param extraMetadata extra metadata to be used.
+   * @return compaction instant if scheduled.
+   */
+  protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
+    return scheduleCompaction(extraMetadata);
+  }
+
+  /**
+   * Schedules a new compaction instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
+
+  /**
+   * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @param metadata              All the metadata that gets stored along with a commit
+   * @param extraMetadata         Extra Metadata to be stored
+   */
+  public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata,
+                                        Option<Map<String, String>> extraMetadata);
+
+  /**
+   * Commit Compaction and track metrics.
+   */
+  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
+
+  /**
+   * Schedules a new log compaction instant.
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleLogCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new log compaction instant with passed-in instant time.
+   * @param instantTime Log Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleLogCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.LOG_COMPACT).isPresent();
+  }
+
+  /**
+   * Performs Log Compaction for the workload stored in instant-time.
+   *
+   * @param logCompactionInstantTime Log Compaction Instant Time
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
+    return logCompact(logCompactionInstantTime, config.shouldAutoCommit());
+  }
+
+  /**
+   * Commit Log Compaction and track metrics.
+   */
+  protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+
+  /**
+   * Schedules a new compaction instant with passed-in instant time.
+   *
+   * @param instantTime   Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
+  }
+
+  /**
+   * Schedules a new clustering instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new clustering instant with passed-in instant time.
+   *
+   * @param instantTime   clustering Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
+  }
+
+  /**
+   * Schedules a new cleaning instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new cleaning instant with passed-in instant time.
+   *
+   * @param instantTime   cleaning Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent();
+  }
+
+  /**
+   * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata.
+   *
+   * @param clusteringInstant Clustering Instant Time
+   * @return Collection of Write Status
+   */
+  public abstract HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete);
+
+  protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    if (!tableServicesEnabled(config)) {
+      return;
+    }
+
+    if (!config.areAnyTableServicesExecutedInline() && !config.areAnyTableServicesScheduledInline()) {
+      return;
+    }
+
+    if (config.isMetadataTableEnabled()) {
+      table.getHoodieView().sync();
+    }
+    // Do an inline compaction if enabled
+    if (config.inlineCompactionEnabled()) {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+      inlineCompaction(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
+        && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
+      // proceed only if there are no pending compactions
+      metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
+      inlineScheduleCompaction(extraMetadata);
+    }
+
+    // Do an inline log compaction if enabled
+    if (config.inlineLogCompactionEnabled()) {
+      runAnyPendingLogCompactions(table);
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
+      inlineLogCompact(extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
+    }
+
+    // Do an inline clustering if enabled
+    if (config.inlineClusteringEnabled()) {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
+      inlineClustering(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
+        && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
+      // proceed only if there are no pending clustering plans
+      metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
+      inlineScheduleClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedule table services such as clustering, compaction & cleaning.
+   *
+   * @param extraMetadata    Metadata to pass onto the scheduled service instant
+   * @param tableServiceType Type of table service to schedule
+   * @return the instant time of the scheduled table service, if one was scheduled.
+   */
+  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
+                                             TableServiceType tableServiceType) {
+    // A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
+    final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
+        tableServiceType.getAction(), instantTime));
+    try {
+      this.txnManager.beginTransaction(inflightInstant, Option.empty());
+      LOG.info("Scheduling table service " + tableServiceType);
+      return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
+    } finally {
+      this.txnManager.endTransaction(inflightInstant);
+    }
+  }
+
+  protected Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
+                                                        TableServiceType tableServiceType) {
+    if (!tableServicesEnabled(config)) {
+      return Option.empty();
+    }
+
+    Option<String> option = Option.empty();
+    HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
+
+    switch (tableServiceType) {
+      case ARCHIVE:
+        LOG.info("Scheduling archiving is not supported. Skipping.");
+        break;
+      case CLUSTER:
+        LOG.info("Scheduling clustering at instant time :" + instantTime);
+        Option<HoodieClusteringPlan> clusteringPlan = table
+            .scheduleClustering(context, instantTime, extraMetadata);
+        option = clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case COMPACT:
+        LOG.info("Scheduling compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> compactionPlan = table
+            .scheduleCompaction(context, instantTime, extraMetadata);
+        option = compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case LOG_COMPACT:
+        LOG.info("Scheduling log compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> logCompactionPlan = table
+            .scheduleLogCompaction(context, instantTime, extraMetadata);
+        option = logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case CLEAN:
+        LOG.info("Scheduling cleaning at instant time :" + instantTime);
+        Option<HoodieCleanerPlan> cleanerPlan = table
+            .scheduleCleaning(context, instantTime, extraMetadata);
+        option = cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
+    }
+
+    Option<String> instantRange = delegateToTableServiceManager(tableServiceType, table);
+    if (instantRange.isPresent()) {
+      LOG.info("Delegate instant [" + instantRange.get() + "] to table service manager");
+    }
+
+    return option;
+  }
+
+  protected abstract HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf);
+
+  /**
+   * Executes a clustering plan on a table, serially before or after an insert/upsert action.
+   * Schedules and executes clustering inline.
+   */
+  protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+    Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
+    clusteringInstantOpt.ifPresent(clusteringInstant -> {
+      // inline cluster should auto commit as the user is never given control
+      cluster(clusteringInstant, true);
+    });
+    return clusteringInstantOpt;
+  }
+
+  private void inlineClustering(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.replacecommit)) {
+      scheduleClustering(extraMetadata);
+    } else {
+      runAnyPendingClustering(table);
+      inlineClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedules clustering inline.
+   *
+   * @param extraMetadata extra metadata to use.
+   * @return clustering instant if scheduled.
+   */
+  protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
+    return scheduleClustering(extraMetadata);
+  }
+
+  protected void runAnyPendingClustering(HoodieTable table) {
+    table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
+      if (instantPlan.isPresent()) {
+        LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft());
+        cluster(instant.getTimestamp(), true);
+      }
+    });
+  }
+
+  /**
+   * Write the HoodieCommitMetadata to metadata table if available.
+   *
+   * @param table       {@link HoodieTable} of interest.
+   * @param instantTime instant time of the commit.
+   * @param actionType  action type of the commit.
+   * @param metadata    instance of {@link HoodieCommitMetadata}.
+   */
+  protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
+    checkArgument(table.isTableServiceAction(actionType, instantTime), String.format("Unsupported action: %s at instant %s is not a table service.", actionType, instantTime));
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
+    table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, true));
+  }
+
+  /**
+   * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
+   * configurations and CleaningPolicy used (typically, files that can no longer be used by a running query can be
+   * cleaned). This API provides the flexibility to schedule clean instant asynchronously via
+   * {@link BaseHoodieTableServiceClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
+   * of clean.
+   *
+   * @param cleanInstantTime instant time for clean.
+   * @param scheduleInline   true if needs to be scheduled inline. false otherwise.
+   * @param skipLocking      if this is triggered by another parent transaction, locking can be skipped.
+   */
+  @Nullable
+  public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
+    if (!tableServicesEnabled(config)) {
+      return null;
+    }
+    final Timer.Context timerContext = metrics.getCleanCtx();
+    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
+        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
+
+    HoodieTable table = createTable(config, hadoopConf);
+    if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
+      LOG.info("Cleaner started");
+      // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
+      if (scheduleInline) {
+        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+        table.getMetaClient().reloadActiveTimeline();
+      }
+
+      if (shouldDelegateToTableServiceManager(config, ActionType.clean)) {
+        LOG.warn("Cleaning is not yet supported with Table Service Manager.");
+        return null;
+      }
+    }
+
+    // Proceeds to execute any requested or inflight clean instances in the timeline
+    HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
+    if (timerContext != null && metadata != null) {
+      long durationMs = metrics.getDurationInMs(timerContext.stop());
+      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
+      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+          + " cleanerElapsedMs" + durationMs);
+    }
+    return metadata;
+  }
+
+  /**
+   * Trigger archival for the table. This ensures that the number of commits does not
+   * grow unbounded over time.
+   * @param table table to commit on.
+   * @param acquireLockForArchival true if lock has to be acquired for archival. false otherwise.
+   */
+  protected void archive(HoodieTable table, boolean acquireLockForArchival) {
+    if (!tableServicesEnabled(config)) {
+      return;
+    }
+    try {
+      // We cannot have unbounded commit files. Archive commits if we have to archive
+      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
+      archiver.archiveIfRequired(context, acquireLockForArchival);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to archive", ioe);
+    }
+  }
+
+  /**
+   * Get the inflight timeline, excluding compaction and clustering instants.
+   * @param metaClient instance of {@link HoodieTableMetaClient} to use.
+   * @return the filtered inflight timeline.
+   */
+  private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {
+    HoodieTimeline inflightTimelineWithReplaceCommit = metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
+    HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> {
+      if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+        Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, instant);
+        return !instantPlan.isPresent();
+      } else {
+        return true;
+      }
+    });
+    return inflightTimelineExcludeClusteringCommit;
+  }
+
+  protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
+    return getPendingRollbackInfo(metaClient, commitToRollback, true);
+  }
+
+  public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
+    return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty());
+  }
+
+  protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
+    return getPendingRollbackInfos(metaClient, true);
+  }
+
+  /**
+   * Fetch the map of pending commits to be rolled back, to their {@link HoodiePendingRollbackInfo}.
+   * @param metaClient instance of {@link HoodieTableMetaClient} to use.
+   * @return map of pending commit instants to be rolled back, each mapped to its rollback instant and rollback plan pair.
+   */
+  protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
+    List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants();
+    Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
+    for (HoodieInstant rollbackInstant : instants) {
+      HoodieRollbackPlan rollbackPlan;
+      try {
+        rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
+      } catch (Exception e) {
+        if (rollbackInstant.isRequested()) {
+          LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e);
+          try {
+            metaClient.getActiveTimeline().deletePending(rollbackInstant);
+          } catch (HoodieIOException he) {
+            LOG.warn("Cannot delete " + rollbackInstant, he);
+            continue;
+          }
+        } else {
+          // Here we assume that if the rollback is inflight, the rollback plan is intact
+          // in instant.rollback.requested.  The exception here can be due to other reasons.
+          LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e);
+        }
+        continue;
+      }
+
+      try {
+        String action = rollbackPlan.getInstantToRollback().getAction();
+        if (ignoreCompactionAndClusteringInstants) {
+          if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
+            boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
+                && ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(),
+                rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
+            if (!isClustering) {
+              String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
+              infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
+            }
+          }
+        } else {
+          infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
+        }
+      } catch (Exception e) {
+        LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e);
+      }
+    }
+    return infoMap;
+  }
+
+  /**
+   * Rollback all failed writes.
+   */
+  protected Boolean rollbackFailedWrites() {
+    return rollbackFailedWrites(false);
+  }
+
+  /**
+   * Rollback all failed writes.
+   * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
+   */
+  protected Boolean rollbackFailedWrites(boolean skipLocking) {
+    HoodieTable table = createTable(config, hadoopConf);
+    List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
+    Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
+    instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
+    rollbackFailedWrites(pendingRollbacks, skipLocking);
+    return true;
+  }
+
+  protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
+    // sort in reverse order of commit times
+    LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet()
+        .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
+    for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : reverseSortedRollbackInstants.entrySet()) {
+      if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
+          HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
+        // do we need to handle failed rollback of a bootstrap
+        rollbackFailedBootstrap();
+        HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
+        break;
+      } else {
+        rollback(entry.getKey(), entry.getValue(), skipLocking);
+        HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
+      }
+    }
+  }
+
+  protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
+    Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient)
+        .getReverseOrderedInstants();
+    if (cleaningPolicy.isEager()) {
+      return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
+        if (curInstantTime.isPresent()) {
+          return !entry.equals(curInstantTime.get());
+        } else {
+          return true;
+        }
+      }).collect(Collectors.toList());
+    } else if (cleaningPolicy.isLazy()) {
+      return inflightInstantsStream.filter(instant -> {
+        try {
+          return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+        } catch (IOException io) {
+          throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+        }
+      }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+    } else if (cleaningPolicy.isNever()) {
+      return Collections.emptyList();
+    } else {
+      throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
+    }
+  }
+
+  /**
+   * Rollback the inflight record changes with the given commit time. This
+   * will be removed in the future in favor of
+   * {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)}.
+   *
+   * @param commitInstantTime Instant time of the commit
+   * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
+   * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
+   * @throws HoodieRollbackException if rollback cannot be performed successfully
+   */
+  @Deprecated
+  public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
+    LOG.info("Begin rollback of instant " + commitInstantTime);
+    final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
+    final Timer.Context timerContext = this.metrics.getRollbackCtx();
+    try {
+      HoodieTable table = createTable(config, hadoopConf);
+      Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+          .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
+          .findFirst());
+      if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
+        LOG.info(String.format("Scheduling Rollback at instant time : %s "
+                + "(exists in active timeline: %s), with rollback plan: %s",
+            rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
+        Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
+            .orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()));
+        if (rollbackPlanOption.isPresent()) {
+          // There can be a case where the inflight rollback failed after the instant files
+          // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
+          // not present in the timeline.  In such a case, the hoodie instant instance
+          // is reconstructed to allow the rollback to be reattempted, and the deleteInstants
+          // is set to false since they are already deleted.
+          // Execute rollback
+          HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
+              ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
+              : table.rollback(context, rollbackInstantTime, new HoodieInstant(
+                  true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
+              false, skipLocking);
+          if (timerContext != null) {
+            long durationInMs = metrics.getDurationInMs(timerContext.stop());
+            metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
+          }
+          return true;
+        } else {
+          throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime);
+        }
+      } else {
+        LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
+        return false;
+      }
+    } catch (Exception e) {
+      throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime, e);
+    }
+  }
+
+  /**
+   * Main API to rollback failed bootstrap.
+   */
+  public void rollbackFailedBootstrap() {
+    LOG.info("Rolling back pending bootstrap if present");
+    HoodieTable table = createTable(config, hadoopConf);
+    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
+    Option<String> instant = Option.fromJavaOptional(
+        inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
+    if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
+        HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
+      LOG.info("Found pending bootstrap instants. Rolling them back");
+      table.rollbackBootstrap(context, HoodieActiveTimeline.createNewInstantTime());
+      LOG.info("Finished rolling back pending bootstrap");
+    }
+  }
+
+  private Option<String> delegateToTableServiceManager(TableServiceType tableServiceType, HoodieTable table) {
+    if (!config.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.compaction)) {
+      return Option.empty();
+    }
+    HoodieTableServiceManagerClient tableServiceManagerClient = new HoodieTableServiceManagerClient(table.getMetaClient(), config.getTableServiceManagerConfig());
+    switch (tableServiceType) {
+      case COMPACT:
+        return tableServiceManagerClient.executeCompaction();
+      case CLUSTER:
+        return tableServiceManagerClient.executeClustering();
+      case CLEAN:
+        return tableServiceManagerClient.executeClean();
+      default:
+        LOG.info("Not supported delegate to table service manager, tableServiceType : " + tableServiceType.getAction());
+        return Option.empty();
+    }
+  }
+
+  @Override
+  public void close() {
+    AsyncArchiveService.forceShutdown(asyncArchiveService);
+    asyncArchiveService = null;
+    AsyncCleanerService.forceShutdown(asyncCleanerService);
+    asyncCleanerService = null;
+    // Stop timeline-server if running
+    super.close();
+  }
+}
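
To illustrate the schedule-then-execute flexibility described in the clean() javadoc above, here is a small hypothetical helper (not part of this commit): it schedules a clean plan under the transaction lock via scheduleTableService and then executes it with inline scheduling disabled. A concrete engine subclass such as SparkRDDTableServiceClient is assumed to be constructed elsewhere; only the methods shown appear in the new class above.

    import org.apache.hudi.avro.model.HoodieCleanMetadata;
    import org.apache.hudi.client.BaseHoodieTableServiceClient;
    import org.apache.hudi.common.model.TableServiceType;
    import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
    import org.apache.hudi.common.util.Option;

    final class CleanServiceSketch {
      // Schedules a clean plan (lock-guarded) and then executes it.
      static HoodieCleanMetadata scheduleAndClean(BaseHoodieTableServiceClient<?> serviceClient) {
        String cleanInstant = HoodieActiveTimeline.createNewInstantTime();
        Option<String> scheduled =
            serviceClient.scheduleTableService(cleanInstant, Option.empty(), TableServiceType.CLEAN);
        if (!scheduled.isPresent()) {
          // No new clean plan was generated; skip execution in this sketch.
          return null;
        }
        // scheduleInline=false: the plan was already scheduled above.
        // skipLocking=false: no parent transaction currently holds the lock.
        return serviceClient.clean(cleanInstant, false, false);
      }
    }
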
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 04b70d5545c..b361f8918c4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,29 +18,23 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hudi.async.AsyncArchiveService;
-import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.callback.HoodieWriteCommitCallback;
 import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
 import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
-import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
@@ -55,15 +49,12 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CleanerUtils;
-import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieArchivalConfig;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
@@ -90,7 +81,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.restore.RestoreUtils;
-import org.apache.hudi.table.action.rollback.RollbackUtils;
 import org.apache.hudi.table.action.savepoint.SavepointHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
@@ -108,13 +98,10 @@ import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
 import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
@@ -128,8 +115,7 @@ import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
  * @param <K> Type of keys
  * @param <O> Type of outputs
  */
-public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
-    implements RunsTableService {
+public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient implements RunsTableService {
 
   protected static final String LOOKUP_STR = "lookup";
   private static final long serialVersionUID = 1L;
@@ -140,18 +126,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
   private transient WriteOperationType operationType;
   private transient HoodieWriteCommitCallback commitCallback;
 
-  protected final transient HoodieMetrics metrics;
   protected transient Timer.Context writeTimer = null;
-  protected transient Timer.Context compactionTimer;
-  protected transient Timer.Context clusteringTimer;
-  protected transient Timer.Context logCompactionTimer;
 
-  protected transient AsyncCleanerService asyncCleanerService;
-  protected transient AsyncArchiveService asyncArchiveService;
-  protected final TransactionManager txnManager;
   protected Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxnAndMetadata = Option.empty();
   protected Set<String> pendingInflightAndRequestedInstants;
 
+  protected BaseHoodieTableServiceClient<O> tableServiceClient;
+
   /**
    * Create a write client, with new hudi index.
    * @param context HoodieEngineContext
@@ -178,9 +159,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
                                Option<EmbeddedTimelineService> timelineService,
                                SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     super(context, writeConfig, timelineService);
-    this.metrics = new HoodieMetrics(config);
     this.index = createIndex(writeConfig);
-    this.txnManager = new TransactionManager(config, fs);
     this.upgradeDowngradeHelper = upgradeDowngradeHelper;
   }
 
@@ -194,6 +173,10 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     return this.operationType;
   }
 
+  public BaseHoodieTableServiceClient<O> getTableServiceClient() {
+    return tableServiceClient;
+  }
+
   /**
    * Commit changes performed at the given instantTime marker.
    */
@@ -363,9 +346,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    * @param metadata instance of {@link HoodieCommitMetadata}.
    */
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
-    context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
-    table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
-        table.isTableServiceAction(actionType, instantTime)));
+    if (table.isTableServiceAction(actionType, instantTime)) {
+      tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
+    } else {
+      context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
+      table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, false));
+    }
   }
 
   /**
@@ -385,27 +371,10 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       throw new HoodieException("Cannot bootstrap the table in multi-writer mode");
     }
     HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
-    rollbackFailedBootstrap();
+    tableServiceClient.rollbackFailedBootstrap();
     table.bootstrap(context, extraMetadata);
   }
 
-  /**
-   * Main API to rollback failed bootstrap.
-   */
-  protected void rollbackFailedBootstrap() {
-    LOG.info("Rolling back pending bootstrap if present");
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
-    Option<String> instant = Option.fromJavaOptional(
-        inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
-    if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
-        HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
-      LOG.info("Found pending bootstrap instants. Rolling them back");
-      table.rollbackBootstrap(context, HoodieActiveTimeline.createNewInstantTime());
-      LOG.info("Finished rolling back pending bootstrap");
-    }
-  }
-
   /**
    * Upsert a batch of new records into Hoodie table at the supplied instantTime.
    *
@@ -524,16 +493,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
         ? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty();
     this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
     this.pendingInflightAndRequestedInstants.remove(instantTime);
-    if (null == this.asyncCleanerService) {
-      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
-    } else {
-      this.asyncCleanerService.start(null);
-    }
-    if (null == this.asyncArchiveService) {
-      this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(this);
-    } else {
-      this.asyncArchiveService.start(null);
-    }
+    tableServiceClient.setPendingInflightAndRequestedInstants(this.pendingInflightAndRequestedInstants);
+    tableServiceClient.startAsyncCleanerService(this);
+    tableServiceClient.startAsyncArchiveService(this);
   }
 
   /**
@@ -568,82 +530,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
   }
 
   protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
-    if (!tableServicesEnabled(config)) {
-      return;
-    }
-    if (config.areAnyTableServicesExecutedInline() || config.areAnyTableServicesScheduledInline()) {
-      if (config.isMetadataTableEnabled()) {
-        table.getHoodieView().sync();
-      }
-      // Do an inline compaction if enabled
-      if (config.inlineCompactionEnabled()) {
-        runAnyPendingCompactions(table);
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
-        inlineCompaction(extraMetadata);
-      } else {
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
-      }
-
-      // if just inline schedule is enabled
-      if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
-          && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
-        // proceed only if there are no pending compactions
-        metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
-        inlineScheduleCompaction(extraMetadata);
-      }
-
-      // Do an inline log compaction if enabled
-      if (config.inlineLogCompactionEnabled()) {
-        runAnyPendingLogCompactions(table);
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
-        inlineLogCompact(extraMetadata);
-      } else {
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
-      }
-
-      // Do an inline clustering if enabled
-      if (config.inlineClusteringEnabled()) {
-        runAnyPendingClustering(table);
-        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
-        inlineClustering(extraMetadata);
-      } else {
-        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
-      }
-
-      // if just inline schedule is enabled
-      if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
-          && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
-        // proceed only if there are no pending clustering
-        metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
-        inlineScheduleClustering(extraMetadata);
-      }
-    }
-  }
-
-  protected void runAnyPendingCompactions(HoodieTable table) {
-    table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
-        .forEach(instant -> {
-          LOG.info("Running previously failed inflight compaction at instant " + instant);
-          compact(instant.getTimestamp(), true);
-        });
-  }
-
-  protected void runAnyPendingLogCompactions(HoodieTable table) {
-    table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream()
-        .forEach(instant -> {
-          LOG.info("Running previously failed inflight log compaction at instant " + instant);
-          logCompact(instant.getTimestamp(), true);
-        });
-  }
-
-  protected void runAnyPendingClustering(HoodieTable table) {
-    table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
-      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
-      if (instantPlan.isPresent()) {
-        LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft());
-        cluster(instant.getTimestamp(), true);
-      }
-    });
+    tableServiceClient.runTableServicesInline(table, metadata, extraMetadata);
   }
 
   protected void autoCleanOnCommit() {
@@ -653,7 +540,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
 
     if (config.isAsyncClean()) {
       LOG.info("Async cleaner has been spawned. Waiting for it to finish");
-      AsyncCleanerService.waitForCompletion(asyncCleanerService);
+      tableServiceClient.asyncClean();
       LOG.info("Async cleaner has finished");
     } else {
       LOG.info("Start to clean synchronously.");
@@ -669,7 +556,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
 
     if (config.isAsyncArchive()) {
       LOG.info("Async archiver has been spawned. Waiting for it to finish");
-      AsyncArchiveService.waitForCompletion(asyncArchiveService);
+      tableServiceClient.asyncArchive();
       LOG.info("Async archiver has finished");
     } else {
       LOG.info("Start to archive synchronously.");
@@ -681,7 +568,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    * Run any pending compactions.
    */
   public void runAnyPendingCompactions() {
-    runAnyPendingCompactions(createTable(config, hadoopConf));
+    tableServiceClient.runAnyPendingCompactions(createTable(config, hadoopConf));
   }
 
   /**
@@ -774,63 +661,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
   @Deprecated
   public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
     HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
-    Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
-    return rollback(commitInstantTime, pendingRollbackInfo, false);
-  }
-
-  /**
-   * @Deprecated
-   * Rollback the inflight record changes with the given commit time. This
-   * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)
-   *
-   * @param commitInstantTime Instant time of the commit
-   * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
-   * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
-   * @throws HoodieRollbackException if rollback cannot be performed successfully
-   */
-  @Deprecated
-  public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
-    LOG.info("Begin rollback of instant " + commitInstantTime);
-    final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
-    final Timer.Context timerContext = this.metrics.getRollbackCtx();
-    try {
-      HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-      Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
-          .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
-          .findFirst());
-      if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
-        LOG.info(String.format("Scheduling Rollback at instant time : %s "
-                + "(exists in active timeline: %s), with rollback plan: %s",
-            rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
-        Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
-            .orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()));
-        if (rollbackPlanOption.isPresent()) {
-          // There can be a case where the inflight rollback failed after the instant files
-          // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
-          // not present in the timeline.  In such a case, the hoodie instant instance
-          // is reconstructed to allow the rollback to be reattempted, and the deleteInstants
-          // is set to false since they are already deleted.
-          // Execute rollback
-          HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
-              ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
-              : table.rollback(context, rollbackInstantTime, new HoodieInstant(
-                  true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
-              false, skipLocking);
-          if (timerContext != null) {
-            long durationInMs = metrics.getDurationInMs(timerContext.stop());
-            metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
-          }
-          return true;
-        } else {
-          throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime);
-        }
-      } else {
-        LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
-        return false;
-      }
-    } catch (Exception e) {
-      throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime, e);
-    }
+    Option<HoodiePendingRollbackInfo> pendingRollbackInfo = tableServiceClient.getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+    return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo, false);
   }
 
   /**
@@ -912,33 +744,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
    */
   public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
-    if (!tableServicesEnabled(config)) {
-      return null;
-    }
-    final Timer.Context timerContext = metrics.getCleanCtx();
-    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
-        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
-
-    HoodieTable table = createTable(config, hadoopConf);
-    if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
-      LOG.info("Cleaner started");
-      // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
-      if (scheduleInline) {
-        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
-        table.getMetaClient().reloadActiveTimeline();
-      }
-    }
-
-    // Proceeds to execute any requested or inflight clean instances in the timeline
-    HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
-    if (timerContext != null && metadata != null) {
-      long durationMs = metrics.getDurationInMs(timerContext.stop());
-      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
-      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
-          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
-          + " cleanerElapsedMs" + durationMs);
-    }
-    return metadata;
+    return tableServiceClient.clean(cleanInstantTime, scheduleInline, skipLocking);
   }
 
   public HoodieCleanMetadata clean() {
@@ -962,16 +768,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    * @param acquireLockForArchival true if lock has to be acquired for archival. false otherwise.
    */
   protected void archive(HoodieTable table, boolean acquireLockForArchival) {
-    if (!tableServicesEnabled(config)) {
-      return;
-    }
-    try {
-      // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
-      archiver.archiveIfRequired(context, acquireLockForArchival);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Failed to archive", ioe);
-    }
+    tableServiceClient.archive(table, acquireLockForArchival);
   }
 
   /**
@@ -997,7 +794,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   public String startCommit(String actionType, HoodieTableMetaClient metaClient) {
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
-        HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
+        HoodieTimeline.COMMIT_ACTION, () -> tableServiceClient.rollbackFailedWrites());
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
     startCommit(instantTime, actionType, metaClient);
     return instantTime;
@@ -1025,7 +822,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
-        HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
+        HoodieTimeline.COMMIT_ACTION, () -> tableServiceClient.rollbackFailedWrites());
     startCommit(instantTime, actionType, metaClient);
   }
 
@@ -1114,6 +911,19 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     }
   }
 
+  /**
+   * Performs Clustering for the workload stored in instant-time.
+   *
+   * @param clusteringInstantTime Clustering Instant Time
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteMetadata<O> cluster(String clusteringInstantTime) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.replacecommit)) {
+      throw new UnsupportedOperationException("Clustering should be delegated to table service manager instead of direct run.");
+    }
+    return cluster(clusteringInstantTime, true);
+  }
+
   /**
    * Performs Compaction for the workload stored in instant-time.
    *
@@ -1121,6 +931,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    * @return Collection of WriteStatus to inspect errors and counts
    */
   public HoodieWriteMetadata<O> compact(String compactionInstantTime) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
+      throw new UnsupportedOperationException("Compaction should be delegated to table service manager instead of direct run.");
+    }
     return compact(compactionInstantTime, config.shouldAutoCommit());
   }
 
@@ -1186,152 +999,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     throw new UnsupportedOperationException("Log compaction is not supported yet.");
   }
 
-  /**
-   * Get inflight time line exclude compaction and clustering.
-   * @param metaClient
-   * @return
-   */
-  private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {
-    HoodieTimeline inflightTimelineWithReplaceCommit = metaClient.getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
-    HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> {
-      if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-        Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, instant);
-        return !instantPlan.isPresent();
-      } else {
-        return true;
-      }
-    });
-    return inflightTimelineExcludeClusteringCommit;
-  }
-
-  protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
-    return getPendingRollbackInfo(metaClient, commitToRollback, true);
-  }
-
-  public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
-    return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty());
-  }
-
-  protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
-    return getPendingRollbackInfos(metaClient, true);
-  }
-
-  /**
-   * Fetch map of pending commits to be rolled-back to {@link HoodiePendingRollbackInfo}.
-   * @param metaClient instance of {@link HoodieTableMetaClient} to use.
-   * @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair.
-   */
-  protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
-    List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants();
-    Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
-    for (HoodieInstant rollbackInstant : instants) {
-      HoodieRollbackPlan rollbackPlan;
-      try {
-        rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
-      } catch (Exception e) {
-        if (rollbackInstant.isRequested()) {
-          LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e);
-          try {
-            metaClient.getActiveTimeline().deletePending(rollbackInstant);
-          } catch (HoodieIOException he) {
-            LOG.warn("Cannot delete " + rollbackInstant, he);
-            continue;
-          }
-        } else {
-          // Here we assume that if the rollback is inflight, the rollback plan is intact
-          // in instant.rollback.requested.  The exception here can be due to other reasons.
-          LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e);
-        }
-        continue;
-      }
-
-      try {
-        String action = rollbackPlan.getInstantToRollback().getAction();
-        if (ignoreCompactionAndClusteringInstants) {
-          if (!HoodieTimeline.COMPACTION_ACTION.equals(action) && !HoodieTimeline.LOG_COMPACTION_ACTION.equals(action)) {
-            boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
-                && ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(),
-                rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
-            if (!isClustering) {
-              String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
-              infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
-            }
-          }
-        } else {
-          infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
-        }
-      } catch (Exception e) {
-        LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e);
-      }
-    }
-    return infoMap;
-  }
-
-  /**
-   * Rollback all failed writes.
-   */
-  protected Boolean rollbackFailedWrites() {
-    return rollbackFailedWrites(false);
-  }
-
-  /**
-   * Rollback all failed writes.
-   * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
-   */
-  protected Boolean rollbackFailedWrites(boolean skipLocking) {
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
-    Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
-    instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
-    rollbackFailedWrites(pendingRollbacks, skipLocking);
-    return true;
-  }
-
-  protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
-    // sort in reverse order of commit times
-    LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet()
-        .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
-    for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : reverseSortedRollbackInstants.entrySet()) {
-      if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
-          HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
-        // do we need to handle failed rollback of a bootstrap
-        rollbackFailedBootstrap();
-        HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
-        break;
-      } else {
-        rollback(entry.getKey(), entry.getValue(), skipLocking);
-        HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
-      }
-    }
-  }
-
-  protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
-    Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient)
-        .getReverseOrderedInstants();
-    if (cleaningPolicy.isEager()) {
-      return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
-        if (curInstantTime.isPresent()) {
-          return !entry.equals(curInstantTime.get());
-        } else {
-          return true;
-        }
-      }).collect(Collectors.toList());
-    } else if (cleaningPolicy.isLazy()) {
-      return inflightInstantsStream.filter(instant -> {
-        try {
-          return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
-        } catch (IOException io) {
-          throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
-        }
-      }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
-    } else if (cleaningPolicy.isNever()) {
-      return Collections.EMPTY_LIST;
-    } else {
-      throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
-    }
-  }
-
   /**
    * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
    *
@@ -1340,19 +1007,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
 
-  /**
-   * Performs a compaction operation on a table, serially before or after an insert/upsert action.
-   * Scheduling and execution is done inline.
-   */
-  protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
-    Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
-    compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
-      // inline compaction should auto commit as the user is never given control
-      compact(compactInstantTime, true);
-    });
-    return compactionInstantTimeOpt;
-  }
-
   /***
    * Schedules compaction inline.
    * @param extraMetadata extra metadata to be used.
@@ -1372,18 +1026,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     throw new UnsupportedOperationException("Log compaction is not supported yet.");
   }
 
-  /**
-   * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
-   */
-  protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
-    Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
-    logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
-      // inline log compaction should auto commit as the user is never given control
-      logCompact(logCompactInstantTime, true);
-    });
-    return logCompactionInstantTimeOpt;
-  }
-
   /**
    * Schedules a new clustering instant.
    * @param extraMetadata Extra Metadata to be stored
@@ -1402,15 +1044,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
   }
 
-  /**
-   * Schedules a new cleaning instant.
-   * @param extraMetadata Extra Metadata to be stored
-   */
-  protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
-    String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
-  }
-
   /**
    * Schedules a new cleaning instant with passed-in instant time.
    * @param instantTime cleaning Instant Time
@@ -1454,91 +1087,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     try {
       this.txnManager.beginTransaction(inflightInstant, Option.empty());
       LOG.info("Scheduling table service " + tableServiceType);
-      return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
+      return tableServiceClient.scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
     } finally {
       this.txnManager.endTransaction(inflightInstant);
     }
   }
 
-  private Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
-                                                      TableServiceType tableServiceType) {
-    if (!tableServicesEnabled(config)) {
-      return Option.empty();
-    }
-    switch (tableServiceType) {
-      case ARCHIVE:
-        LOG.info("Scheduling archiving is not supported. Skipping.");
-        return Option.empty();
-      case CLUSTER:
-        LOG.info("Scheduling clustering at instant time :" + instantTime);
-        Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
-            .scheduleClustering(context, instantTime, extraMetadata);
-        return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
-      case COMPACT:
-        LOG.info("Scheduling compaction at instant time :" + instantTime);
-        Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
-            .scheduleCompaction(context, instantTime, extraMetadata);
-        return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
-      case LOG_COMPACT:
-        LOG.info("Scheduling log compaction at instant time :" + instantTime);
-        Option<HoodieCompactionPlan> logCompactionPlan = createTable(config, hadoopConf)
-            .scheduleLogCompaction(context, instantTime, extraMetadata);
-        return logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
-      case CLEAN:
-        LOG.info("Scheduling cleaning at instant time :" + instantTime);
-        Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
-            .scheduleCleaning(context, instantTime, extraMetadata);
-        return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
-      default:
-        throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
-    }
-  }
-
-  /**
-   * Executes a clustering plan on a table, serially before or after an insert/upsert action.
-   * Schedules and executes clustering inline.
-   */
-  protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
-    Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
-    clusteringInstantOpt.ifPresent(clusteringInstant -> {
-      // inline cluster should auto commit as the user is never given control
-      cluster(clusteringInstant, true);
-    });
-    return clusteringInstantOpt;
-  }
-
-  /**
-   * Schedules clustering inline.
-   * @param extraMetadata extrametadata to use.
-   * @return clustering instant if scheduled.
-   */
-  protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
-    return scheduleClustering(extraMetadata);
-  }
-
-  /**
-   * Finalize Write operation.
-   *
-   * @param table HoodieTable
-   * @param instantTime Instant Time
-   * @param stats Hoodie Write Stat
-   */
-  protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
-    try {
-      final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
-      table.finalizeWrite(context, instantTime, stats);
-      if (finalizeCtx != null) {
-        Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
-        durationInMs.ifPresent(duration -> {
-          LOG.info("Finalize write elapsed time (milliseconds): " + duration);
-          metrics.updateFinalizeWriteMetrics(duration, stats.size());
-        });
-      }
-    } catch (HoodieIOException ioe) {
-      throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
-    }
-  }
-
   public HoodieMetrics getMetrics() {
     return metrics;
   }
@@ -1619,10 +1173,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
         setWriteTimer(table);
         break;
       case CLUSTER:
-        clusteringTimer = metrics.getClusteringCtx();
-        break;
       case COMPACT:
-        compactionTimer = metrics.getCompactionCtx();
+      case LOG_COMPACT:
+        tableServiceClient.setTableServiceTimer(operationType);
         break;
       default:
     }
@@ -1670,17 +1223,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
 
   @Override
   public void close() {
-    AsyncArchiveService.forceShutdown(asyncArchiveService);
-    asyncArchiveService = null;
-    AsyncCleanerService.forceShutdown(asyncCleanerService);
-    asyncCleanerService = null;
     // Stop timeline-server if running
     super.close();
     // Calling this here releases any resources used by your index, so make sure to finish any related operations
     // before this point
     this.index.close();
-    this.heartbeatClient.stop();
-    this.txnManager.close();
+    this.tableServiceClient.close();
   }
 
   private void setWriteTimer(HoodieTable table) {
@@ -1699,13 +1247,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
       // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
-      List<String> instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);
+      List<String> instantsToRollback = tableServiceClient.getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);
 
       if (!instantsToRollback.isEmpty()) {
-        Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
+        Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = tableServiceClient.getPendingRollbackInfos(metaClient);
         instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
 
-        rollbackFailedWrites(pendingRollbacks, true);
+        tableServiceClient.rollbackFailedWrites(pendingRollbacks, true);
       }
 
       new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java
new file mode 100644
index 00000000000..780942158cf
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.RetryHelper;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Client that sends the table service action instants to the table service manager.
+ */
+public class HoodieTableServiceManagerClient {
+
+  public enum Action {
+    REQUEST,
+    CANCEL,
+    REGISTER
+  }
+
+  private static final String BASE_URL = "/v1/hoodie/service";
+
+  public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact");
+
+  public static final String EXECUTE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster");
+
+  public static final String EXECUTE_CLEAN = String.format("%s/%s", BASE_URL, "clean");
+
+  public static final String ACTION = "action";
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String USERNAME = "username";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  public static final String RETRY_EXCEPTIONS = "IOException";
+
+  private final HoodieTableServiceManagerConfig config;
+  private final HoodieTableMetaClient metaClient;
+  private final String uri;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableServiceManagerClient.class);
+
+  public HoodieTableServiceManagerClient(HoodieTableMetaClient metaClient, HoodieTableServiceManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.uri = config.getTableServiceManagerURIs();
+    this.config = config;
+    this.metaClient = metaClient;
+  }
+
+  private String executeRequest(String requestPath, Map<String, String> queryParameters) throws IOException {
+    URIBuilder builder = new URIBuilder(URI.create(uri)).setPath(requestPath);
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    int timeoutMs = this.config.getConnectionTimeoutSec() * 1000;
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int connectionRetryDelay = config.getConnectionRetryDelay();
+
+    RetryHelper<String, IOException> retryHelper = new RetryHelper<>(connectionRetryDelay, requestRetryLimit, connectionRetryDelay, RETRY_EXCEPTIONS);
+    return retryHelper.tryWith(() -> Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute().returnContent().asString()).start();
+  }
+
+  private Map<String, String> getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    ValidationUtils.checkArgument(paramNames.length == paramVals.length);
+    for (int i = 0; i < paramNames.length; i++) {
+      paramsMap.put(paramNames[i], paramVals[i]);
+    }
+    return paramsMap;
+  }
+
+  public Option<String> executeCompaction() {
+    try {
+      String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+          .filterPendingCompactionTimeline()
+          .getInstantsAsStream()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(EXECUTE_COMPACTION, getDefaultParams(Action.REQUEST, instantRange));
+      return Option.of(instantRange);
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public Option<String> executeClean() {
+    try {
+      String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+          .getCleanerTimeline()
+          .filterInflightsAndRequested()
+          .getInstantsAsStream()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(EXECUTE_CLEAN, getDefaultParams(Action.REQUEST, instantRange));
+      return Option.of(instantRange);
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public Option<String> executeClustering() {
+    try {
+      metaClient.reloadActiveTimeline();
+      String instantRange = StringUtils.join(ClusteringUtils.getPendingClusteringInstantTimes(metaClient)
+          .stream()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(EXECUTE_CLUSTERING, getDefaultParams(Action.REQUEST, instantRange));
+      return Option.of(instantRange);
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  private Map<String, String> getDefaultParams(Action action, String instantRange) {
+    return getParamsWithAdditionalParams(
+        new String[] {ACTION, DATABASE_NAME_PARAM, TABLE_NAME_PARAM, INSTANT_PARAM, USERNAME, QUEUE, RESOURCE, PARALLELISM, EXTRA_PARAMS, EXECUTION_ENGINE},
+        new String[] {action.name(), dbName, tableName, instantRange, config.getDeployUsername(), config.getDeployQueue(), config.getDeployResources(),
+            String.valueOf(config.getDeployParallelism()), config.getDeployExtraParams(), config.getDeployExecutionEngine()});
+  }
+}
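For orientation, a minimal usage sketch of the new client (not part of this commit). It assumes a HoodieTableMetaClient built via the standard metaclient builder and a TypedProperties carrying the TSM settings defined in HoodieTableServiceManagerConfig; the executeCompaction/executeClean/executeClustering calls send the pending instants of the corresponding action to the manager and return the comma-joined instant times, as in the class above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.HoodieTableServiceManagerClient;
import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;

public class TableServiceManagerClientExample {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // TSM endpoint / deploy settings come from HoodieTableServiceManagerConfig keys (not shown in this section).
    HoodieTableServiceManagerConfig tsmConfig =
        HoodieTableServiceManagerConfig.newBuilder().fromProperties(props).build();
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()   // assumed: the usual metaclient builder
        .setConf(new Configuration())
        .setBasePath("/tmp/hudi_trips_cow")
        .build();

    HoodieTableServiceManagerClient client = new HoodieTableServiceManagerClient(metaClient, tsmConfig);
    // Sends the pending compaction instants to the manager; the returned value is the comma-joined instant list.
    Option<String> requested = client.executeCompaction();
    requested.ifPresent(instants -> System.out.println("Delegated compaction instants: " + instants));
  }
}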
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java
index 64e540568e8..7de48be975a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import org.apache.log4j.LogManager;
@@ -34,4 +35,8 @@ public interface RunsTableService {
     }
     return enabled;
   }
+
+  default boolean shouldDelegateToTableServiceManager(HoodieWriteConfig config, ActionType actionType) {
+    return config.getTableServiceManagerConfig().isEnabledAndActionSupported(actionType);
+  }
 }
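A hedged sketch of how an engine-side runner might use the new default method; it mirrors the guards added to BaseHoodieWriteClient#compact and #cluster above, and the class and method names here are illustrative only.

import org.apache.hudi.client.RunsTableService;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.config.HoodieWriteConfig;

public class TsmDelegationGuardExample implements RunsTableService {
  public void runCompactionInline(HoodieWriteConfig config, String compactionInstantTime) {
    if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
      // The table service manager owns compaction for this table; direct execution is disallowed.
      throw new UnsupportedOperationException("Compaction should be delegated to the table service manager.");
    }
    // ... otherwise schedule/execute compaction inline (omitted).
  }
}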
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b70b13c0833..879136206e2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieMetaserverConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
@@ -558,7 +559,8 @@ public class HoodieWriteConfig extends HoodieConfig {
   private FileSystemViewStorageConfig viewStorageConfig;
   private HoodiePayloadConfig hoodiePayloadConfig;
   private HoodieMetadataConfig metadataConfig;
-  private HoodieMetaserverConfig metaserverConfig;
+  private HoodieMetaserverConfig metastoreConfig;
+  private HoodieTableServiceManagerConfig tableServiceManagerConfig;
   private HoodieCommonConfig commonConfig;
   private HoodieStorageConfig storageConfig;
   private EngineType engineType;
@@ -951,7 +953,8 @@ public class HoodieWriteConfig extends HoodieConfig {
     this.viewStorageConfig = clientSpecifiedViewStorageConfig;
     this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
     this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
-    this.metaserverConfig = HoodieMetaserverConfig.newBuilder().fromProperties(props).build();
+    this.metastoreConfig = HoodieMetaserverConfig.newBuilder().fromProperties(props).build();
+    this.tableServiceManagerConfig = HoodieTableServiceManagerConfig.newBuilder().fromProperties(props).build();
     this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
     this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build();
   }
@@ -2062,6 +2065,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return metadataConfig;
   }
 
+  public HoodieTableServiceManagerConfig getTableServiceManagerConfig() {
+    return tableServiceManagerConfig;
+  }
+
   public HoodieCommonConfig getCommonConfig() {
     return commonConfig;
   }
@@ -2275,7 +2282,7 @@ public class HoodieWriteConfig extends HoodieConfig {
    * Metastore configs.
    */
   public boolean isMetaserverEnabled() {
-    return metaserverConfig.isMetaserverEnabled();
+    return metastoreConfig.isMetaserverEnabled();
   }
 
   /**
@@ -2286,6 +2293,13 @@ public class HoodieWriteConfig extends HoodieConfig {
         getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
   }
 
+  /**
+   * Table Service Manager configs.
+   */
+  public boolean isTableServiceManagerEnabled() {
+    return tableServiceManagerConfig.isTableServiceManagerEnabled();
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
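A minimal sketch (not part of this commit) of reading the new accessors off a write config. withPath/withProps are assumed to be the standard HoodieWriteConfig builder methods, and the concrete TSM property keys live in HoodieTableServiceManagerConfig, which is not shown in this section; getTableServiceManagerConfig, isTableServiceManagerEnabled and isEnabledAndActionSupported are the methods introduced in this commit.

import java.util.Properties;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.config.HoodieWriteConfig;

public class TsmWriteConfigProbe {
  public static void main(String[] args) {
    Properties props = new Properties();
    // props.setProperty(<TSM enable / URI keys from HoodieTableServiceManagerConfig>, ...);  // keys intentionally omitted
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_trips_cow")
        .withProps(props)
        .build();
    boolean compactionDelegated = writeConfig.getTableServiceManagerConfig()
        .isEnabledAndActionSupported(ActionType.compaction);
    System.out.println("TSM enabled: " + writeConfig.isTableServiceManagerEnabled()
        + ", compaction delegated: " + compactionDelegated);
  }
}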
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 1f6a5a1d790..432c6dc80d3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -45,6 +46,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.MapUtils.nonEmpty;
+
 public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
 
   private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
@@ -145,8 +148,8 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
    */
   protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
     final HoodieCleanerPlan cleanerPlan = requestClean(context);
-    if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
-        && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
+    Option<HoodieCleanerPlan> option = Option.empty();
+    if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
         && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
       // Only create cleaner plan which does some work
       final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
@@ -158,9 +161,10 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
         LOG.error("Got exception when saving cleaner requested file", e);
         throw new HoodieIOException(e.getMessage(), e);
       }
-      return Option.of(cleanerPlan);
+      option = Option.of(cleanerPlan);
     }
-    return Option.empty();
+
+    return option;
   }
 
   @Override
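The condition rewrite above swaps the explicit null-and-empty checks for null-safe helpers. A hedged reading of what MapUtils.nonEmpty (and the CollectionUtils.nonEmpty used for the compaction plan below) amounts to, grounded only in the condition it replaces rather than in the actual utility sources:

import java.util.Collection;
import java.util.Map;

final class NonEmptyHelpersSketch {
  // Equivalent to the replaced check: map != null && !map.isEmpty()
  static boolean nonEmpty(Map<?, ?> map) {
    return map != null && !map.isEmpty();
  }

  // Same shape for collections, as used in ScheduleCompactionActionExecutor below.
  static boolean nonEmpty(Collection<?> collection) {
    return collection != null && !collection.isEmpty();
  }
}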
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 683be09efee..2445043e07c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -104,6 +104,7 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends BaseActionExecutor
         throw new HoodieIOException("Exception scheduling clustering", ioe);
       }
     }
+
     return planOption;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 328ed7d9221..7444c6eef1b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -36,19 +36,22 @@ import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
-
 import org.apache.hudi.table.action.compact.plan.generators.BaseHoodieCompactionPlanGenerator;
 import org.apache.hudi.table.action.compact.plan.generators.HoodieCompactionPlanGenerator;
 import org.apache.hudi.table.action.compact.plan.generators.HoodieLogCompactionPlanGenerator;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.CollectionUtils.nonEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
 public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> {
@@ -108,7 +111,8 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExec
     }
 
     HoodieCompactionPlan plan = scheduleCompaction();
-    if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
+    Option<HoodieCompactionPlan> option = Option.empty();
+    if (plan != null && nonEmpty(plan.getOperations())) {
       extraMetadata.ifPresent(plan::setExtraMetadata);
       try {
         if (operationType.equals(WriteOperationType.COMPACT)) {
@@ -125,11 +129,13 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExec
       } catch (IOException ioe) {
         throw new HoodieIOException("Exception scheduling compaction", ioe);
       }
-      return Option.of(plan);
+      option = Option.of(plan);
     }
-    return Option.empty();
+
+    return option;
   }
 
+  @Nullable
   private HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
     // judge if we need to compact according to num delta commits and time elapsed
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 5e5d6de92d3..fe0d1879400 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -42,6 +42,8 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -65,6 +67,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
     this.engineContext = engineContext;
   }
 
+  @Nullable
   public HoodieCompactionPlan generateCompactionPlan() throws IOException {
     // Accumulator to keep track of total log files for a table
     HoodieAccumulator totalLogFiles = engineContext.newAccumulator();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
new file mode 100644
index 00000000000..4b3eaaa1d42
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.util.FlinkClientUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClient<List<WriteStatus>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkTableServiceClient.class);
+
+  /**
+   * Cached metadata writer for coordinator to reuse for each commit.
+   */
+  private HoodieBackedTableMetadataWriter metadataWriter;
+
+  protected HoodieFlinkTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig);
+  }
+
+  @Override
+  protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
+    // Only used for the metadata table; the compaction runs in a single thread.
+    HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime);
+    commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
+    return compactionMetadata;
+  }
+
+  @Override
+  public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, getHoodieTable(), compactionInstantTime);
+  }
+
+  @Override
+  protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
+    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+    final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
+    try {
+      this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
+      finalizeWrite(table, compactionCommitTime, writeStats);
+      // commit to data table after committing to metadata table.
+      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to the metadata
+      // table happen under a single lock (single writer), since concurrent writes would conflict by updating the same partition.
+      writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
+      LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
+      CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
+    } finally {
+      this.txnManager.endTransaction(Option.of(compactionInstant));
+    }
+    WriteMarkersFactory
+        .get(config.getMarkersType(), table, compactionCommitTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    if (compactionTimer != null) {
+      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+      try {
+        metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
+            durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+      } catch (ParseException e) {
+        throw new HoodieCommitException("Commit time is not in a valid format. Failed to commit compaction "
+            + config.getBasePath() + " at time " + compactionCommitTime, e);
+      }
+    }
+    LOG.info("Compacted successfully on commit " + compactionCommitTime);
+  }
+
+  protected void completeClustering(
+      HoodieReplaceCommitMetadata metadata,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+      String clusteringCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
+    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+        e.getValue().stream()).collect(Collectors.toList());
+    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
+      throw new HoodieClusteringException("Clustering failed to write to files:"
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
+    }
+
+    try {
+      this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
+      finalizeWrite(table, clusteringCommitTime, writeStats);
+      // Commit to the data table after committing to the metadata table.
+      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to the metadata table happen under a
+      // single lock (single writer), because concurrent writes to the metadata table would conflict since they all update the same partition.
+      writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
+      LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
+      table.getActiveTimeline().transitionReplaceInflightToComplete(
+          HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    } catch (IOException e) {
+      throw new HoodieClusteringException(
+          "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
+    } finally {
+      this.txnManager.endTransaction(Option.of(clusteringInstant));
+    }
+
+    WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    if (clusteringTimer != null) {
+      long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
+      try {
+        metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
+            durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      } catch (ParseException e) {
+        throw new HoodieCommitException("Commit time is not in a valid format. Failed to commit clustering "
+            + config.getBasePath() + " at time " + clusteringCommitTime, e);
+      }
+    }
+    LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
+    // Clustering is not executed through this client in the Flink pipelines, so no write metadata is produced here.
+    return null;
+  }
+
+  @Override
+  protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+  }
+
+  public HoodieFlinkTable<?> getHoodieTable() {
+    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+  }
+
+  @Override
+  public void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
+    try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
+      metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
+    } catch (Exception e) {
+      throw new HoodieException("Failed to update metadata", e);
+    }
+  }
+
+  /**
+   * Initializes the table metadata writer, e.g., bootstraps the metadata table
+   * from the filesystem if it does not exist.
+   */
+  private HoodieBackedTableMetadataWriter initMetadataWriter() {
+    return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
+        FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+  }
+
+  public void initMetadataTable() {
+    HoodieFlinkTable<?> table = getHoodieTable();
+    if (config.isMetadataTableEnabled()) {
+      // initialize the metadata table path
+      // guard the metadata writer with concurrent lock
+      try {
+        this.txnManager.getLockManager().lock();
+        initMetadataWriter().close();
+      } catch (Exception e) {
+        throw new HoodieException("Failed to initialize metadata table", e);
+      } finally {
+        this.txnManager.getLockManager().unlock();
+      }
+      // clean the obsolete index stats
+      table.deleteMetadataIndexIfNecessary();
+    } else {
+      // delete the metadata table if it was enabled but is now disabled
+      table.maybeDeleteMetadataTable();
+    }
+  }
+}
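
For illustration, a minimal sketch of how a Flink coordinator could drive the new table service client directly. The sketch class, its placement in org.apache.hudi.client (needed because the constructor is protected), and the assumption that an inflight compaction already exists for the given instant are hypothetical; real pipelines obtain this client through HoodieFlinkWriteClient, as the next diff shows.

package org.apache.hudi.client;

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;

public final class FlinkTableServiceClientSketch {

  // Bootstraps the metadata table once on the coordinator, then commits a
  // compaction whose write statuses were already folded into the given metadata.
  public static void bootstrapAndCommitCompaction(HoodieWriteConfig writeConfig,
                                                  String compactionInstantTime,
                                                  HoodieCommitMetadata metadata) {
    HoodieFlinkTableServiceClient<Object> serviceClient =
        new HoodieFlinkTableServiceClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);

    // Creates or cleans up the metadata table depending on the write config.
    serviceClient.initMetadataTable();

    // Assumes an inflight compaction already exists for this instant on the timeline.
    serviceClient.commitCompaction(compactionInstantTime, metadata, Option.empty());
  }

  private FlinkTableServiceClientSketch() {
  }
}
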
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 5bf948de368..665950c036c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -33,31 +32,21 @@ import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieClusteringException;
-import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.FlinkWriteHandleFactory;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.MiniBatchHandle;
-import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
-import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
-import org.apache.hudi.util.FlinkClientUtil;
 import org.apache.hudi.util.WriteStatMerger;
 
 import com.codahale.metrics.Timer;
@@ -65,9 +54,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -99,6 +85,7 @@ public class HoodieFlinkWriteClient<T> extends
   public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
     super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
     this.bucketToHandles = new HashMap<>();
+    this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig);
   }
 
   /**
@@ -283,44 +270,14 @@ public class HoodieFlinkWriteClient<T> extends
 
   @Override
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
-    try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
-      metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
-    } catch (Exception e) {
-      throw new HoodieException("Failed to update metadata", e);
-    }
-  }
-
-  /**
-   * Initialize the table metadata writer, for e.g, bootstrap the metadata table
-   * from the filesystem if it does not exist.
-   */
-  private HoodieBackedTableMetadataWriter initMetadataWriter() {
-    return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
-        FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+    tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
   }
 
   /**
    * Initialized the metadata table on start up, should only be called once on driver.
    */
   public void initMetadataTable() {
-    HoodieFlinkTable<?> table = getHoodieTable();
-    if (config.isMetadataTableEnabled()) {
-      // initialize the metadata table path
-      // guard the metadata writer with concurrent lock
-      try {
-        this.txnManager.getLockManager().lock();
-        initMetadataWriter().close();
-      } catch (Exception e) {
-        throw new HoodieException("Failed to initialize metadata table", e);
-      } finally {
-        this.txnManager.getLockManager().unlock();
-      }
-      // clean the obsolete index stats
-      table.deleteMetadataIndexIfNecessary();
-    } else {
-      // delete the metadata table if it was enabled but is now disabled
-      table.maybeDeleteMetadataTable();
-    }
+    ((HoodieFlinkTableServiceClient<T>) tableServiceClient).initMetadataTable();
   }
 
   /**
@@ -331,11 +288,7 @@ public class HoodieFlinkWriteClient<T> extends
    * checkpoint finish.
    */
   public void startAsyncCleaning() {
-    if (this.asyncCleanerService == null) {
-      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
-    } else {
-      this.asyncCleanerService.start(null);
-    }
+    tableServiceClient.startAsyncCleanerService(this);
   }
 
   /**
@@ -346,9 +299,9 @@ public class HoodieFlinkWriteClient<T> extends
    * checkpoint finish.
    */
   public void waitForCleaningFinish() {
-    if (this.asyncCleanerService != null) {
+    if (tableServiceClient.asyncCleanerService != null) {
       LOG.info("Cleaner has been spawned already. Waiting for it to finish");
-      AsyncCleanerService.waitForCompletion(asyncCleanerService);
+      tableServiceClient.asyncClean();
       LOG.info("Cleaner has finished");
     }
   }
@@ -396,9 +349,7 @@ public class HoodieFlinkWriteClient<T> extends
       String compactionInstantTime,
       HoodieCommitMetadata metadata,
       Option<Map<String, String>> extraMetadata) {
-    HoodieFlinkTable<T> table = getHoodieTable();
-    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
-    completeCompaction(metadata, table, compactionInstantTime);
+    tableServiceClient.commitCompaction(compactionInstantTime, metadata, extraMetadata);
   }
 
   @Override
@@ -406,43 +357,13 @@ public class HoodieFlinkWriteClient<T> extends
       HoodieCommitMetadata metadata,
       HoodieTable table,
       String compactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
-      finalizeWrite(table, compactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
-      // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
-      LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
-      CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
-    } finally {
-      this.txnManager.endTransaction(Option.of(compactionInstant));
-    }
-    WriteMarkersFactory
-        .get(config.getMarkersType(), table, compactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      try {
-        metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
-            durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
-      } catch (ParseException e) {
-        throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
-            + config.getBasePath() + " at time " + compactionCommitTime, e);
-      }
-    }
-    LOG.info("Compacted successfully on commit " + compactionCommitTime);
+    tableServiceClient.completeCompaction(metadata, table, compactionCommitTime);
   }
 
   @Override
   protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
     // only used for metadata table, the compaction happens in single thread
-    HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime);
-    commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
-    return compactionMetadata;
+    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
   }
 
   @Override
@@ -454,46 +375,7 @@ public class HoodieFlinkWriteClient<T> extends
       HoodieReplaceCommitMetadata metadata,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
       String clusteringCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
-    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
-    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
-        e.getValue().stream()).collect(Collectors.toList());
-    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
-      throw new HoodieClusteringException("Clustering failed to write to files:"
-          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
-    }
-
-    try {
-      this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
-      finalizeWrite(table, clusteringCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
-      // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
-      LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
-      table.getActiveTimeline().transitionReplaceInflightToComplete(
-          HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    } catch (IOException e) {
-      throw new HoodieClusteringException(
-          "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
-    } finally {
-      this.txnManager.endTransaction(Option.of(clusteringInstant));
-    }
-
-    WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (clusteringTimer != null) {
-      long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
-      try {
-        metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
-            durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
-      } catch (ParseException e) {
-        throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
-            + config.getBasePath() + " at time " + clusteringCommitTime, e);
-      }
-    }
-    LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+    ((HoodieFlinkTableServiceClient<T>) tableServiceClient).completeClustering(metadata, table, clusteringCommitTime);
   }
 
   @Override
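
For illustration, a minimal sketch of the driver-side lifecycle after this refactoring: the write client still exposes initMetadataTable(), startAsyncCleaning() and waitForCleaningFinish(), but each call now delegates to HoodieFlinkTableServiceClient. The sketch class, its package, and the standalone construction of the client are hypothetical.

package org.apache.hudi.examples;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;

public final class FlinkCleaningLifecycleSketch {

  public static void checkpointLifecycle(HoodieWriteConfig writeConfig) {
    HoodieFlinkWriteClient<Object> writeClient =
        new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
    try {
      // One-time metadata table bootstrap on the driver.
      writeClient.initMetadataTable();
      // Spawn the async cleaner when a new instant starts ...
      writeClient.startAsyncCleaning();
      // ... and block until it finishes before the checkpoint completes.
      writeClient.waitForCleaningFinish();
    } finally {
      writeClient.close();
    }
  }

  private FlinkCleaningLifecycleSketch() {
  }
}
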
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
new file mode 100644
index 00000000000..2d823aa7f57
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieJavaTableServiceClient extends BaseHoodieTableServiceClient<List<WriteStatus>> {
+
+  protected HoodieJavaTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig);
+  }
+
+  @Override
+  protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
+    throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaTableServiceClient");
+  }
+
+  @Override
+  public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    throw new HoodieNotSupportedException("CommitCompaction is not supported in HoodieJavaTableServiceClient");
+  }
+
+  @Override
+  protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
+    throw new HoodieNotSupportedException("CompleteCompaction is not supported in HoodieJavaTableServiceClient");
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
+    throw new HoodieNotSupportedException("Cluster is not supported in HoodieJavaTableServiceClient");
+  }
+
+  @Override
+  protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return HoodieJavaTable.create(config, context);
+  }
+}
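
For illustration, a minimal sketch of the caller-side contract of the Java table service client: every table service entry point currently throws HoodieNotSupportedException, so callers either avoid these APIs or handle the exception. The sketch class, its helper method, and its use of the protected constructor from within org.apache.hudi.client are hypothetical.

package org.apache.hudi.client;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;

public final class JavaTableServiceClientSketch {

  // Returns true if clustering ran; with the Java engine it currently cannot.
  public static boolean tryCluster(HoodieEngineContext context,
                                   HoodieWriteConfig writeConfig,
                                   String clusteringInstant) {
    HoodieJavaTableServiceClient serviceClient =
        new HoodieJavaTableServiceClient(context, writeConfig);
    try {
      serviceClient.cluster(clusteringInstant, true);
      return true;
    } catch (HoodieNotSupportedException e) {
      // Expected today: clustering is not implemented for the Java client.
      return false;
    }
  }

  private JavaTableServiceClientSketch() {
  }
}
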
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index ed1b400a757..0f7f48194cd 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -51,8 +51,9 @@ import java.util.stream.Collectors;
 public class HoodieJavaWriteClient<T> extends
     BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
 
-  public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    super(context, clientConfig, JavaUpgradeDowngradeHelper.getInstance());
+  public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
+    super(context, writeConfig, JavaUpgradeDowngradeHelper.getInstance());
+    this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig);
   }
 
   public HoodieJavaWriteClient(HoodieEngineContext context,
@@ -60,6 +61,7 @@ public class HoodieJavaWriteClient<T> extends
                                boolean rollbackPending,
                                Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, JavaUpgradeDowngradeHelper.getInstance());
+    this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
index 73037df40cb..888dda3ef2c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
@@ -50,7 +50,7 @@ public class HoodieSparkClusteringClient<T> extends
   public void cluster(HoodieInstant instant) throws IOException {
     LOG.info("Executing clustering instance " + instant);
     SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) clusteringClient;
-    Option<HoodieCommitMetadata> commitMetadata = writeClient.cluster(instant.getTimestamp(), true).getCommitMetadata();
+    Option<HoodieCommitMetadata> commitMetadata = writeClient.cluster(instant.getTimestamp()).getCommitMetadata();
     Stream<HoodieWriteStat> hoodieWriteStatStream = commitMetadata.get().getPartitionToWriteStats().entrySet().stream().flatMap(e ->
             e.getValue().stream());
     long errorsCount = hoodieWriteStatStream.mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
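
For illustration, a minimal sketch of the updated call site above, assuming (not shown in this diff) that the single-argument cluster(String) overload both executes and completes the clustering; the sketch class and its package are hypothetical.

package org.apache.hudi.examples;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;

public final class AsyncClusteringCallSketch {

  // Executes the pending clustering plan at this instant and returns the
  // commit metadata of the completed replacecommit, if any.
  public static Option<HoodieCommitMetadata> runClustering(SparkRDDWriteClient<?> writeClient,
                                                           String clusteringInstantTime) {
    return writeClient.cluster(clusteringInstantTime).getCommitMetadata();
  }

  private AsyncClusteringCallSketch() {
  }
}
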
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
new file mode 100644
index 00000000000..1fcd6acb12d
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<JavaRDD<WriteStatus>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkRDDTableServiceClient.class);
+
+  protected SparkRDDTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig);
+  }
+
+  @Override
+  protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
+    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
+    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+      table.getMetaClient().reloadActiveTimeline();
+    }
+    compactionTimer = metrics.getCompactionCtx();
+    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.compact(context, compactionInstantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+    if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
+      completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
+    }
+    return compactionMetadata;
+  }
+
+  @Override
+  protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
+    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+    HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
+    HoodieInstant inflightInstant = HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
+    if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
+      LOG.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
+      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+      table.getMetaClient().reloadActiveTimeline();
+      throw new HoodieException("Inflight logcompaction file exists");
+    }
+    logCompactionTimer = metrics.getLogCompactionCtx();
+    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.logCompact(context, logCompactionInstantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+    if (shouldComplete && logCompactionMetadata.getCommitMetadata().isPresent()) {
+      completeTableService(TableServiceType.LOG_COMPACT, logCompactionMetadata.getCommitMetadata().get(), table, logCompactionInstantTime);
+    }
+    return logCompactionMetadata;
+  }
+
+  @Override
+  public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, table, compactionInstantTime);
+  }
+
+  @Override
+  protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
+    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+    final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
+    try {
+      this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
+      finalizeWrite(table, compactionCommitTime, writeStats);
+      // commit to data table after committing to metadata table.
+      updateTableMetadata(table, metadata, compactionInstant);
+      LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
+      CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
+    } finally {
+      this.txnManager.endTransaction(Option.of(compactionInstant));
+    }
+    WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    if (compactionTimer != null) {
+      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+      HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant ->
+          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION)
+      );
+    }
+    LOG.info("Compacted successfully on commit " + compactionCommitTime);
+  }
+
+  @Override
+  protected void completeLogCompaction(HoodieCommitMetadata metadata,
+                                       HoodieTable table,
+                                       String logCompactionCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
+    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+    final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
+    try {
+      this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
+      preCommit(metadata);
+      finalizeWrite(table, logCompactionCommitTime, writeStats);
+      // commit to data table after committing to metadata table.
+      updateTableMetadata(table, metadata, logCompactionInstant);
+      LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". Finished with result " + metadata);
+      CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
+    } finally {
+      this.txnManager.endTransaction(Option.of(logCompactionInstant));
+    }
+    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    if (compactionTimer != null) {
+      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+      HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant ->
+          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
+      );
+    }
+    LOG.info("Log Compacted successfully on commit " + logCompactionCommitTime);
+  }
+
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
+    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+    HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
+    HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
+    if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
+      table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+      table.getMetaClient().reloadActiveTimeline();
+    }
+    clusteringTimer = metrics.getClusteringCtx();
+    LOG.info("Starting clustering at " + clusteringInstant);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+    // Validation has to be done after cloning; otherwise it could dereference the write statuses twice, which means clustering could get executed twice.
+    validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
+    // TODO : Where is shouldComplete used ?
+    if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
+      completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
+    }
+    return clusteringMetadata;
+  }
+
+  // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
+  private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,
+                                    HoodieTable table,
+                                    String commitInstant) {
+
+    switch (tableServiceType) {
+      case CLUSTER:
+        completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
+        break;
+      case COMPACT:
+        completeCompaction(metadata, table, commitInstant);
+        break;
+      case LOG_COMPACT:
+        completeLogCompaction(metadata, table, commitInstant);
+        break;
+      default:
+        throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
+    }
+  }
+
+  private void completeClustering(HoodieReplaceCommitMetadata metadata,
+                                  HoodieTable table,
+                                  String clusteringCommitTime) {
+    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+        e.getValue().stream()).collect(Collectors.toList());
+
+    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
+      throw new HoodieClusteringException("Clustering failed to write to files:"
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
+    }
+
+    final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
+    try {
+      this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
+
+      finalizeWrite(table, clusteringCommitTime, writeStats);
+      // Update the table's metadata.
+      updateTableMetadata(table, metadata, clusteringInstant);
+
+      LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
+
+      table.getActiveTimeline().transitionReplaceInflightToComplete(
+          clusteringInstant,
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    } catch (Exception e) {
+      throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
+    } finally {
+      this.txnManager.endTransaction(Option.of(clusteringInstant));
+    }
+    WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    if (clusteringTimer != null) {
+      long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
+      HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant ->
+          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
+      );
+    }
+    LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+  }
+
+  private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
+    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
+      HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
+              table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
+          .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+              "Unable to read clustering plan for instant: " + clusteringCommitTime));
+      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
+          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+          + " write statuses");
+    }
+  }
+
+  private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
+                                   HoodieInstant hoodieInstant) {
+    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
+    // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to the metadata table happen under a
+    // single lock (single writer), because concurrent writes to the metadata table would conflict since they all update the same partition.
+    table.getMetadataWriter(hoodieInstant.getTimestamp())
+        .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
+  }
+
+  /**
+   * Initialize the metadata table if needed. Creating the metadata table writer
+   * will trigger the initial bootstrapping from the data table.
+   *
+   * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization
+   */
+  protected void initializeMetadataTable(Option<String> inFlightInstantTimestamp) {
+    if (config.isMetadataTableEnabled()) {
+      SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
+          context, Option.empty(), inFlightInstantTimestamp);
+    }
+  }
+
+  @Override
+  protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return HoodieSparkTable.create(config, context);
+  }
+
+  @Override
+  protected void preCommit(HoodieCommitMetadata metadata) {
+    // Create a Hoodie table after startTxn that encapsulates the commits and files visible at this point.
+    // Important to create this after acquiring the lock so the latest commits show up in the timeline without needing a reload.
+    HoodieTable table = createTable(config, hadoopConf);
+    resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+  }
+}
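
For illustration, a minimal sketch of how a Spark driver schedules and runs clustering after this refactoring; the public cluster(...) API on the write client now delegates execution and completion to SparkRDDTableServiceClient. The sketch class and package are hypothetical, and scheduleClustering(Option.empty()) is assumed to be the existing scheduling API on the write client (it is not part of this diff).

package org.apache.hudi.examples;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.util.Option;

public final class SparkTableServiceSketch {

  public static void scheduleAndRunClustering(SparkRDDWriteClient<?> writeClient) {
    // Schedule a clustering plan with no extra metadata.
    Option<String> instantTime = writeClient.scheduleClustering(Option.empty());
    if (instantTime.isPresent()) {
      // Execution validates the produced write statuses and transitions the
      // replacecommit to complete under the transaction lock.
      writeClient.cluster(instantTime.get(), true);
    }
  }

  private SparkTableServiceSketch() {
  }
}
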
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 4ddcd13e4f4..5c094b8d5be 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -18,11 +18,8 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
@@ -30,22 +27,15 @@ import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -55,8 +45,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 
 import com.codahale.metrics.Timer;
@@ -66,11 +54,9 @@ import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
 
 @SuppressWarnings("checkstyle:LineLength")
 public class SparkRDDWriteClient<T> extends
@@ -96,6 +82,7 @@ public class SparkRDDWriteClient<T> extends
   public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
                              Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
+    this.tableServiceClient = new SparkRDDTableServiceClient<>(context, writeConfig);
   }
 
   @Override
@@ -289,56 +276,21 @@ public class SparkRDDWriteClient<T> extends
 
   @Override
   public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
-    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
-    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
-    completeCompaction(metadata, table, compactionInstantTime);
+    tableServiceClient.commitCompaction(compactionInstantTime, metadata, extraMetadata);
   }
 
   @Override
   protected void completeCompaction(HoodieCommitMetadata metadata,
                                     HoodieTable table,
                                     String compactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
-      finalizeWrite(table, compactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      updateTableMetadata(table, metadata, compactionInstant);
-      LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
-      CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
-    } finally {
-      this.txnManager.endTransaction(Option.of(compactionInstant));
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION)
-      );
-    }
-    LOG.info("Compacted successfully on commit " + compactionCommitTime);
+    tableServiceClient.completeCompaction(metadata, table, compactionCommitTime);
   }
 
   @Override
   protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
-    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
-    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
-      table.getMetaClient().reloadActiveTimeline();
-    }
-    compactionTimer = metrics.getCompactionCtx();
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.compact(context, compactionInstantTime);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
-    if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
-    }
-    return compactionMetadata;
+    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
   }
 
   @Override
@@ -352,136 +304,21 @@ public class SparkRDDWriteClient<T> extends
   protected void completeLogCompaction(HoodieCommitMetadata metadata,
                                        HoodieTable table,
                                        String logCompactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
-      preCommit(logCompactionInstant, metadata);
-      finalizeWrite(table, logCompactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      updateTableMetadata(table, metadata, logCompactionInstant);
-      LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". Finished with result " + metadata);
-      CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
-    } finally {
-      this.txnManager.endTransaction(Option.of(logCompactionInstant));
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
-      );
-    }
-    LOG.info("Log Compacted successfully on commit " + logCompactionCommitTime);
+    tableServiceClient.completeLogCompaction(metadata, table, logCompactionCommitTime);
   }
 
   @Override
   protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(logCompactionInstantTime, WriteOperationType.LOG_COMPACT, table.getMetaClient());
-    HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
-    HoodieInstant inflightInstant = HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
-    if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
-      LOG.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
-      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
-      table.getMetaClient().reloadActiveTimeline();
-      throw new HoodieException("Inflight logcompaction file exists");
-    }
-    logCompactionTimer = metrics.getLogCompactionCtx();
-    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.logCompact(context, logCompactionInstantTime);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
-    if (shouldComplete && logCompactionMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.LOG_COMPACT, logCompactionMetadata.getCommitMetadata().get(), table, logCompactionInstantTime);
-    }
-    return logCompactionMetadata;
+    return tableServiceClient.logCompact(logCompactionInstantTime, shouldComplete);
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
-    HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
-    HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
-    if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
-      table.getMetaClient().reloadActiveTimeline();
-    }
-    clusteringTimer = metrics.getClusteringCtx();
-    LOG.info("Starting clustering at " + clusteringInstant);
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
-    // Validation has to be done after cloning. if not, it could result in dereferencing the write status twice which means clustering could get executed twice.
-    validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
-    // TODO : Where is shouldComplete used ?
-    if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
-    }
-    return clusteringMetadata;
-  }
-
-  private void completeClustering(HoodieReplaceCommitMetadata metadata,
-                                  HoodieTable table,
-                                  String clusteringCommitTime) {
-    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
-        e.getValue().stream()).collect(Collectors.toList());
-
-    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
-      throw new HoodieClusteringException("Clustering failed to write to files:"
-          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
-    }
-
-    final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
-
-      finalizeWrite(table, clusteringCommitTime, writeStats);
-      // Update table's metadata (table)
-      updateTableMetadata(table, metadata, clusteringInstant);
-
-      LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
-
-      table.getActiveTimeline().transitionReplaceInflightToComplete(
-          clusteringInstant,
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    } catch (Exception e) {
-      throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
-    } finally {
-      this.txnManager.endTransaction(Option.of(clusteringInstant));
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (clusteringTimer != null) {
-      long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
-      );
-    }
-    LOG.info("Clustering successfully on commit " + clusteringCommitTime);
-  }
-
-  private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
-    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
-      HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
-              table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
-          .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
-              "Unable to read clustering plan for instant: " + clusteringCommitTime));
-      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
-          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
-          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
-          + " write statuses");
-    }
-  }
-
-  private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
-                                   HoodieInstant hoodieInstant) {
-    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
-    // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
-    // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-    table.getMetadataWriter(hoodieInstant.getTimestamp())
-        .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
+    return tableServiceClient.cluster(clusteringInstant, shouldComplete);
   }
 
   @Override
@@ -510,44 +347,12 @@ public class SparkRDDWriteClient<T> extends
     }
   }
 
-  // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
-  private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,
-                                    HoodieTable table,
-                                    String commitInstant) {
-
-    switch (tableServiceType) {
-      case CLUSTER:
-        completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
-        break;
-      case COMPACT:
-        completeCompaction(metadata, table, commitInstant);
-        break;
-      case LOG_COMPACT:
-        completeLogCompaction(metadata, table, commitInstant);
-        break;
-      default:
-        throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
-    }
-  }
-
   @Override
   protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
     // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config, hadoopConf);
-    Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
-    try {
-      TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
-          Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
-      metrics.emitConflictResolutionSuccessful();
-    } catch (HoodieWriteConflictException e) {
-      metrics.emitConflictResolutionFailed();
-      throw e;
-    } finally {
-      if (conflictResolutionTimer != null) {
-        conflictResolutionTimer.stop();
-      }
-    }
+    resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index d226b5b995a..3a5e940d460 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -573,7 +573,7 @@ public class TestClientRollback extends HoodieClientTestBase {
 
       // the compaction instants should be excluded
       metaClient.reloadActiveTimeline();
-      assertEquals(0, client.getPendingRollbackInfos(metaClient).size());
+      assertEquals(0, client.getTableServiceClient().getPendingRollbackInfos(metaClient).size());
 
       // verify there is no extra rollback instants
       client.rollback(commitTime4);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index ba32aea0b34..38033b9af35 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -2812,8 +2812,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     }
 
     @Override
-    protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
-      throw new HoodieException(CLUSTERING_FAILURE);
+    protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+      if (config.inlineClusteringEnabled()) {
+        throw new HoodieException(CLUSTERING_FAILURE);
+      }
     }
 
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java
new file mode 100644
index 00000000000..fa0c8c17251
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.apache.hudi.common.model.ActionType;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Properties;
+
+/**
+ * Configurations used by the Hudi Table Service Manager.
+ */
+@Immutable
+@ConfigClassProperty(name = "Table Service Manager Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations used by the Hudi Table Service Manager.")
+public class HoodieTableServiceManagerConfig extends HoodieConfig {
+
+  public static final String TABLE_SERVICE_MANAGER_PREFIX = "hoodie.table.service.manager";
+
+  public static final ConfigProperty<Boolean> TABLE_SERVICE_MANAGER_ENABLED = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".enabled")
+      .defaultValue(false)
+      .withDocumentation("If true, use table service manager to execute table service");
+
+  public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_URIS = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".uris")
+      .defaultValue("http://localhost:9091")
+      .withDocumentation("Table service manager URIs (comma-delimited).");
+
+  public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_ACTIONS = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".actions")
+      .noDefaultValue()
+      .withDocumentation("The actions deployed on table service manager, such as compaction or clean.");
+
+  public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_USERNAME = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.username")
+      .defaultValue("default")
+      .withDocumentation("The user name for this table to deploy table services.");
+
+  public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_QUEUE = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.queue")
+      .defaultValue("default")
+      .withDocumentation("The queue for this table to deploy table services.");
+
+  public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_RESOURCES = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.resources")
+      .defaultValue("spark:4g,4g")
+      .withDocumentation("The resources for this table to use for deploying table services.");
+
+  public static final ConfigProperty<Integer> TABLE_SERVICE_MANAGER_DEPLOY_PARALLELISM = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.parallelism")
+      .defaultValue(100)
+      .withDocumentation("The parallelism for this table to deploy table services.");
+
+  public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_EXECUTION_ENGINE = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".execution.engine")
+      .defaultValue("spark")
+      .withDocumentation("The execution engine to deploy for table service of this table, default spark");
+
+  public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_EXTRA_PARAMS = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.extra.params")
+      .noDefaultValue()
+      .withDocumentation("The extra params to deploy for table service of this table, split by ';'");
+
+  public static final ConfigProperty<Integer> TABLE_SERVICE_MANAGER_TIMEOUT_SEC = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".connection.timeout.sec")
+      .defaultValue(300)
+      .withDocumentation("Timeout in seconds for connections to table service manager.");
+
+  public static final ConfigProperty<Integer> TABLE_SERVICE_MANAGER_RETRIES = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".connection.retries")
+      .defaultValue(3)
+      .withDocumentation("Number of retries while opening a connection to table service manager");
+
+  public static final ConfigProperty<Integer> TABLE_SERVICE_MANAGER_RETRY_DELAY_SEC = ConfigProperty
+      .key(TABLE_SERVICE_MANAGER_PREFIX + ".connection.retry.delay.sec")
+      .defaultValue(1)
+      .withDocumentation("Number of seconds for the client to wait between consecutive connection attempts");
+
+  public static HoodieTableServiceManagerConfig.Builder newBuilder() {
+    return new HoodieTableServiceManagerConfig.Builder();
+  }
+
+  public boolean isTableServiceManagerEnabled() {
+    return getBoolean(TABLE_SERVICE_MANAGER_ENABLED);
+  }
+
+  public String getTableServiceManagerURIs() {
+    return getStringOrDefault(TABLE_SERVICE_MANAGER_URIS);
+  }
+
+  public String getTableServiceManagerActions() {
+    return getStringOrDefault(TABLE_SERVICE_MANAGER_ACTIONS);
+  }
+
+  public String getDeployUsername() {
+    return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_USERNAME);
+  }
+
+  public String getDeployQueue() {
+    return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_QUEUE);
+  }
+
+  public String getDeployResources() {
+    return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_RESOURCES);
+  }
+
+  public int getDeployParallelism() {
+    return getIntOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_PARALLELISM);
+  }
+
+  public String getDeployExtraParams() {
+    return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_EXTRA_PARAMS);
+  }
+
+  public String getDeployExecutionEngine() {
+    return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_EXECUTION_ENGINE);
+  }
+
+  public int getConnectionTimeoutSec() {
+    return getIntOrDefault(TABLE_SERVICE_MANAGER_TIMEOUT_SEC);
+  }
+
+  public int getConnectionRetryLimit() {
+    return getIntOrDefault(TABLE_SERVICE_MANAGER_RETRIES);
+  }
+
+  public int getConnectionRetryDelay() {
+    return getIntOrDefault(TABLE_SERVICE_MANAGER_RETRY_DELAY_SEC);
+  }
+
+  public boolean isEnabledAndActionSupported(ActionType actionType) {
+    return isTableServiceManagerEnabled() && getTableServiceManagerActions().contains(actionType.name());
+  }
+
+  public static class Builder {
+    private final HoodieTableServiceManagerConfig config = new HoodieTableServiceManagerConfig();
+
+    public Builder fromProperties(Properties props) {
+      this.config.getProps().putAll(props);
+      return this;
+    }
+
+    public Builder setURIs(String uris) {
+      config.setValue(TABLE_SERVICE_MANAGER_URIS, uris);
+      return this;
+    }
+
+    public HoodieTableServiceManagerConfig build() {
+      config.setDefaults(HoodieTableServiceManagerConfig.class.getName());
+      return config;
+    }
+  }
+}
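
For context, a minimal usage sketch of the new HoodieTableServiceManagerConfig above, assembled through the Builder it ships with. The class name TsmConfigExample, the property values, and the ActionType.compaction check are illustrative assumptions, not part of this commit:

    import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
    import org.apache.hudi.common.model.ActionType;

    import java.util.Properties;

    public class TsmConfigExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Keys mirror the ConfigProperty definitions introduced in this file.
        props.setProperty("hoodie.table.service.manager.enabled", "true");
        props.setProperty("hoodie.table.service.manager.actions", "compaction,clean");

        HoodieTableServiceManagerConfig config = HoodieTableServiceManagerConfig.newBuilder()
            .fromProperties(props)
            .setURIs("http://localhost:9091") // same value as the TABLE_SERVICE_MANAGER_URIS default
            .build();

        // Delegate compaction only when the manager is enabled and "compaction"
        // appears in hoodie.table.service.manager.actions.
        if (config.isEnabledAndActionSupported(ActionType.compaction)) {
          // hand the compaction request over to the table service manager client
        }
      }
    }
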
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index b451c364186..5acbe55fe8b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -281,7 +281,7 @@ public class HoodieFlinkClusteringJob {
       if (table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) {
         LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
         table.rollbackInflightClustering(inflightInstant,
-            commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+            commitToRollback -> writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
         table.getMetaClient().reloadActiveTimeline();
       }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 34352ec80bc..b183ba3a4b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -78,7 +78,7 @@ public class ClusteringUtil {
     inflightInstants.forEach(inflightInstant -> {
       LOG.info("Rollback the inflight clustering instant: " + inflightInstant + " for failover");
       table.rollbackInflightClustering(inflightInstant,
-          commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+          commitToRollback -> writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
       table.getMetaClient().reloadActiveTimeline();
     });
   }
@@ -95,7 +95,7 @@ public class ClusteringUtil {
     if (table.getMetaClient().reloadActiveTimeline().filterPendingReplaceTimeline().containsInstant(inflightInstant)) {
       LOG.warn("Rollback failed clustering instant: [" + instantTime + "]");
       table.rollbackInflightClustering(inflightInstant,
-          commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+          commitToRollback -> writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java
deleted file mode 100644
index 20982b5cda6..00000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.functional;
-
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.config.HoodieWriteConfig;
-
-// Sole purpose of this class is to provide access to otherwise API inaccessible from the tests.
-// While it's certainly not a great pattern, it would require substantial test restructuring to
-// eliminate such access to an internal API, so this is considered acceptable given it's very limited
-// scope (w/in the current package)
-class SparkRDDWriteClientOverride extends org.apache.hudi.client.SparkRDDWriteClient {
-
-  public SparkRDDWriteClientOverride(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    super(context, clientConfig);
-  }
-
-  @Override
-  public void rollbackFailedBootstrap() {
-    super.rollbackFailedBootstrap();
-  }
-}
-
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 543e01702d3..22d2e934d53 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -20,6 +20,7 @@ package org.apache.hudi.functional;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
 import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
@@ -79,7 +80,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.api.java.UDF1;
 import org.apache.spark.sql.types.DataTypes;
 import org.junit.jupiter.api.AfterEach;
@@ -129,11 +129,10 @@ public class TestBootstrap extends HoodieClientTestBase {
 
   private HoodieParquetRealtimeInputFormat rtInputFormat;
   private JobConf rtJobConf;
-  private SparkSession spark;
 
   @BeforeEach
   public void setUp() throws Exception {
-    bootstrapBasePath = tmpFolder.toAbsolutePath().toString() + "/data";
+    bootstrapBasePath = tmpFolder.toAbsolutePath() + "/data";
     initPath();
     initSparkContexts();
     initTestDataGenerator();
@@ -254,7 +253,7 @@ public class TestBootstrap extends HoodieClientTestBase {
             .withBootstrapModeSelector(bootstrapModeSelectorClass).build())
         .build();
 
-    SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config);
+    SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
         numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
@@ -263,7 +262,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), new HoodieInstant(State.COMPLETED,
         deltaCommit ? HoodieTimeline.DELTA_COMMIT_ACTION : HoodieTimeline.COMMIT_ACTION, bootstrapCommitInstantTs));
     metaClient.reloadActiveTimeline();
-    client.rollbackFailedBootstrap();
+    client.getTableServiceClient().rollbackFailedBootstrap();
     metaClient.reloadActiveTimeline();
     assertEquals(0, metaClient.getCommitsTimeline().countInstants());
     assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context)
@@ -271,9 +270,10 @@ public class TestBootstrap extends HoodieClientTestBase {
 
     BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
     assertFalse(index.useIndex());
+    client.close();
 
     // Run bootstrap again
-    client = new SparkRDDWriteClientOverride(context, config);
+    client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
 
     metaClient.reloadActiveTimeline();
@@ -307,6 +307,7 @@ public class TestBootstrap extends HoodieClientTestBase {
           numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
           Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
     }
+    client.close();
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index a236755d072..cc9b4cefcb6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -21,6 +21,7 @@ package org.apache.hudi.functional;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
 import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
@@ -50,7 +51,6 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -74,7 +74,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.api.java.UDF1;
 import org.apache.spark.sql.types.DataTypes;
 import org.junit.jupiter.api.AfterEach;
@@ -109,9 +108,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Tag("functional")
 public class TestOrcBootstrap extends HoodieClientTestBase {
 
-
-  public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,double,double,double,double,"
-      + "struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
   @TempDir
   public java.nio.file.Path tmpFolder;
 
@@ -120,13 +116,9 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
   private HoodieParquetInputFormat roInputFormat;
   private JobConf roJobConf;
 
-  private HoodieParquetRealtimeInputFormat rtInputFormat;
-  private JobConf rtJobConf;
-  private SparkSession spark;
-
   @BeforeEach
   public void setUp() throws Exception {
-    bootstrapBasePath = tmpFolder.toAbsolutePath().toString() + "/data";
+    bootstrapBasePath = tmpFolder.toAbsolutePath() + "/data";
     initPath();
     initSparkContexts();
     initTestDataGenerator();
@@ -251,7 +243,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
             .withBootstrapModeSelector(bootstrapModeSelectorClass).build())
         .build();
 
-    SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config);
+    SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
         numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
@@ -262,7 +254,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
     } else {
       FileCreateUtils.deleteCommit(metaClient.getBasePath(), bootstrapCommitInstantTs);
     }
-    client.rollbackFailedBootstrap();
+    client.getTableServiceClient().rollbackFailedBootstrap();
     metaClient.reloadActiveTimeline();
     assertEquals(0, metaClient.getCommitsTimeline().countInstants());
     assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context)
@@ -270,9 +262,10 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
 
     BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
     assertFalse(index.useIndex());
+    client.close();
 
     // Run bootstrap again
-    client = new SparkRDDWriteClientOverride(context, config);
+    client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
 
     metaClient.reloadActiveTimeline();
@@ -306,6 +299,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
           numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
           Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
     }
+    client.close();
   }
 
   @Test
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index f6905f92d94..3c97b732eb6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -196,7 +196,7 @@ public class HoodieClusteringJob {
           throw new HoodieClusteringException("There is no scheduled clustering in the table.");
         }
       }
-      Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime, true).getCommitMetadata();
+      Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime).getCommitMetadata();
 
       return UtilHelpers.handleErrors(commitMetadata.get(), cfg.clusteringInstantTime);
     }
@@ -252,7 +252,7 @@ public class HoodieClusteringJob {
 
       LOG.info("The schedule instant time is " + instantTime.get());
       LOG.info("Step 2: Do cluster");
-      Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get(), true).getCommitMetadata();
+      Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get()).getCommitMetadata();
       return UtilHelpers.handleErrors(metadata.get(), instantTime.get());
     }
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index e57ac513a8e..10dbf998a5f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -386,7 +386,7 @@ public class DeltaSync implements Serializable, Closeable {
       if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
         Option<String> pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt);
         if (pendingClusteringInstant.isPresent()) {
-          writeClient.cluster(pendingClusteringInstant.get(), true);
+          writeClient.cluster(pendingClusteringInstant.get());
         }
       }