Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/03 17:10:12 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #6732: [HUDI-4148] Add client for hudi table service manager

xushiyan commented on code in PR #6732:
URL: https://github.com/apache/hudi/pull/6732#discussion_r1060687626


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java:
##########
@@ -85,6 +91,8 @@ protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig client
   public void close() {
     stopEmbeddedServerView(true);
     this.context.setJobStatus("", "");
+    this.heartbeatClient.stop();
+    this.txnManager.close();

Review Comment:
   remark: this is common to all clients, so it was moved here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {

Review Comment:
   remark: logic for table services like clean, cluster, compact, archive, and rollback was moved here from the base write client
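   A rough sketch of how the write client could now delegate to this class (illustrative only; the field name tableServiceClient is an assumption, not taken from the PR):

       // hypothetical delegation inside BaseHoodieWriteClient; names are illustrative
       protected BaseHoodieTableServiceClient<O> tableServiceClient;

       public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) {
         // the public API stays on the write client, but the work is forwarded
         return tableServiceClient.scheduleCompaction(extraMetadata);
       }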



##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieJavaTableServiceClient extends BaseHoodieTableServiceClient<List<WriteStatus>> {

Review Comment:
   remark: the table services' logic was replicated from the Java write client's implementation
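   For context, a minimal sketch of what that replication could look like (illustrative only; HoodieJavaTable.create and the not-supported clustering behavior are assumptions, not quotes from the PR):

       // illustrative sketch, not the PR's code
       @Override
       protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
         // assumed factory, mirroring how the Java write client builds its table
         return HoodieJavaTable.create(config, context);
       }

       @Override
       public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
         // assumed, based on the HoodieNotSupportedException import in this file
         throw new HoodieNotSupportedException("Clustering is not supported in HoodieJavaClient");
       }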



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
+
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieTableServiceClient.class);
+
+  protected transient Timer.Context compactionTimer;
+  protected transient Timer.Context clusteringTimer;
+  protected transient Timer.Context logCompactionTimer;
+
+  protected transient AsyncCleanerService asyncCleanerService;
+  protected transient AsyncArchiveService asyncArchiveService;
+
+  protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig, Option.empty());
+  }
+
+  protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncCleanerService == null) {
+      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
+    } else {
+      this.asyncCleanerService.start(null);
+    }
+  }
+
+  protected void startAsyncArchiveService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncArchiveService == null) {
+      this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
+    } else {
+      this.asyncArchiveService.start(null);
+    }
+  }
+
+  protected void asyncClean() {
+    AsyncCleanerService.waitForCompletion(asyncCleanerService);
+  }
+
+  protected void asyncArchive() {
+    AsyncArchiveService.waitForCompletion(asyncArchiveService);
+  }
+
+  protected void setTableServiceTimer(WriteOperationType operationType) {
+    switch (operationType) {
+      case CLUSTER:
+        clusteringTimer = metrics.getClusteringCtx();
+        break;
+      case COMPACT:
+        compactionTimer = metrics.getCompactionCtx();
+        break;
+      case LOG_COMPACT:
+        logCompactionTimer = metrics.getLogCompactionCtx();
+        break;
+      default:
+    }
+  }
+
+  /**
+   * Performs a compaction operation on a table, serially before or after an insert/upsert action.
+   * Scheduling and execution is done inline.
+   */
+  protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
+    Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
+    compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
+      // inline compaction should auto commit as the user is never given control
+      compact(compactInstantTime, true);
+    });
+    return compactionInstantTimeOpt;
+  }
+
+  private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
+      scheduleCompaction(extraMetadata);
+    } else {
+      runAnyPendingCompactions(table);
+      inlineCompaction(extraMetadata);
+    }
+  }
+
+  /**
+   * Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+  /**
+   * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
+   */
+  protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
+    Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
+    logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
+      // inline log compaction should auto commit as the user is never given control
+      logCompact(logCompactInstantTime, true);
+    });
+    return logCompactionInstantTimeOpt;
+  }
+
+  protected void runAnyPendingCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight compaction at instant " + instant);
+          compact(instant.getTimestamp(), true);
+        });
+  }
+
+  protected void runAnyPendingLogCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight log compaction at instant " + instant);
+          logCompact(instant.getTimestamp(), true);
+        });
+  }
+
+  /***
+   * Schedules compaction inline.
+   * @param extraMetadata extra metadata to be used.
+   * @return compaction instant if scheduled.
+   */
+  protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
+    return scheduleCompaction(extraMetadata);
+  }
+
+  /**
+   * Schedules a new compaction instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
+
+  /**
+   * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @param metadata              All the metadata that gets stored along with a commit
+   * @param extraMetadata         Extra Metadata to be stored
+   */
+  public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata,
+                                        Option<Map<String, String>> extraMetadata);
+
+  /**
+   * Commit Compaction and track metrics.
+   */
+  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
+
+  /**
+   * Schedules a new log compaction instant.
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleLogCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new log compaction instant with passed-in instant time.
+   * @param instantTime Log Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleLogCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.LOG_COMPACT).isPresent();
+  }
+
+  /**
+   * Performs Log Compaction for the workload stored in instant-time.
+   *
+   * @param logCompactionInstantTime Log Compaction Instant Time
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
+    return logCompact(logCompactionInstantTime, config.shouldAutoCommit());
+  }
+
+  /**
+   * Commit Log Compaction and track metrics.
+   */
+  protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+
+  /**
+   * Schedules a new compaction instant with passed-in instant time.
+   *
+   * @param instantTime   Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
+  }
+
+  /**
+   * Schedules a new clustering instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new clustering instant with passed-in instant time.
+   *
+   * @param instantTime   clustering Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
+  }
+
+  /**
+   * Schedules a new cleaning instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new cleaning instant with passed-in instant time.
+   *
+   * @param instantTime   cleaning Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent();
+  }
+
+  /**
+   * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata.
+   *
+   * @param clusteringInstant Clustering Instant Time
+   * @return Collection of Write Status
+   */
+  public abstract HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete);
+
+  protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    if (!tableServicesEnabled(config)) {
+      return;
+    }
+
+    if (!config.areAnyTableServicesExecutedInline() && !config.areAnyTableServicesScheduledInline()) {
+      return;
+    }
+
+    if (config.isMetadataTableEnabled()) {
+      table.getHoodieView().sync();
+    }
+    // Do an inline compaction if enabled
+    if (config.inlineCompactionEnabled()) {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+      inlineCompaction(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
+        && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
+      // proceed only if there are no pending compactions
+      metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
+      inlineScheduleCompaction(extraMetadata);
+    }
+
+    // Do an inline log compaction if enabled
+    if (config.inlineLogCompactionEnabled()) {
+      runAnyPendingLogCompactions(table);
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
+      inlineLogCompact(extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
+    }
+
+    // Do an inline clustering if enabled
+    if (config.inlineClusteringEnabled()) {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
+      inlineClustering(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
+        && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
+      // proceed only if there are no pending clustering
+      metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
+      inlineScheduleClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedule table services such as clustering, compaction & cleaning.
+   *
+   * @param extraMetadata    Metadata to pass onto the scheduled service instant
+   * @param tableServiceType Type of table service to schedule
+   * @return the instant time if the table service was scheduled, otherwise empty
+   */
+  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
+                                             TableServiceType tableServiceType) {
+    // A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
+    final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
+        tableServiceType.getAction(), instantTime));
+    try {
+      this.txnManager.beginTransaction(inflightInstant, Option.empty());
+      LOG.info("Scheduling table service " + tableServiceType);
+      return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
+    } finally {
+      this.txnManager.endTransaction(inflightInstant);
+    }
+  }
+
+  protected Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
+                                                        TableServiceType tableServiceType) {
+    if (!tableServicesEnabled(config)) {
+      return Option.empty();
+    }
+
+    Option<String> option = Option.empty();
+    HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
+
+    switch (tableServiceType) {
+      case ARCHIVE:
+        LOG.info("Scheduling archiving is not supported. Skipping.");
+        break;
+      case CLUSTER:
+        LOG.info("Scheduling clustering at instant time :" + instantTime);
+        Option<HoodieClusteringPlan> clusteringPlan = table
+            .scheduleClustering(context, instantTime, extraMetadata);
+        option = clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case COMPACT:
+        LOG.info("Scheduling compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> compactionPlan = table
+            .scheduleCompaction(context, instantTime, extraMetadata);
+        option = compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case LOG_COMPACT:
+        LOG.info("Scheduling log compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> logCompactionPlan = table
+            .scheduleLogCompaction(context, instantTime, extraMetadata);
+        option = logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case CLEAN:
+        LOG.info("Scheduling cleaning at instant time :" + instantTime);
+        Option<HoodieCleanerPlan> cleanerPlan = table
+            .scheduleCleaning(context, instantTime, extraMetadata);
+        option = cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
+    }
+
+    Option<String> instantRange = delegateToTableServiceManager(tableServiceType, table);
+    if (instantRange.isPresent()) {
+      LOG.info("Delegate instant [" + instantRange.get() + "] to table service manager");
+    }
+
+    return option;
+  }
+
+  protected abstract HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf);
+
+  /**
+   * Executes a clustering plan on a table, serially before or after an insert/upsert action.
+   * Schedules and executes clustering inline.
+   */
+  protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+    Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
+    clusteringInstantOpt.ifPresent(clusteringInstant -> {
+      // inline cluster should auto commit as the user is never given control
+      cluster(clusteringInstant, true);
+    });
+    return clusteringInstantOpt;
+  }
+
+  private void inlineClustering(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.replacecommit)) {
+      scheduleClustering(extraMetadata);
+    } else {
+      runAnyPendingClustering(table);
+      inlineClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedules clustering inline.
+   *
+   * @param extraMetadata extra metadata to use.
+   * @return clustering instant if scheduled.
+   */
+  protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
+    return scheduleClustering(extraMetadata);
+  }
+
+  protected void runAnyPendingClustering(HoodieTable table) {
+    table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
+      if (instantPlan.isPresent()) {
+        LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft());
+        cluster(instant.getTimestamp(), true);
+      }
+    });
+  }
+
+  /**
+   * Finalize Write operation.
+   *
+   * @param table       HoodieTable
+   * @param instantTime Instant Time
+   * @param stats       Hoodie Write Stat
+   */
+  protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
+    try {
+      final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+      table.finalizeWrite(context, instantTime, stats);
+      if (finalizeCtx != null) {
+        Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
+        durationInMs.ifPresent(duration -> {
+          LOG.info("Finalize write elapsed time (milliseconds): " + duration);
+          metrics.updateFinalizeWriteMetrics(duration, stats.size());
+        });
+      }
+    } catch (HoodieIOException ioe) {
+      throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+    }
+  }
+
+  /**
+   * Write the HoodieCommitMetadata to metadata table if available.
+   *
+   * @param table       {@link HoodieTable} of interest.
+   * @param instantTime instant time of the commit.
+   * @param actionType  action type of the commit.
+   * @param metadata    instance of {@link HoodieCommitMetadata}.
+   */
+  protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
+    table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
+        table.isTableServiceAction(actionType, instantTime)));

Review Comment:
   this method in the table service client should always assert that isTableServiceAction() is true; we should add a ValidationUtils.checkArgument at the beginning of this method
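   A minimal sketch of that check (illustrative; assuming ValidationUtils from hudi-common is on the classpath here):

       protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
         // fail fast: the table service client should only ever commit table service actions
         ValidationUtils.checkArgument(table.isTableServiceAction(actionType, instantTime),
             "Unexpected non-table-service action " + actionType + " at instant " + instantTime);
         context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
         // the third argument is known to be true once the check above passes
         table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, true));
       }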



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
+
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieTableServiceClient.class);
+
+  protected transient Timer.Context compactionTimer;
+  protected transient Timer.Context clusteringTimer;
+  protected transient Timer.Context logCompactionTimer;
+
+  protected transient AsyncCleanerService asyncCleanerService;
+  protected transient AsyncArchiveService asyncArchiveService;
+
+  protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig, Option.empty());
+  }
+
+  protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncCleanerService == null) {
+      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
+    } else {
+      this.asyncCleanerService.start(null);
+    }
+  }
+
+  protected void startAsyncArchiveService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncArchiveService == null) {
+      this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
+    } else {
+      this.asyncArchiveService.start(null);
+    }
+  }
+
+  protected void asyncClean() {
+    AsyncCleanerService.waitForCompletion(asyncCleanerService);
+  }
+
+  protected void asyncArchive() {
+    AsyncArchiveService.waitForCompletion(asyncArchiveService);
+  }
+
+  protected void setTableServiceTimer(WriteOperationType operationType) {
+    switch (operationType) {
+      case CLUSTER:
+        clusteringTimer = metrics.getClusteringCtx();
+        break;
+      case COMPACT:
+        compactionTimer = metrics.getCompactionCtx();
+        break;
+      case LOG_COMPACT:
+        logCompactionTimer = metrics.getLogCompactionCtx();
+        break;
+      default:
+    }
+  }
+
+  /**
+   * Performs a compaction operation on a table, serially before or after an insert/upsert action.
+   * Scheduling and execution is done inline.
+   */
+  protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
+    Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
+    compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
+      // inline compaction should auto commit as the user is never given control
+      compact(compactInstantTime, true);
+    });
+    return compactionInstantTimeOpt;
+  }
+
+  private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
+      scheduleCompaction(extraMetadata);
+    } else {
+      runAnyPendingCompactions(table);
+      inlineCompaction(extraMetadata);
+    }
+  }
+
+  /**
+   * Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+  /**
+   * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
+   */
+  protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
+    Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
+    logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
+      // inline log compaction should auto commit as the user is never given control
+      logCompact(logCompactInstantTime, true);
+    });
+    return logCompactionInstantTimeOpt;
+  }
+
+  protected void runAnyPendingCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight compaction at instant " + instant);
+          compact(instant.getTimestamp(), true);
+        });
+  }
+
+  protected void runAnyPendingLogCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight log compaction at instant " + instant);
+          logCompact(instant.getTimestamp(), true);
+        });
+  }
+
+  /***
+   * Schedules compaction inline.
+   * @param extraMetadata extra metadata to be used.
+   * @return compaction instant if scheduled.
+   */
+  protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
+    return scheduleCompaction(extraMetadata);
+  }
+
+  /**
+   * Schedules a new compaction instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
+
+  /**
+   * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @param metadata              All the metadata that gets stored along with a commit
+   * @param extraMetadata         Extra Metadata to be stored
+   */
+  public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata,
+                                        Option<Map<String, String>> extraMetadata);
+
+  /**
+   * Commit Compaction and track metrics.
+   */
+  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
+
+  /**
+   * Schedules a new log compaction instant.
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleLogCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new log compaction instant with passed-in instant time.
+   * @param instantTime Log Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleLogCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.LOG_COMPACT).isPresent();
+  }
+
+  /**
+   * Performs Log Compaction for the workload stored in instant-time.
+   *
+   * @param logCompactionInstantTime Log Compaction Instant Time
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
+    return logCompact(logCompactionInstantTime, config.shouldAutoCommit());
+  }
+
+  /**
+   * Commit Log Compaction and track metrics.
+   */
+  protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+
+  /**
+   * Schedules a new compaction instant with passed-in instant time.
+   *
+   * @param instantTime   Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
+  }
+
+  /**
+   * Schedules a new clustering instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new clustering instant with passed-in instant time.
+   *
+   * @param instantTime   clustering Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
+  }
+
+  /**
+   * Schedules a new cleaning instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new cleaning instant with passed-in instant time.
+   *
+   * @param instantTime   cleaning Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent();
+  }
+
+  /**
+   * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata.
+   *
+   * @param clusteringInstant Clustering Instant Time
+   * @return Collection of Write Status
+   */
+  public abstract HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete);
+
+  protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    if (!tableServicesEnabled(config)) {
+      return;
+    }
+
+    if (!config.areAnyTableServicesExecutedInline() && !config.areAnyTableServicesScheduledInline()) {
+      return;
+    }
+
+    if (config.isMetadataTableEnabled()) {
+      table.getHoodieView().sync();
+    }
+    // Do an inline compaction if enabled
+    if (config.inlineCompactionEnabled()) {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+      inlineCompaction(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
+        && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
+      // proceed only if there are no pending compactions
+      metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
+      inlineScheduleCompaction(extraMetadata);
+    }
+
+    // Do an inline log compaction if enabled
+    if (config.inlineLogCompactionEnabled()) {
+      runAnyPendingLogCompactions(table);
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
+      inlineLogCompact(extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
+    }
+
+    // Do an inline clustering if enabled
+    if (config.inlineClusteringEnabled()) {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
+      inlineClustering(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
+        && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
+      // proceed only if there are no pending clustering
+      metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
+      inlineScheduleClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedule table services such as clustering, compaction & cleaning.
+   *
+   * @param extraMetadata    Metadata to pass onto the scheduled service instant
+   * @param tableServiceType Type of table service to schedule
+   * @return the instant time if the table service was scheduled, otherwise empty
+   */
+  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
+                                             TableServiceType tableServiceType) {
+    // A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
+    final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
+        tableServiceType.getAction(), instantTime));
+    try {
+      this.txnManager.beginTransaction(inflightInstant, Option.empty());
+      LOG.info("Scheduling table service " + tableServiceType);
+      return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
+    } finally {
+      this.txnManager.endTransaction(inflightInstant);
+    }
+  }
+
+  protected Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
+                                                        TableServiceType tableServiceType) {
+    if (!tableServicesEnabled(config)) {
+      return Option.empty();
+    }
+
+    Option<String> option = Option.empty();
+    HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
+
+    switch (tableServiceType) {
+      case ARCHIVE:
+        LOG.info("Scheduling archiving is not supported. Skipping.");
+        break;
+      case CLUSTER:
+        LOG.info("Scheduling clustering at instant time :" + instantTime);
+        Option<HoodieClusteringPlan> clusteringPlan = table
+            .scheduleClustering(context, instantTime, extraMetadata);
+        option = clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case COMPACT:
+        LOG.info("Scheduling compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> compactionPlan = table
+            .scheduleCompaction(context, instantTime, extraMetadata);
+        option = compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case LOG_COMPACT:
+        LOG.info("Scheduling log compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> logCompactionPlan = table
+            .scheduleLogCompaction(context, instantTime, extraMetadata);
+        option = logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case CLEAN:
+        LOG.info("Scheduling cleaning at instant time :" + instantTime);
+        Option<HoodieCleanerPlan> cleanerPlan = table
+            .scheduleCleaning(context, instantTime, extraMetadata);
+        option = cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
+    }
+
+    Option<String> instantRange = delegateToTableServiceManager(tableServiceType, table);
+    if (instantRange.isPresent()) {
+      LOG.info("Delegate instant [" + instantRange.get() + "] to table service manager");
+    }
+
+    return option;
+  }
+
+  protected abstract HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf);
+
+  /**
+   * Executes a clustering plan on a table, serially before or after an insert/upsert action.
+   * Schedules and executes clustering inline.
+   */
+  protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+    Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
+    clusteringInstantOpt.ifPresent(clusteringInstant -> {
+      // inline cluster should auto commit as the user is never given control
+      cluster(clusteringInstant, true);
+    });
+    return clusteringInstantOpt;
+  }
+
+  private void inlineClustering(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.replacecommit)) {
+      scheduleClustering(extraMetadata);
+    } else {
+      runAnyPendingClustering(table);
+      inlineClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedules clustering inline.
+   *
+   * @param extraMetadata extra metadata to use.
+   * @return clustering instant if scheduled.
+   */
+  protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
+    return scheduleClustering(extraMetadata);
+  }
+
+  protected void runAnyPendingClustering(HoodieTable table) {
+    table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
+      if (instantPlan.isPresent()) {
+        LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft());
+        cluster(instant.getTimestamp(), true);
+      }
+    });
+  }
+
+  /**
+   * Finalize Write operation.
+   *
+   * @param table       HoodieTable
+   * @param instantTime Instant Time
+   * @param stats       Hoodie Write Stat
+   */
+  protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {

Review Comment:
   @yuzhaojing should this be extracted to the base client?
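   If it were pulled up, a sketch of the relocated method (assuming BaseHoodieClient already exposes context, config, metrics, and a LOG) would keep the body unchanged:

       // in BaseHoodieClient (hypothetical placement; body copied from the PR's version above)
       protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
         try {
           final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
           table.finalizeWrite(context, instantTime, stats);
           if (finalizeCtx != null) {
             Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
             durationInMs.ifPresent(duration -> {
               LOG.info("Finalize write elapsed time (milliseconds): " + duration);
               metrics.updateFinalizeWriteMetrics(duration, stats.size());
             });
           }
         } catch (HoodieIOException ioe) {
           throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
         }
       }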



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
+
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieTableServiceClient.class);
+
+  protected transient Timer.Context compactionTimer;
+  protected transient Timer.Context clusteringTimer;
+  protected transient Timer.Context logCompactionTimer;
+
+  protected transient AsyncCleanerService asyncCleanerService;
+  protected transient AsyncArchiveService asyncArchiveService;
+
+  protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig, Option.empty());
+  }
+
+  protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncCleanerService == null) {
+      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
+    } else {
+      this.asyncCleanerService.start(null);
+    }
+  }
+
+  protected void startAsyncArchiveService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncArchiveService == null) {
+      this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
+    } else {
+      this.asyncArchiveService.start(null);
+    }
+  }
+
+  protected void asyncClean() {
+    AsyncCleanerService.waitForCompletion(asyncCleanerService);
+  }
+
+  protected void asyncArchive() {
+    AsyncArchiveService.waitForCompletion(asyncArchiveService);
+  }
+
+  protected void setTableServiceTimer(WriteOperationType operationType) {
+    switch (operationType) {
+      case CLUSTER:
+        clusteringTimer = metrics.getClusteringCtx();
+        break;
+      case COMPACT:
+        compactionTimer = metrics.getCompactionCtx();
+        break;
+      case LOG_COMPACT:
+        logCompactionTimer = metrics.getLogCompactionCtx();
+        break;
+      default:
+    }
+  }
+
+  /**
+   * Performs a compaction operation on a table, serially before or after an insert/upsert action.
+   * Scheduling and execution is done inline.
+   */
+  protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
+    Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
+    compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
+      // inline compaction should auto commit as the user is never given control
+      compact(compactInstantTime, true);
+    });
+    return compactionInstantTimeOpt;
+  }
+
+  private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
+      scheduleCompaction(extraMetadata);
+    } else {
+      runAnyPendingCompactions(table);
+      inlineCompaction(extraMetadata);
+    }
+  }
+
+  /**
+   * Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+  /**
+   * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
+   */
+  protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
+    Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
+    logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
+      // inline log compaction should auto commit as the user is never given control
+      logCompact(logCompactInstantTime, true);
+    });
+    return logCompactionInstantTimeOpt;
+  }
+
+  protected void runAnyPendingCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight compaction at instant " + instant);
+          compact(instant.getTimestamp(), true);
+        });
+  }
+
+  protected void runAnyPendingLogCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight log compaction at instant " + instant);
+          logCompact(instant.getTimestamp(), true);
+        });
+  }
+
+  /***
+   * Schedules compaction inline.
+   * @param extraMetadata extra metadata to be used.
+   * @return compaction instant if scheduled.
+   */
+  protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
+    return scheduleCompaction(extraMetadata);
+  }
+
+  /**
+   * Schedules a new compaction instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
+
+  /**
+   * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @param metadata              All the metadata that gets stored along with a commit
+   * @param extraMetadata         Extra Metadata to be stored
+   */
+  public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata,
+                                        Option<Map<String, String>> extraMetadata);
+
+  /**
+   * Commit Compaction and track metrics.
+   */
+  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
+
+  /**
+   * Schedules a new log compaction instant.
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleLogCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new log compaction instant with passed-in instant time.
+   * @param instantTime Log Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleLogCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.LOG_COMPACT).isPresent();
+  }
+
+  /**
+   * Performs Log Compaction for the workload stored in instant-time.
+   *
+   * @param logCompactionInstantTime Log Compaction Instant Time
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
+    return logCompact(logCompactionInstantTime, config.shouldAutoCommit());
+  }
+
+  /**
+   * Commit Log Compaction and track metrics.
+   */
+  protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+  /**
+   * Schedules a new compaction instant with passed-in instant time.
+   *
+   * @param instantTime   Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
+  }
+
+  /**
+   * Schedules a new clustering instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new clustering instant with passed-in instant time.
+   *
+   * @param instantTime   clustering Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
+  }
+
+  /**
+   * Schedules a new cleaning instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new cleaning instant with passed-in instant time.
+   *
+   * @param instantTime   cleaning Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent();
+  }
+
+  /**
+   * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata.
+   *
+   * @param clusteringInstant Clustering Instant Time
+   * @return Collection of Write Status
+   */
+  public abstract HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete);
+
+  protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    if (!tableServicesEnabled(config)) {
+      return;
+    }
+
+    if (!config.areAnyTableServicesExecutedInline() && !config.areAnyTableServicesScheduledInline()) {
+      return;
+    }
+
+    if (config.isMetadataTableEnabled()) {
+      table.getHoodieView().sync();
+    }
+    // Do an inline compaction if enabled
+    if (config.inlineCompactionEnabled()) {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+      inlineCompaction(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
+        && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
+      // proceed only if there are no pending compactions
+      metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
+      inlineScheduleCompaction(extraMetadata);
+    }
+
+    // Do an inline log compaction if enabled
+    if (config.inlineLogCompactionEnabled()) {
+      runAnyPendingLogCompactions(table);
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
+      inlineLogCompact(extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
+    }
+
+    // Do an inline clustering if enabled
+    if (config.inlineClusteringEnabled()) {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
+      inlineClustering(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
+        && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
+      // proceed only if there are no pending clustering
+      metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
+      inlineScheduleClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedule table services such as clustering, compaction & cleaning.
+   *
+   * @param extraMetadata    Metadata to pass onto the scheduled service instant
+   * @param tableServiceType Type of table service to schedule
+   * @return the instant time of the scheduled table service, or empty if nothing was scheduled
+   */
+  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
+                                             TableServiceType tableServiceType) {
+    // A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
+    final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
+        tableServiceType.getAction(), instantTime));
+    try {
+      this.txnManager.beginTransaction(inflightInstant, Option.empty());
+      LOG.info("Scheduling table service " + tableServiceType);
+      return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
+    } finally {
+      this.txnManager.endTransaction(inflightInstant);
+    }
+  }
+
+  protected Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
+                                                        TableServiceType tableServiceType) {
+    if (!tableServicesEnabled(config)) {
+      return Option.empty();
+    }
+
+    Option<String> option = Option.empty();
+    HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
+
+    switch (tableServiceType) {
+      case ARCHIVE:
+        LOG.info("Scheduling archiving is not supported. Skipping.");
+        break;
+      case CLUSTER:
+        LOG.info("Scheduling clustering at instant time :" + instantTime);
+        Option<HoodieClusteringPlan> clusteringPlan = table
+            .scheduleClustering(context, instantTime, extraMetadata);
+        option = clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case COMPACT:
+        LOG.info("Scheduling compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> compactionPlan = table
+            .scheduleCompaction(context, instantTime, extraMetadata);
+        option = compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case LOG_COMPACT:
+        LOG.info("Scheduling log compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> logCompactionPlan = table
+            .scheduleLogCompaction(context, instantTime, extraMetadata);
+        option = logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case CLEAN:
+        LOG.info("Scheduling cleaning at instant time :" + instantTime);
+        Option<HoodieCleanerPlan> cleanerPlan = table
+            .scheduleCleaning(context, instantTime, extraMetadata);
+        option = cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
+    }
+
+    Option<String> instantRange = delegateToTableServiceManager(tableServiceType, table);
+    if (instantRange.isPresent()) {
+      LOG.info("Delegate instant [" + instantRange.get() + "] to table service manager");
+    }
+
+    return option;
+  }
+
+  protected abstract HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf);
+
+  /**
+   * Executes a clustering plan on a table, serially before or after an insert/upsert action.
+   * Schedules and executes clustering inline.
+   */
+  protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+    Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
+    clusteringInstantOpt.ifPresent(clusteringInstant -> {
+      // inline cluster should auto commit as the user is never given control
+      cluster(clusteringInstant, true);
+    });
+    return clusteringInstantOpt;
+  }
+
+  private void inlineClustering(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.replacecommit)) {
+      scheduleClustering(extraMetadata);
+    } else {
+      runAnyPendingClustering(table);
+      inlineClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedules clustering inline.
+   *
+   * @param extraMetadata extra metadata to use.
+   * @return clustering instant if scheduled.
+   */
+  protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
+    return scheduleClustering(extraMetadata);
+  }
+
+  protected void runAnyPendingClustering(HoodieTable table) {
+    table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
+      if (instantPlan.isPresent()) {
+        LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft());
+        cluster(instant.getTimestamp(), true);
+      }
+    });
+  }
+
+  /**
+   * Finalize Write operation.
+   *
+   * @param table       HoodieTable
+   * @param instantTime Instant Time
+   * @param stats       Hoodie Write Stat
+   */
+  protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
+    try {
+      final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+      table.finalizeWrite(context, instantTime, stats);
+      if (finalizeCtx != null) {
+        Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
+        durationInMs.ifPresent(duration -> {
+          LOG.info("Finalize write elapsed time (milliseconds): " + duration);
+          metrics.updateFinalizeWriteMetrics(duration, stats.size());
+        });
+      }
+    } catch (HoodieIOException ioe) {
+      throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+    }
+  }
+
+  /**
+   * Write the HoodieCommitMetadata to metadata table if available.
+   *
+   * @param table       {@link HoodieTable} of interest.
+   * @param instantTime instant time of the commit.
+   * @param actionType  action type of the commit.
+   * @param metadata    instance of {@link HoodieCommitMetadata}.
+   */
+  protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
+    table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
+        table.isTableServiceAction(actionType, instantTime)));

Review Comment:
   `writeTableMetadata()` in the base write client should delegate to the table service client when the action is a table service. Though the implementations are equivalent now, from an API standpoint we should make it delegate accordingly.
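   For illustration, a minimal sketch of what that delegation could look like in the base write client (assuming it keeps a `tableServiceClient` reference, as the Spark and Flink write clients in this PR already do; only methods already visible in this diff are used):

   ```java
   // Sketch only, not part of this PR: route table-service commits through the
   // table service client and keep regular commits in the write client.
   protected void writeTableMetadata(HoodieTable table, String instantTime,
                                     String actionType, HoodieCommitMetadata metadata) {
     if (table.isTableServiceAction(actionType, instantTime)) {
       tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
     } else {
       context.setJobStatus(this.getClass().getSimpleName(),
           "Committing to metadata table: " + config.getTableName());
       table.getMetadataWriter(instantTime).ifPresent(w ->
           ((HoodieTableMetadataWriter) w).update(metadata, instantTime, false));
     }
   }
   ```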



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -352,182 +303,23 @@ public void commitLogCompaction(String logCompactionInstantTime, HoodieCommitMet
   protected void completeLogCompaction(HoodieCommitMetadata metadata,
                                        HoodieTable table,
                                        String logCompactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
-      preCommit(logCompactionInstant, metadata);
-      finalizeWrite(table, logCompactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      updateTableMetadata(table, metadata, logCompactionInstant);
-      LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". Finished with result " + metadata);
-      CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
-    } finally {
-      this.txnManager.endTransaction(Option.of(logCompactionInstant));
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
-      );
-    }
-    LOG.info("Log Compacted successfully on commit " + logCompactionCommitTime);
+    preCommit(logCompactionInstant, metadata);
+    tableServiceClient.completeLogCompaction(metadata, table, logCompactionCommitTime);
   }
 
   @Override
   protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(logCompactionInstantTime, WriteOperationType.LOG_COMPACT, table.getMetaClient());
-    HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
-    HoodieInstant inflightInstant = HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
-    if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
-      LOG.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
-      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
-      table.getMetaClient().reloadActiveTimeline();
-      throw new HoodieException("Inflight logcompaction file exists");
-    }
-    logCompactionTimer = metrics.getLogCompactionCtx();
-    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.logCompact(context, logCompactionInstantTime);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
-    if (shouldComplete && logCompactionMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.LOG_COMPACT, logCompactionMetadata.getCommitMetadata().get(), table, logCompactionInstantTime);
-    }
-    return logCompactionMetadata;
+    return tableServiceClient.logCompact(logCompactionInstantTime, shouldComplete);

Review Comment:
   If you force-cast `tableServiceClient` here, it will be easier to navigate through the IDE.
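   For example, something like this (the concrete subclass name here is an assumption, not taken from this diff):

   ```java
   // Illustrative only: casting to the Spark-specific table service client lets the IDE
   // jump straight to the concrete logCompact implementation.
   return ((SparkRDDTableServiceClient<T>) tableServiceClient)
       .logCompact(logCompactionInstantTime, shouldComplete);
   ```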



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java:
##########
@@ -283,44 +270,14 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType,
 
   @Override
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
-    try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
-      metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
-    } catch (Exception e) {
-      throw new HoodieException("Failed to update metadata", e);
-    }
-  }
-
-  /**
-   * Initialize the table metadata writer, for e.g, bootstrap the metadata table
-   * from the filesystem if it does not exist.
-   */
-  private HoodieBackedTableMetadataWriter initMetadataWriter() {
-    return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
-        FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+    tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);

Review Comment:
   The Flink write client always delegates this to the table service client, which differs from the base write client's behavior. Similarly, let's file a follow-up task to consolidate this.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -352,182 +303,23 @@ public void commitLogCompaction(String logCompactionInstantTime, HoodieCommitMet
   protected void completeLogCompaction(HoodieCommitMetadata metadata,
                                        HoodieTable table,
                                        String logCompactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
-      preCommit(logCompactionInstant, metadata);
-      finalizeWrite(table, logCompactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      updateTableMetadata(table, metadata, logCompactionInstant);
-      LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". Finished with result " + metadata);
-      CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
-    } finally {
-      this.txnManager.endTransaction(Option.of(logCompactionInstant));
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
-      );
-    }
-    LOG.info("Log Compacted successfully on commit " + logCompactionCommitTime);
+    preCommit(logCompactionInstant, metadata);
+    tableServiceClient.completeLogCompaction(metadata, table, logCompactionCommitTime);
   }
 
   @Override
   protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(logCompactionInstantTime, WriteOperationType.LOG_COMPACT, table.getMetaClient());
-    HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
-    HoodieInstant inflightInstant = HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
-    if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
-      LOG.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
-      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
-      table.getMetaClient().reloadActiveTimeline();
-      throw new HoodieException("Inflight logcompaction file exists");
-    }
-    logCompactionTimer = metrics.getLogCompactionCtx();
-    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.logCompact(context, logCompactionInstantTime);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
-    if (shouldComplete && logCompactionMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.LOG_COMPACT, logCompactionMetadata.getCommitMetadata().get(), table, logCompactionInstantTime);
-    }
-    return logCompactionMetadata;
+    return tableServiceClient.logCompact(logCompactionInstantTime, shouldComplete);
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
-    HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
-    HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
-    if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
-      table.getMetaClient().reloadActiveTimeline();
-    }
-    clusteringTimer = metrics.getClusteringCtx();
-    LOG.info("Starting clustering at " + clusteringInstant);
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
-    // Validation has to be done after cloning. if not, it could result in dereferencing the write status twice which means clustering could get executed twice.
-    validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
-    // TODO : Where is shouldComplete used ?
-    if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
-    }
-    return clusteringMetadata;
-  }
-
-  private void completeClustering(HoodieReplaceCommitMetadata metadata,
-                                  HoodieTable table,
-                                  String clusteringCommitTime) {
-    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
-        e.getValue().stream()).collect(Collectors.toList());
-
-    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
-      throw new HoodieClusteringException("Clustering failed to write to files:"
-          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
-    }
-
-    final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
-
-      finalizeWrite(table, clusteringCommitTime, writeStats);
-      // Update table's metadata (table)
-      updateTableMetadata(table, metadata, clusteringInstant);
-
-      LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
-
-      table.getActiveTimeline().transitionReplaceInflightToComplete(
-          clusteringInstant,
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    } catch (Exception e) {
-      throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
-    } finally {
-      this.txnManager.endTransaction(Option.of(clusteringInstant));
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (clusteringTimer != null) {
-      long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
-      );
-    }
-    LOG.info("Clustering successfully on commit " + clusteringCommitTime);
-  }
-
-  private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
-    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
-      HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
-              table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
-          .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
-              "Unable to read clustering plan for instant: " + clusteringCommitTime));
-      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
-          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
-          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
-          + " write statuses");
-    }
-  }
-
-  private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
-                                   HoodieInstant hoodieInstant) {
-    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
-    // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
-    // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-    table.getMetadataWriter(hoodieInstant.getTimestamp())
-        .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
-  }
-
-  @Override
-  protected void initMetadataTable(Option<String> instantTime) {
-    // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
-    // if it didn't exist before
-    // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
-    initializeMetadataTable(instantTime);
-  }
-
-  /**
-   * Initialize the metadata table if needed. Creating the metadata table writer
-   * will trigger the initial bootstrapping from the data table.
-   *
-   * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization
-   */
-  private void initializeMetadataTable(Option<String> inFlightInstantTimestamp) {
-    if (config.isMetadataTableEnabled()) {
-      HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
-          context, Option.empty(), inFlightInstantTimestamp);
-      try {
-        writer.close();
-      } catch (Exception e) {
-        throw new HoodieException("Failed to instantiate Metadata table ", e);
-      }
-    }
-  }

Review Comment:
   Are these two not used by the Spark table service client? Again, by design they fit better with the write client than with the table service client.



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java:
##########
@@ -283,44 +270,14 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType,
 
   @Override
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
-    try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
-      metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
-    } catch (Exception e) {
-      throw new HoodieException("Failed to update metadata", e);
-    }
-  }
-
-  /**
-   * Initialize the table metadata writer, for e.g, bootstrap the metadata table
-   * from the filesystem if it does not exist.
-   */
-  private HoodieBackedTableMetadataWriter initMetadataWriter() {
-    return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
-        FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+    tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
   }
 
   /**
    * Initialized the metadata table on start up, should only be called once on driver.
    */
   public void initMetadataTable() {
-    HoodieFlinkTable<?> table = getHoodieTable();
-    if (config.isMetadataTableEnabled()) {
-      // initialize the metadata table path
-      // guard the metadata writer with concurrent lock
-      try {
-        this.txnManager.getLockManager().lock();
-        initMetadataWriter().close();
-      } catch (Exception e) {
-        throw new HoodieException("Failed to initialize metadata table", e);
-      } finally {
-        this.txnManager.getLockManager().unlock();
-      }
-      // clean the obsolete index stats
-      table.deleteMetadataIndexIfNecessary();
-    } else {
-      // delete the metadata table if it was enabled but is now disabled
-      table.maybeDeleteMetadataTable();
-    }
+    ((HoodieFlinkTableServiceClient<T>) tableServiceClient).initMetadataTable();

Review Comment:
   By design, initMetadataTable should be the responsibility of a write client. Can you file a follow-up task to consolidate the way the metadata table (MDT) is initialized? I see that `org.apache.hudi.client.BaseHoodieWriteClient#doInitTable` and `org.apache.hudi.client.BaseHoodieWriteClient#initMetadataTable` are skipped by the Flink write client; we should standardize the clients' lifecycles.
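   To make the suggestion concrete, here is a minimal sketch of keeping the lock-guarded bootstrap in the write client itself (it re-uses the pattern removed above and assumes `initMetadataWriter()` stays accessible to the write client; the index-cleanup and deletion branches are omitted):

   ```java
   // Sketch only: metadata table initialization owned by the write client.
   public void initMetadataTable() {
     if (config.isMetadataTableEnabled()) {
       try {
         // guard the metadata writer with the concurrent lock
         this.txnManager.getLockManager().lock();
         // creating and closing the writer bootstraps the metadata table if it does not exist
         initMetadataWriter().close();
       } catch (Exception e) {
         throw new HoodieException("Failed to initialize metadata table", e);
       } finally {
         this.txnManager.getLockManager().unlock();
       }
     }
   }
   ```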



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
+
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieTableServiceClient.class);
+
+  protected transient Timer.Context compactionTimer;
+  protected transient Timer.Context clusteringTimer;
+  protected transient Timer.Context logCompactionTimer;
+
+  protected transient AsyncCleanerService asyncCleanerService;
+  protected transient AsyncArchiveService asyncArchiveService;
+
+  protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig, Option.empty());
+  }
+
+  protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncCleanerService == null) {
+      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
+    } else {
+      this.asyncCleanerService.start(null);
+    }
+  }
+
+  protected void startAsyncArchiveService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncArchiveService == null) {
+      this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
+    } else {
+      this.asyncArchiveService.start(null);
+    }
+  }
+
+  protected void asyncClean() {
+    AsyncCleanerService.waitForCompletion(asyncCleanerService);
+  }
+
+  protected void asyncArchive() {
+    AsyncArchiveService.waitForCompletion(asyncArchiveService);
+  }
+
+  protected void setTableServiceTimer(WriteOperationType operationType) {
+    switch (operationType) {
+      case CLUSTER:
+        clusteringTimer = metrics.getClusteringCtx();
+        break;
+      case COMPACT:
+        compactionTimer = metrics.getCompactionCtx();
+        break;
+      case LOG_COMPACT:
+        logCompactionTimer = metrics.getLogCompactionCtx();
+        break;
+      default:
+    }
+  }
+
+  /**
+   * Performs a compaction operation on a table, serially before or after an insert/upsert action.
+   * Scheduling and execution is done inline.
+   */
+  protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
+    Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
+    compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
+      // inline compaction should auto commit as the user is never given control
+      compact(compactInstantTime, true);
+    });
+    return compactionInstantTimeOpt;
+  }
+
+  private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
+      scheduleCompaction(extraMetadata);
+    } else {
+      runAnyPendingCompactions(table);
+      inlineCompaction(extraMetadata);
+    }
+  }
+
+  /**
+   * Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+  /**
+   * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
+   */
+  protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
+    Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
+    logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
+      // inline log compaction should auto commit as the user is never given control
+      logCompact(logCompactInstantTime, true);
+    });
+    return logCompactionInstantTimeOpt;
+  }
+
+  protected void runAnyPendingCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight compaction at instant " + instant);
+          compact(instant.getTimestamp(), true);
+        });
+  }
+
+  protected void runAnyPendingLogCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight log compaction at instant " + instant);
+          logCompact(instant.getTimestamp(), true);
+        });
+  }
+
+  /***
+   * Schedules compaction inline.
+   * @param extraMetadata extra metadata to be used.
+   * @return compaction instant if scheduled.
+   */
+  protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
+    return scheduleCompaction(extraMetadata);
+  }
+
+  /**
+   * Schedules a new compaction instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
+
+  /**
+   * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @param metadata              All the metadata that gets stored along with a commit
+   * @param extraMetadata         Extra Metadata to be stored
+   */
+  public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata,
+                                        Option<Map<String, String>> extraMetadata);
+
+  /**
+   * Commit Compaction and track metrics.
+   */
+  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
+
+  /**
+   * Schedules a new log compaction instant.
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleLogCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new log compaction instant with passed-in instant time.
+   * @param instantTime Log Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleLogCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.LOG_COMPACT).isPresent();
+  }
+
+  /**
+   * Performs Log Compaction for the workload stored in instant-time.
+   *
+   * @param logCompactionInstantTime Log Compaction Instant Time
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
+    return logCompact(logCompactionInstantTime, config.shouldAutoCommit());
+  }
+
+  /**
+   * Commit Log Compaction and track metrics.
+   */
+  protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+  /**
+   * Schedules a new compaction instant with passed-in instant time.
+   *
+   * @param instantTime   Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
+  }
+
+  /**
+   * Schedules a new clustering instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new clustering instant with passed-in instant time.
+   *
+   * @param instantTime   clustering Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
+  }
+
+  /**
+   * Schedules a new cleaning instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new cleaning instant with passed-in instant time.
+   *
+   * @param instantTime   cleaning Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent();
+  }
+
+  /**
+   * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata.
+   *
+   * @param clusteringInstant Clustering Instant Time
+   * @return Collection of Write Status
+   */
+  public abstract HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete);
+
+  protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    if (!tableServicesEnabled(config)) {
+      return;
+    }
+
+    if (!config.areAnyTableServicesExecutedInline() && !config.areAnyTableServicesScheduledInline()) {
+      return;
+    }
+
+    if (config.isMetadataTableEnabled()) {
+      table.getHoodieView().sync();
+    }
+    // Do an inline compaction if enabled
+    if (config.inlineCompactionEnabled()) {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+      inlineCompaction(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
+        && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
+      // proceed only if there are no pending compactions
+      metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
+      inlineScheduleCompaction(extraMetadata);
+    }
+
+    // Do an inline log compaction if enabled
+    if (config.inlineLogCompactionEnabled()) {
+      runAnyPendingLogCompactions(table);
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
+      inlineLogCompact(extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
+    }
+
+    // Do an inline clustering if enabled
+    if (config.inlineClusteringEnabled()) {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
+      inlineClustering(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
+    }
+
+    // if just inline schedule is enabled
+    if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
+        && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
+      // proceed only if there are no pending clustering
+      metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
+      inlineScheduleClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedule table services such as clustering, compaction & cleaning.
+   *
+   * @param extraMetadata    Metadata to pass onto the scheduled service instant
+   * @param tableServiceType Type of table service to schedule
+   * @return the instant time of the scheduled table service, or empty if nothing was scheduled
+   */
+  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
+                                             TableServiceType tableServiceType) {
+    // A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
+    final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
+        tableServiceType.getAction(), instantTime));
+    try {
+      this.txnManager.beginTransaction(inflightInstant, Option.empty());
+      LOG.info("Scheduling table service " + tableServiceType);
+      return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
+    } finally {
+      this.txnManager.endTransaction(inflightInstant);
+    }
+  }
+
+  protected Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
+                                                        TableServiceType tableServiceType) {
+    if (!tableServicesEnabled(config)) {
+      return Option.empty();
+    }
+
+    Option<String> option = Option.empty();
+    HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
+
+    switch (tableServiceType) {
+      case ARCHIVE:
+        LOG.info("Scheduling archiving is not supported. Skipping.");
+        break;
+      case CLUSTER:
+        LOG.info("Scheduling clustering at instant time :" + instantTime);
+        Option<HoodieClusteringPlan> clusteringPlan = table
+            .scheduleClustering(context, instantTime, extraMetadata);
+        option = clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case COMPACT:
+        LOG.info("Scheduling compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> compactionPlan = table
+            .scheduleCompaction(context, instantTime, extraMetadata);
+        option = compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case LOG_COMPACT:
+        LOG.info("Scheduling log compaction at instant time :" + instantTime);
+        Option<HoodieCompactionPlan> logCompactionPlan = table
+            .scheduleLogCompaction(context, instantTime, extraMetadata);
+        option = logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case CLEAN:
+        LOG.info("Scheduling cleaning at instant time :" + instantTime);
+        Option<HoodieCleanerPlan> cleanerPlan = table
+            .scheduleCleaning(context, instantTime, extraMetadata);
+        option = cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
+    }
+
+    Option<String> instantRange = delegateToTableServiceManager(tableServiceType, table);
+    if (instantRange.isPresent()) {
+      LOG.info("Delegate instant [" + instantRange.get() + "] to table service manager");
+    }
+
+    return option;
+  }
+
+  protected abstract HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf);
+
+  /**
+   * Executes a clustering plan on a table, serially before or after an insert/upsert action.
+   * Schedules and executes clustering inline.
+   */
+  protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+    Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
+    clusteringInstantOpt.ifPresent(clusteringInstant -> {
+      // inline cluster should auto commit as the user is never given control
+      cluster(clusteringInstant, true);
+    });
+    return clusteringInstantOpt;
+  }
+
+  private void inlineClustering(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.replacecommit)) {
+      scheduleClustering(extraMetadata);
+    } else {
+      runAnyPendingClustering(table);
+      inlineClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedules clustering inline.
+   *
+   * @param extraMetadata extra metadata to use.
+   * @return clustering instant if scheduled.
+   */
+  protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
+    return scheduleClustering(extraMetadata);
+  }
+
+  protected void runAnyPendingClustering(HoodieTable table) {
+    table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
+      if (instantPlan.isPresent()) {
+        LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft());
+        cluster(instant.getTimestamp(), true);
+      }
+    });
+  }
+
+  /**
+   * Finalize Write operation.
+   *
+   * @param table       HoodieTable
+   * @param instantTime Instant Time
+   * @param stats       Hoodie Write Stat
+   */
+  protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {

Review Comment:
   Remark: finalizeWrite is still retained in the base write client for commit().



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -352,182 +303,23 @@ public void commitLogCompaction(String logCompactionInstantTime, HoodieCommitMet
   protected void completeLogCompaction(HoodieCommitMetadata metadata,
                                        HoodieTable table,
                                        String logCompactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
-      preCommit(logCompactionInstant, metadata);
-      finalizeWrite(table, logCompactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      updateTableMetadata(table, metadata, logCompactionInstant);
-      LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". Finished with result " + metadata);
-      CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
-    } finally {
-      this.txnManager.endTransaction(Option.of(logCompactionInstant));
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
-      );
-    }
-    LOG.info("Log Compacted successfully on commit " + logCompactionCommitTime);
+    preCommit(logCompactionInstant, metadata);
+    tableServiceClient.completeLogCompaction(metadata, table, logCompactionCommitTime);

Review Comment:
   preCommit was part of the transaction; now it happens before the transaction starts. What is the impact here?
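   To make the question concrete: if preCommit needs to stay inside the same critical section as the completion, one option (illustrative only; this overload does not exist in the PR) would be to pass it into the table service client as a hook so it runs within that client's transaction:

   ```java
   // Hypothetical variant, not part of this PR: preCommit supplied as a callback and
   // invoked by the table service client inside its begin/endTransaction block.
   tableServiceClient.completeLogCompaction(metadata, table, logCompactionCommitTime,
       instant -> preCommit(instant, metadata));
   ```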



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org