You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/04 07:07:39 UTC

[incubator-hudi] branch master updated: [HUDI-756] Organize Cleaning Action execution into a single package in hudi-client (#1485)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eaf6cc2  [HUDI-756] Organize Cleaning Action execution into a single package in hudi-client (#1485)
eaf6cc2 is described below

commit eaf6cc2d90bf27c0d9414a4ea18dbd1b61f58e50
Author: vinoth chandar <vi...@users.noreply.github.com>
AuthorDate: Sat Apr 4 00:07:34 2020 -0700

    [HUDI-756] Organize Cleaning Action execution into a single package in hudi-client (#1485)
    
    - Introduced a thin abstraction, BaseActionExecutor, that all actions will implement
    - Pulled cleaning code from table, writeclient into a single package
    - CleanHelper is now CleanPlanner; HoodieCleanClient has been removed
    - Minor refactor of HoodieTable factory method
    - HoodieTable.getHoodieTable() is now HoodieTable.create(), with overloads with and without a metaClient passed in
    - HoodieTable constructor now does not do a redundant instantiation
    - Fixed existing unit tests to work at the HoodieWriteClient level
---
 .../apache/hudi/client/AbstractHoodieClient.java   |   4 -
 .../hudi/client/AbstractHoodieWriteClient.java     |  16 +-
 .../org/apache/hudi/client/HoodieCleanClient.java  | 197 ---------------
 .../org/apache/hudi/client/HoodieReadClient.java   |   2 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |  62 ++---
 .../apache/hudi/table/HoodieCommitArchiveLog.java  |   2 +-
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  | 172 +------------
 .../apache/hudi/table/HoodieMergeOnReadTable.java  |   5 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  57 +++--
 .../hudi/table/action/BaseActionExecutor.java      |  43 ++++
 .../table/action/clean/CleanActionExecutor.java    | 280 +++++++++++++++++++++
 .../clean/CleanPlanner.java}                       |  11 +-
 .../table/action/clean/PartitionCleanStat.java     |  70 ++++++
 .../compact/HoodieMergeOnReadTableCompactor.java   |   2 +-
 .../org/apache/hudi/client/TestClientRollback.java |   6 +-
 .../apache/hudi/client/TestHoodieClientBase.java   |  11 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   2 +-
 .../hudi/client/TestUpdateSchemaEvolution.java     |   6 +-
 .../java/org/apache/hudi/index/TestHbaseIndex.java |  26 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |  16 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   8 +-
 .../io/storage/TestHoodieStorageWriterFactory.java |   2 +-
 .../java/org/apache/hudi/table/TestCleaner.java    |  48 ++--
 .../apache/hudi/table/TestCopyOnWriteTable.java    |  20 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    |  48 ++--
 .../hudi/table/compact/TestAsyncCompaction.java    |   2 +-
 .../hudi/table/compact/TestHoodieCompactor.java    |  10 +-
 .../org/apache/hudi/common/util/CleanerUtils.java  |   5 +-
 .../apache/hudi/common/model/HoodieTestUtils.java  |   2 +-
 .../table/view/TestIncrementalFSViewSync.java      |   3 +-
 .../org/apache/hudi/IncrementalRelation.scala      |   2 +-
 31 files changed, 585 insertions(+), 555 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index 18bc9be..a0b4be8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -118,10 +118,6 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
     return config;
   }
 
-  public Option<EmbeddedTimelineService> getTimelineServer() {
-    return timelineServer;
-  }
-
   protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
     return ClientUtils.createMetaClient(jsc, config, loadActiveTimelineOnLoad);
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 6008fe4..ea5ed9f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -140,9 +140,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
   private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata, String actionType) {
 
-    LOG.info("Commiting " + instantTime);
+    LOG.info("Committing " + instantTime);
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
 
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -221,7 +221,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     // TODO : make sure we cannot rollback / archive last commit file
     try {
       // Create a Hoodie table which encapsulated the commits and files visible
-      HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+      HoodieTable table = HoodieTable.create(config, jsc);
       // 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW other wise
       // there may be race conditions
       HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
@@ -272,7 +272,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
       setWriteSchemaFromLastInstant(metaClient);
     }
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
     if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
       writeContext = metrics.getCommitCtx();
     } else {
@@ -321,8 +321,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     // Create a Hoodie table which encapsulated the commits and files visible
     try {
       // Create a Hoodie table which encapsulated the commits and files visible
-      HoodieTable<T> table = HoodieTable.getHoodieTable(
-          createMetaClient(true), config, jsc);
+      HoodieTable<T> table = HoodieTable.create(config, jsc);
       Option<HoodieInstant> rollbackInstantOpt =
           Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
               .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback))
@@ -341,8 +340,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
   protected List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws
       IOException {
     final String commitToRollback = instantToRollback.getTimestamp();
-    HoodieTable<T> table = HoodieTable.getHoodieTable(
-        createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
     // Check if any of the commits is a savepoint - do not allow rollback on those commits
@@ -391,7 +389,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
 
   private void finishRollback(final Timer.Context context, List<HoodieRollbackStat> rollbackStats,
       List<String> commitsToRollback, final String startRollbackTime) throws IOException {
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     Option<Long> durationInMs = Option.empty();
     long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
     if (context != null) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java
deleted file mode 100644
index d622f70..0000000
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client;
-
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.util.CleanerUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieTable;
-
-import com.codahale.metrics.Timer;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import java.io.IOException;
-import java.util.List;
-
-public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
-
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class);
-  private final transient HoodieMetrics metrics;
-
-  public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) {
-    this(jsc, clientConfig, metrics, Option.empty());
-  }
-
-  public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics,
-      Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, clientConfig, timelineService);
-    this.metrics = metrics;
-  }
-
-  /**
-   * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
-   * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
-   * cleaned)
-   */
-  public void clean() throws HoodieIOException {
-    String startCleanTime = HoodieActiveTimeline.createNewInstantTime();
-    clean(startCleanTime);
-  }
-
-  /**
-   * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
-   * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
-   * cleaned)
-   *
-   * @param startCleanTime Cleaner Instant Timestamp
-   * @throws HoodieIOException in case of any IOException
-   */
-  public HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
-    // Create a Hoodie table which encapsulated the commits and files visible
-    final HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
-
-    // If there are inflight(failed) or previously requested clean operation, first perform them
-    table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
-      LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
-      try {
-        runClean(table, hoodieInstant);
-      } catch (Exception e) {
-        LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
-      }
-    });
-
-    Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
-
-    if (cleanerPlanOpt.isPresent()) {
-      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
-      if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
-          && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
-        final HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
-        return runClean(hoodieTable, HoodieTimeline.getCleanRequestedInstant(startCleanTime), cleanerPlan);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
-   *
-   * @param startCleanTime Cleaner Instant Time
-   * @return Cleaner Plan if generated
-   */
-  protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
-    // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
-
-    HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
-
-    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
-        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
-
-      HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
-      // Save to both aux and timeline folder
-      try {
-        table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
-        LOG.info("Requesting Cleaning with instant time " + cleanInstant);
-      } catch (IOException e) {
-        LOG.error("Got exception when saving cleaner requested file", e);
-        throw new HoodieIOException(e.getMessage(), e);
-      }
-      return Option.of(cleanerPlan);
-    }
-    return Option.empty();
-  }
-
-  /**
-   * Executes the Cleaner plan stored in the instant metadata.
-   *
-   * @param table Hoodie Table
-   * @param cleanInstant Cleaner Instant
-   */
-  public HoodieCleanMetadata runClean(HoodieTable<T> table, HoodieInstant cleanInstant) {
-    try {
-      HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
-      return runClean(table, cleanInstant, cleanerPlan);
-    } catch (IOException e) {
-      throw new HoodieIOException(e.getMessage(), e);
-    }
-  }
-
-  private HoodieCleanMetadata runClean(HoodieTable<T> table, HoodieInstant cleanInstant,
-      HoodieCleanerPlan cleanerPlan) {
-    ValidationUtils.checkArgument(
-        cleanInstant.getState().equals(State.REQUESTED) || cleanInstant.getState().equals(State.INFLIGHT));
-
-    try {
-      LOG.info("Cleaner started");
-      final Timer.Context context = metrics.getCleanCtx();
-
-      if (!cleanInstant.isInflight()) {
-        // Mark as inflight first
-        cleanInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
-            TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
-      }
-
-      List<HoodieCleanStat> cleanStats = table.clean(jsc, cleanInstant, cleanerPlan);
-
-      if (cleanStats.isEmpty()) {
-        return HoodieCleanMetadata.newBuilder().build();
-      }
-
-      // Emit metrics (duration, numFilesDeleted) if needed
-      Option<Long> durationInMs = Option.empty();
-      if (context != null) {
-        durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
-        LOG.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
-      }
-
-      HoodieTableMetaClient metaClient = createMetaClient(true);
-      // Create the metadata and save it
-      HoodieCleanMetadata metadata =
-          CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats);
-      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain());
-      metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());
-
-      table.getActiveTimeline().transitionCleanInflightToComplete(
-          new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()),
-          TimelineMetadataUtils.serializeCleanMetadata(metadata));
-      LOG.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete");
-      return metadata;
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to clean up after commit", e);
-    }
-  }
-}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 2dedfaf..4b97c4a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -96,7 +96,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-    this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc);
+    this.hoodieTable = HoodieTable.create(metaClient, clientConfig, jsc);
     this.index = HoodieIndex.createIndex(clientConfig, jsc);
     this.sqlContextOpt = Option.empty();
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 2a25a76..e69eef2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -98,7 +98,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   private static final String LOOKUP_STR = "lookup";
   private final boolean rollbackPending;
   private final transient HoodieMetrics metrics;
-  private final transient HoodieCleanClient<T> cleanClient;
   private transient Timer.Context compactionTimer;
 
   /**
@@ -139,7 +138,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     super(jsc, index, clientConfig, timelineService);
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackPending = rollbackPending;
-    this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService);
   }
 
   /**
@@ -161,7 +159,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     Timer.Context indexTimer = metrics.getIndexCtx();
     JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, jsc, table);
     metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
@@ -539,7 +537,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return true if the savepoint was created successfully
    */
   public boolean savepoint(String user, String comment) {
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
     }
@@ -567,7 +565,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return true if the savepoint was created successfully
    */
   public boolean savepoint(String instantTime, String user, String comment) {
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
       throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
     }
@@ -628,7 +626,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return true if the savepoint was deleted successfully
    */
   public void deleteSavepoint(String savepointTime) {
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
       throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
     }
@@ -655,7 +653,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param compactionTime - delete the compaction time
    */
   private void deleteRequestedCompaction(String compactionTime) {
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieInstant compactionRequestedInstant =
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime);
@@ -682,7 +680,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return true if the savepoint was rollecback to successfully
    */
   public boolean rollbackToSavepoint(String savepointTime) {
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
 
     // Rollback to savepoint is expected to be a manual operation and no concurrent write or compaction is expected
@@ -737,7 +735,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public void restoreToInstant(final String instantTime) throws HoodieRollbackException {
 
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     // Get all the commits on the timeline after the provided commit time
     List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
         .getReverseOrderedInstants()
@@ -788,7 +786,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
   private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
       List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     Option<Long> durationInMs = Option.empty();
     long numFilesDeleted = 0L;
     for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
@@ -821,7 +819,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public void close() {
     // Stop timeline-server if running
     super.close();
-    this.cleanClient.close();
   }
 
   /**
@@ -829,20 +826,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
    * cleaned)
    */
-  public void clean() throws HoodieIOException {
-    cleanClient.clean();
+  public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
+    LOG.info("Cleaner started");
+    final Timer.Context context = metrics.getCleanCtx();
+
+    HoodieCleanMetadata metadata = HoodieTable.create(config, jsc).clean(jsc, cleanInstantTime);
+
+    if (context != null) {
+      long durationMs = metrics.getDurationInMs(context.stop());
+      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
+      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+          + " cleanerElaspsedMs" + durationMs);
+    }
+
+    return metadata;
   }
 
-  /**
-   * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
-   * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
-   * cleaned)
-   *
-   * @param startCleanTime Cleaner Instant Timestamp
-   * @throws HoodieIOException in case of any IOException
-   */
-  protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
-    return cleanClient.clean(startCleanTime);
+  public HoodieCleanMetadata clean() {
+    return clean(HoodieActiveTimeline.createNewInstantTime());
   }
 
   /**
@@ -882,7 +884,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
             HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER),
         "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
             + latestPending + ",  Ingesting at " + instantTime));
-    HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     String commitActionType = table.getMetaClient().getCommitActionType();
     activeTimeline.createNewInstant(new HoodieInstant(State.REQUESTED, commitActionType, instantTime));
@@ -924,7 +926,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
         "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
             + conflictingInstants);
-    HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
     HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
     if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) {
       extraMetadata.ifPresent(workload::setExtraMetadata);
@@ -957,7 +959,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata) throws IOException {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
     HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
         timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
@@ -1020,7 +1022,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * Cleanup all pending commits.
    */
   private void rollbackPendingCommits() {
-    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
@@ -1038,7 +1040,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
     HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
@@ -1046,7 +1048,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       rollbackInflightCompaction(inflightInstant, table);
       // refresh table
       metaClient = createMetaClient(true);
-      table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      table = HoodieTable.create(metaClient, config, jsc);
       pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
     }
 
@@ -1076,7 +1078,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
     compactionTimer = metrics.getCompactionCtx();
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
     JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
     // Force compaction action
     statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
index e8fdb77..635e96b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
@@ -138,7 +138,7 @@ public class HoodieCommitArchiveLog {
     int maxCommitsToKeep = config.getMaxCommitsToKeep();
     int minCommitsToKeep = config.getMinCommitsToKeep();
 
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
 
     // GroupBy each action and limit each action timeline to maxCommitsToKeep
     // TODO: Handle ROLLBACK_ACTION in future
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index de43900..f01cdaa 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -18,22 +18,22 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ParquetReaderIterator;
-import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -45,20 +45,15 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
 import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.rollback.RollbackHelper;
 import org.apache.hudi.table.rollback.RollbackRequest;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroParquetReader;
@@ -67,9 +62,9 @@ import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -81,9 +76,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
-
 /**
  * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
  * zero read amplification.
@@ -97,49 +89,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
 
   private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class);
 
-  public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc) {
-    super(config, jsc);
-  }
-
-  private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc(
-      HoodieTable table) {
-    return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> {
-      Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
-
-      FileSystem fs = table.getMetaClient().getFs();
-      Path basePath = new Path(table.getMetaClient().getBasePath());
-      while (iter.hasNext()) {
-        Tuple2<String, String> partitionDelFileTuple = iter.next();
-        String partitionPath = partitionDelFileTuple._1();
-        String delFileName = partitionDelFileTuple._2();
-        Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName);
-        String deletePathStr = deletePath.toString();
-        Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
-        if (!partitionCleanStatMap.containsKey(partitionPath)) {
-          partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
-        }
-        PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
-        partitionCleanStat.addDeleteFilePatterns(deletePath.getName());
-        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult);
-      }
-      return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
-          .collect(Collectors.toList()).iterator();
-    };
-  }
-
-  private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
-    Path deletePath = new Path(deletePathStr);
-    LOG.debug("Working on delete path :" + deletePath);
-    try {
-      boolean deleteResult = fs.delete(deletePath, false);
-      if (deleteResult) {
-        LOG.debug("Cleaned file at path :" + deletePath);
-      }
-      return deleteResult;
-    } catch (FileNotFoundException fio) {
-      // With cleanPlan being used for retried cleaning operations, its possible to clean a file twice
-      return false;
-    }
+  public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
+    super(config, jsc, metaClient);
   }
 
   @Override
@@ -278,77 +229,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
   }
 
-  /**
-   * Generates List of files to be cleaned.
-   * 
-   * @param jsc JavaSparkContext
-   * @return Cleaner Plan
-   */
-  @Override
-  public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
-    try {
-      CleanHelper cleaner = new CleanHelper(this, config);
-      Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
-
-      List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);
-
-      if (partitionsToClean.isEmpty()) {
-        LOG.info("Nothing to clean here. It is already clean");
-        return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
-      }
-      LOG.info(
-          "Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
-      int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
-      LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-
-      Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
-          .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
-          .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-      return new HoodieCleanerPlan(earliestInstant
-          .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
-          config.getCleanerPolicy().name(), cleanOps, 1);
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to schedule clean operation", e);
-    }
-  }
-
-  /**
-   * Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles
-   * skews in partitions to clean by making files to clean as the unit of task distribution.
-   *
-   * @throws IllegalArgumentException if unknown cleaning policy is provided
-   */
   @Override
-  public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
-    int cleanerParallelism = Math.min(
-        (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
-        config.getCleanerParallelism());
-    LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-    List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
-        .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
-            .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
-            .collect(Collectors.toList()), cleanerParallelism)
-        .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey(PartitionCleanStat::merge).collect();
-
-    Map<String, PartitionCleanStat> partitionCleanStatsMap =
-        partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
-
-    // Return PartitionCleanStat for each partition passed.
-    return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
-      PartitionCleanStat partitionCleanStat =
-          (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
-              : new PartitionCleanStat(partitionPath);
-      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
-      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
-          .withEarliestCommitRetained(Option.ofNullable(
-              actionInstant != null
-                  ? new HoodieInstant(State.valueOf(actionInstant.getState()),
-                      actionInstant.getAction(), actionInstant.getTimestamp())
-                  : null))
-          .withDeletePathPattern(partitionCleanStat.deletePathPatterns)
-          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
-          .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
-    }).collect(Collectors.toList());
+  public HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime) {
+    return new CleanActionExecutor(jsc, config, this, cleanInstantTime).execute();
   }
 
   @Override
@@ -446,40 +329,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     }
   }
 
-  private static class PartitionCleanStat implements Serializable {
-
-    private final String partitionPath;
-    private final List<String> deletePathPatterns = new ArrayList<>();
-    private final List<String> successDeleteFiles = new ArrayList<>();
-    private final List<String> failedDeleteFiles = new ArrayList<>();
 
-    private PartitionCleanStat(String partitionPath) {
-      this.partitionPath = partitionPath;
-    }
-
-    private void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
-      if (deletedFileResult) {
-        successDeleteFiles.add(deletePathStr);
-      } else {
-        failedDeleteFiles.add(deletePathStr);
-      }
-    }
-
-    private void addDeleteFilePatterns(String deletePathStr) {
-      deletePathPatterns.add(deletePathStr);
-    }
-
-    private PartitionCleanStat merge(PartitionCleanStat other) {
-      if (!this.partitionPath.equals(other.partitionPath)) {
-        throw new RuntimeException(
-            String.format("partitionPath is not a match: (%s, %s)", partitionPath, other.partitionPath));
-      }
-      successDeleteFiles.addAll(other.successDeleteFiles);
-      deletePathPatterns.addAll(other.deletePathPatterns);
-      failedDeleteFiles.addAll(other.failedDeleteFiles);
-      return this;
-    }
-  }
 
   /**
    * Helper class for a small file's location and its actual size on disk.
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 7e96eb6..d7783cd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -84,8 +85,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   // UpsertPartitioner for MergeOnRead table type
   private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner;
 
-  public HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc) {
-    super(config, jsc);
+  HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
+    super(config, jsc, metaClient);
   }
 
   @Override
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index e3c134c..50ec45f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,13 +18,14 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.ClientUtils;
-import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuard;
@@ -39,22 +40,20 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -87,12 +86,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
 
   protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
 
-  protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc) {
+  protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
     this.config = config;
     this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration());
     this.viewManager = FileSystemViewManager.createViewManager(new SerializableConfiguration(jsc.hadoopConfiguration()),
         config.getViewStorageConfig());
-    this.metaClient = ClientUtils.createMetaClient(jsc, config, true);
+    this.metaClient = metaClient;
     this.index = HoodieIndex.createIndex(config, jsc);
   }
 
@@ -103,13 +102,25 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
     return viewManager;
   }
 
-  public static <T extends HoodieRecordPayload> HoodieTable<T> getHoodieTable(HoodieTableMetaClient metaClient,
-      HoodieWriteConfig config, JavaSparkContext jsc) {
+  public static <T extends HoodieRecordPayload> HoodieTable<T> create(HoodieWriteConfig config, JavaSparkContext jsc) {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
+        jsc.hadoopConfiguration(),
+        config.getBasePath(),
+        true,
+        config.getConsistencyGuardConfig(),
+        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
+    );
+    return HoodieTable.create(metaClient, config, jsc);
+  }
+
+  public static <T extends HoodieRecordPayload> HoodieTable<T> create(HoodieTableMetaClient metaClient,
+                                                                      HoodieWriteConfig config,
+                                                                      JavaSparkContext jsc) {
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
-        return new HoodieCopyOnWriteTable<>(config, jsc);
+        return new HoodieCopyOnWriteTable<>(config, jsc, metaClient);
       case MERGE_ON_READ:
-        return new HoodieMergeOnReadTable<>(config, jsc);
+        return new HoodieMergeOnReadTable<>(config, jsc, metaClient);
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
@@ -280,23 +291,11 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
       HoodieCompactionPlan compactionPlan);
 
   /**
-   * Generates list of files that are eligible for cleaning.
-   * 
-   * @param jsc Java Spark Context
-   * @return Cleaner Plan containing list of files to be deleted.
-   */
-  public abstract HoodieCleanerPlan scheduleClean(JavaSparkContext jsc);
-
-  /**
-   * Cleans the files listed in the cleaner plan associated with clean instant.
-   * 
-   * @param jsc Java Spark Context
-   * @param cleanInstant Clean Instant
-   * @param cleanerPlan Cleaner Plan
-   * @return list of Clean Stats
+   * Executes a new clean action.
+   *
+   * @return information on cleaned file slices
    */
-  public abstract List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant,
-      HoodieCleanerPlan cleanerPlan);
+  public abstract HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime);
 
   /**
    * Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) Atomically unpublish
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
new file mode 100644
index 0000000..e6ac2e9
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public abstract class BaseActionExecutor<R> {
+  // Holds the state handed to every action executor; subclasses implement execute() and return an R.
+  protected final JavaSparkContext jsc;
+
+  protected final HoodieWriteConfig config;
+
+  protected final HoodieTable<?> table;
+  // Instant time supplied by the caller; presumably the timeline instant the action runs under (TODO confirm).
+  protected final String instantTime;
+
+  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
+    this.jsc = jsc;
+    this.config = config;
+    this.table = table;
+    this.instantTime = instantTime;
+  }
+  // Runs the action and returns its result.
+  public abstract R execute();
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
new file mode 100644
index 0000000..48fe71d
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import scala.Tuple2;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata> {
+
+  private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class);
+
+  /**
+   * Builds a cleaner plan listing, for every partition the planner selects, the files to delete.
+   *
+   * @param jsc JavaSparkContext used to compute the per-partition delete lists in parallel
+   * @return Cleaner Plan; it carries no files when the planner selects no partition
+   */
+  HoodieCleanerPlan requestClean(JavaSparkContext jsc) {
+    try {
+      CleanPlanner<?> planner = new CleanPlanner<>(table, config);
+      Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
+      List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
+
+      if (partitionsToClean.isEmpty()) {
+        LOG.info("Nothing to clean here. It is already clean");
+        return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
+      }
+      LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
+      int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
+      LOG.info("Using cleanerParallelism: " + cleanerParallelism);
+
+      Map<String, List<String>> cleanOps = jsc
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
+          .collect().stream()
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+      return new HoodieCleanerPlan(earliestInstant
+          .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
+          config.getCleanerPolicy().name(), cleanOps, 1);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to schedule clean operation", e);
+    }
+  }
+
+  /** Returns the Spark function that deletes each (partition path, file name) pair and groups the results by partition. */
+  private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc(
+      HoodieTable<?> table) {
+    return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> {
+      Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
+
+      FileSystem fs = table.getMetaClient().getFs();
+      Path basePath = new Path(table.getMetaClient().getBasePath());
+      while (iter.hasNext()) {
+        Tuple2<String, String> partitionDelFileTuple = iter.next();
+        String partitionPath = partitionDelFileTuple._1();
+        String delFileName = partitionDelFileTuple._2();
+        Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName);
+        String deletePathStr = deletePath.toString();
+        Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
+        PartitionCleanStat partitionCleanStat =
+            partitionCleanStatMap.computeIfAbsent(partitionPath, PartitionCleanStat::new);
+        partitionCleanStat.addDeleteFilePatterns(deletePath.getName());
+        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult);
+      }
+      return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
+          .collect(Collectors.toList()).iterator();
+    };
+  }
+
+  private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
+    Path deletePath = new Path(deletePathStr);
+    LOG.debug("Working on delete path :" + deletePath);
+    try {
+      boolean deleteResult = fs.delete(deletePath, false);
+      if (deleteResult) {
+        LOG.debug("Cleaned file at path :" + deletePath);
+      }
+      return deleteResult;
+    } catch (FileNotFoundException fio) {
+      // With cleanPlan being used for retried cleaning operations, it's possible to clean a file twice
+      return false;
+    }
+  }
+
+  /**
+   * Deletes the files listed in the cleaner plan and returns one {@link HoodieCleanStat} per planned partition. Handles
+   * skews in partitions to clean by making files to clean as the unit of task distribution.
+   *
+   * @param cleanerPlan plan mapping each partition path to the file names to delete
+   * @return clean stats, one entry for every partition in the plan
+   */
+  List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieCleanerPlan cleanerPlan) {
+    // Size the job by the number of files, not partitions; keep at least one slice when the plan lists no files.
+    int filesToDelete = cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum();
+    int cleanerParallelism = Math.max(1, Math.min(filesToDelete, config.getCleanerParallelism()));
+    LOG.info("Using cleanerParallelism: " + cleanerParallelism);
+    List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
+        .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
+            .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
+            .collect(Collectors.toList()), cleanerParallelism)
+        .mapPartitionsToPair(deleteFilesFunc(table))
+        .reduceByKey(PartitionCleanStat::merge).collect();
+
+    Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.stream()
+        .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
+
+    // Return PartitionCleanStat for each partition passed.
+    return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
+      PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
+          ? partitionCleanStatsMap.get(partitionPath)
+          : new PartitionCleanStat(partitionPath);
+      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
+      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
+          .withEarliestCommitRetained(Option.ofNullable(
+              actionInstant != null
+                  ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
+                  actionInstant.getAction(), actionInstant.getTimestamp())
+                  : null))
+          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
+          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
+          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
+          .build();
+    }).collect(Collectors.toList());
+  }
+
+  public CleanActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
+    super(jsc, config, table, instantTime);
+  }
+
+  /**
+   * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
+   *
+   * @param startCleanTime Cleaner Instant Time
+   * @return Cleaner Plan if generated
+   */
+  Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
+    final HoodieCleanerPlan cleanerPlan = requestClean(jsc);
+    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
+        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
+
+      final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
+      // Save to both aux and timeline folder
+      try {
+        table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+        LOG.info("Requesting Cleaning with instant time " + cleanInstant);
+      } catch (IOException e) {
+        LOG.error("Got exception when saving cleaner requested file", e);
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      return Option.of(cleanerPlan);
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Executes the Cleaner plan stored in the instant metadata.
+   */
+  void runPendingClean(HoodieTable<?> table, HoodieInstant cleanInstant) {
+    try {
+      HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
+      runClean(table, cleanInstant, cleanerPlan);
+    } catch (IOException e) {
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+
+  private HoodieCleanMetadata runClean(HoodieTable<?> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
+    ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
+        || cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));
+
+    try {
+      final HoodieInstant inflightInstant;
+      final HoodieTimer timer = new HoodieTimer();
+      timer.startTimer();
+      if (cleanInstant.isRequested()) {
+        inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
+            TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+      } else {
+        inflightInstant = cleanInstant;
+      }
+
+      List<HoodieCleanStat> cleanStats = clean(jsc, cleanerPlan);
+      if (cleanStats.isEmpty()) {
+        return HoodieCleanMetadata.newBuilder().build();
+      }
+
+      table.getMetaClient().reloadActiveTimeline();
+      HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
+          inflightInstant.getTimestamp(),
+          Option.of(timer.endTimer()),
+          cleanStats
+      );
+
+      table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
+          TimelineMetadataUtils.serializeCleanMetadata(metadata));
+      LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
+      return metadata;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to clean up after commit", e);
+    }
+  }
+
+  /** Finishes pending clean instants (best effort), then plans and runs a new clean; returns {@code null} when no new clean is planned. */
+  @Override
+  public HoodieCleanMetadata execute() {
+    // If there are inflight(failed) or previously requested clean operation, first perform them
+    List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
+        .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
+    if (pendingCleanInstants.size() > 0) {
+      pendingCleanInstants.forEach(hoodieInstant -> {
+        LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
+        try {
+          runPendingClean(table, hoodieInstant);
+        } catch (Exception e) {
+          LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
+        }
+      });
+      table.getMetaClient().reloadActiveTimeline();
+    }
+
+    // Plan and execute a new clean action
+    Option<HoodieCleanerPlan> cleanerPlanOpt = requestClean(instantTime);
+    if (cleanerPlanOpt.isPresent()) {
+      table.getMetaClient().reloadActiveTimeline();
+      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
+      if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
+        return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
+      }
+    }
+    return null;
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
similarity index 97%
rename from hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 00029cf..62203a3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table;
+package org.apache.hudi.table.action.clean;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.common.fs.FSUtils;
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 
+import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -56,12 +57,10 @@ import java.util.stream.Collectors;
  * 1) It provides sufficient time for existing queries running on older versions, to close
  * <p>
  * 2) It bounds the growth of the files in the file system
- * <p>
- * TODO: Should all cleaning be done based on {@link HoodieCommitMetadata}
  */
-public class CleanHelper<T extends HoodieRecordPayload<T>> implements Serializable {
+public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(CleanHelper.class);
+  private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
 
   private final SyncableFileSystemView fileSystemView;
   private final HoodieTimeline commitTimeline;
@@ -69,7 +68,7 @@ public class CleanHelper<T extends HoodieRecordPayload<T>> implements Serializab
   private HoodieTable<T> hoodieTable;
   private HoodieWriteConfig config;
 
-  public CleanHelper(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
+  public CleanPlanner(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
     this.hoodieTable = hoodieTable;
     this.fileSystemView = hoodieTable.getHoodieView();
     this.commitTimeline = hoodieTable.getCompletedCommitTimeline();
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
new file mode 100644
index 0000000..3493ad6
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collects the results of cleaning one partition: the delete path patterns that were used and the
+ * files whose deletion succeeded or failed.
+ */
+class PartitionCleanStat implements Serializable {
+
+  private final String partitionPath;
+  private final List<String> deletePathPatterns = new ArrayList<>();
+  private final List<String> successDeleteFiles = new ArrayList<>();
+  private final List<String> failedDeleteFiles = new ArrayList<>();
+
+  PartitionCleanStat(String partitionPath) {
+    this.partitionPath = partitionPath;
+  }
+
+  /**
+   * Records the outcome of a single file delete attempt.
+   *
+   * @param deletePathStr path of the file whose deletion was attempted
+   * @param deletedFileResult {@code true} if the file was deleted; {@code false} or {@code null} is
+   *                          recorded as a failure
+   */
+  void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
+    // Boolean.TRUE.equals(...) avoids a NullPointerException from unboxing a null result.
+    if (Boolean.TRUE.equals(deletedFileResult)) {
+      successDeleteFiles.add(deletePathStr);
+    } else {
+      failedDeleteFiles.add(deletePathStr);
+    }
+  }
+
+  /**
+   * Records a delete path pattern for this partition.
+   *
+   * @param deletePathStr the pattern to record
+   */
+  void addDeleteFilePatterns(String deletePathStr) {
+    deletePathPatterns.add(deletePathStr);
+  }
+
+  /**
+   * Appends all results held by {@code other} to this instance.
+   *
+   * @param other stats collected for the same partition path
+   * @return this instance, now holding the combined results
+   * @throws IllegalArgumentException if the two partition paths differ
+   */
+  PartitionCleanStat merge(PartitionCleanStat other) {
+    if (!this.partitionPath.equals(other.partitionPath)) {
+      throw new IllegalArgumentException(
+          String.format("partitionPath is not a match: (%s, %s)", partitionPath, other.partitionPath));
+    }
+    successDeleteFiles.addAll(other.successDeleteFiles);
+    deletePathPatterns.addAll(other.deletePathPatterns);
+    failedDeleteFiles.addAll(other.failedDeleteFiles);
+    return this;
+  }
+
+  /** Returns the live (not copied) list of recorded delete path patterns. */
+  public List<String> deletePathPatterns() {
+    return deletePathPatterns;
+  }
+
+  /** Returns the live (not copied) list of files that were deleted successfully. */
+  public List<String> successDeleteFiles() {
+    return successDeleteFiles;
+  }
+
+  /** Returns the live (not copied) list of files whose deletion failed. */
+  public List<String> failedDeleteFiles() {
+    return failedDeleteFiles;
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
index d2f2bb9..f71ab95 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
@@ -90,7 +90,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
     }
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     // Compacting is very similar to applying updates to existing file
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc, metaClient);
     List<CompactionOperation> operations = compactionPlan.getOperations().stream()
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
     LOG.info("Compactor compacting " + operations + " files");
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index 0b68261..4b3a9ed 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -99,7 +99,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       List<String> partitionPaths =
           FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
+      HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc);
       final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
 
       List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
@@ -124,7 +124,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       assertNoWriteErrors(statuses);
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
+      table = HoodieTable.create(metaClient, getConfig(), jsc);
       final BaseFileOnlyView view2 = table.getBaseFileOnlyView();
 
       dataFiles = partitionPaths.stream().flatMap(s -> view2.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
@@ -143,7 +143,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       client.rollbackToSavepoint(savepoint.getTimestamp());
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
+      table = HoodieTable.create(metaClient, getConfig(), jsc);
       final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
       dataFiles = partitionPaths.stream().flatMap(s -> {
         return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"));
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index b0dca5f..c5b52fa 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -42,7 +42,6 @@ import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
-import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -85,10 +84,6 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
     cleanupResources();
   }
 
-  protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
-    return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
-  }
-
   protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
     return getHoodieWriteClient(cfg, false);
   }
@@ -161,7 +156,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
   }
 
   protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
     ((SyncableFileSystemView) (table.getSliceView())).reset();
     return table;
   }
@@ -255,7 +250,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
       final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
       List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
       final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc);
       JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
       return taggedRecords.collect();
     };
@@ -276,7 +271,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
       final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
       List<HoodieKey> records = keyGenFunction.apply(numRecords);
       final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc);
       JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
           .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
       JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index f3b9c65..49311f7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -800,7 +800,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, cfg, jsc);
 
       String instantTime = "000";
       client.startCommitWithTime(instantTime);
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 0cfad6c..1b93ae8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -31,11 +31,11 @@ import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.table.HoodieCopyOnWriteTable;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.junit.After;
 import org.junit.Assert;
@@ -67,7 +67,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
   public void testSchemaEvolutionOnUpdate() throws Exception {
     // Create a bunch of records with a old version of schema
     final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt");
-    final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    final HoodieTable<?> table = HoodieTable.create(config, jsc);
 
     final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
       String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -102,7 +102,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
     final WriteStatus insertResult = statuses.get(0);
     String fileId = insertResult.getFileId();
 
-    final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config2, jsc);
+    final HoodieTable table2 = HoodieTable.create(config2, jsc);
     Assert.assertEquals(1, jsc.parallelize(Arrays.asList(1)).map(x -> {
       // New content with values for the newly added field
       String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 8cde8b7..803f1be 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -142,7 +142,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
 
       // Test tagLocation without any entries in index
       JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -162,7 +162,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
       writeClient.commit(newCommitTime, writeStatues);
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      hoodieTable = HoodieTable.create(metaClient, config, jsc);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
       assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -183,7 +183,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
     writeClient.startCommitWithTime(newCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
 
     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
     JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -201,7 +201,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     writeClient.commit(newCommitTime, writeStatues);
     // Now tagLocation for these records, hbaseIndex should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    hoodieTable = HoodieTable.create(metaClient, config, jsc);
     JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
     assertEquals(10, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
     assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -227,7 +227,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
 
     // commit this upsert
     writeClient.commit(newCommitTime, writeStatues);
-    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
     // Now tagLocation for these records, hbaseIndex should tag them
     JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
     assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 200);
@@ -242,7 +242,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Rollback the last commit
     writeClient.rollback(newCommitTime);
 
-    hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    hoodieTable = HoodieTable.create(metaClient, config, jsc);
     // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
     // back commit
     javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -271,7 +271,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
 
     // Insert 250 records
     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -296,7 +296,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
 
     // Insert 200 records
     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -408,7 +408,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
 
       // Test tagLocation without any entries in index
       JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -428,7 +428,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
       writeClient.commit(newCommitTime, writeStatues);
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      hoodieTable = HoodieTable.create(metaClient, config, jsc);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
       assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -448,7 +448,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
 
       // Test tagLocation without any entries in index
       JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -462,7 +462,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
 
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      hoodieTable = HoodieTable.create(metaClient, config, jsc);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
       assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -499,7 +499,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
 
     assertTrue(index.canIndexLogFiles());
     try {
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
       index.fetchRecordLocation(jsc.parallelize(new ArrayList<HoodieKey>(), 1), jsc, hoodieTable);
       fail("HbaseIndex supports fetchRecordLocation");
     } catch (UnsupportedOperationException ex) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 1339b44..ce29a92 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -160,7 +160,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 
     List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12");
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
     List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
     // Still 0, as no valid commit
     assertEquals(filesList.size(), 0);
@@ -170,7 +170,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
     new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
 
-    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    table = HoodieTable.create(metaClient, config, jsc);
     filesList = index.loadInvolvedFiles(partitions, jsc, table);
     assertEquals(filesList.size(), 4);
 
@@ -284,7 +284,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     // Also create the metadata and config
     HoodieWriteConfig config = makeConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -324,7 +324,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     // Also create the metadata and config
     HoodieWriteConfig config = makeConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -345,7 +345,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    table = HoodieTable.create(metaClient, config, jsc);
 
     taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
 
@@ -394,7 +394,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     // Also create the metadata and config
     HoodieWriteConfig config = makeConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -416,7 +416,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    table = HoodieTable.create(metaClient, config, jsc);
     taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
 
     // Check results
@@ -465,7 +465,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
     HoodieWriteConfig config = makeConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
 
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
     JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 67832d2..1daf336 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -129,7 +129,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
     List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
     // partitions will NOT be respected by this loadInvolvedFiles(...) call
     List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
     // Still 0, as no valid commit
@@ -140,7 +140,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
     new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
     new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
 
-    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    table = HoodieTable.create(metaClient, config, jsc);
     filesList = index.loadInvolvedFiles(partitions, jsc, table);
     assertEquals(filesList.size(), 4);
 
@@ -259,7 +259,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
 
     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
 
     // Add some commits
     new File(basePath + "/.hoodie").mkdirs();
@@ -344,7 +344,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
         .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false);
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
 
     // Add some commits
     new File(basePath + "/.hoodie").mkdirs();
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
index a1492dd..50e8467 100755
--- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
@@ -44,7 +44,7 @@ public class TestHoodieStorageWriterFactory extends TestHoodieClientBase {
     final String instantTime = "100";
     final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet");
     final HoodieWriteConfig cfg = getConfig();
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, cfg, jsc);
     SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
     HoodieStorageWriter<IndexedRecord> parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime,
         parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index a23d0e0..d8b315d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieCleanClient;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.TestHoodieClientBase;
 import org.apache.hudi.client.WriteStatus;
@@ -104,7 +103,7 @@ public class TestCleaner extends TestHoodieClientBase {
    * @param insertFn Insertion API for testing
    * @throws Exception in case of error
    */
-  private String insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client,
+  private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client,
       Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
       Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
 
@@ -126,7 +125,7 @@ public class TestCleaner extends TestHoodieClientBase {
     HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
     assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
     // Should have 100 records in table (check using Index), all in locations marked at commit
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, client.getConfig(), jsc);
+    HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), jsc);
 
     assertFalse(table.getCompletedCommitsTimeline().empty());
     String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
@@ -137,7 +136,6 @@ public class TestCleaner extends TestHoodieClientBase {
     HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
     List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
     checkTaggedRecords(taggedRecords, newCommitTime);
-    return newCommitTime;
   }
 
   /**
@@ -207,7 +205,7 @@ public class TestCleaner extends TestHoodieClientBase {
 
       Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
+      HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc);
       for (String partitionPath : dataGen.getPartitionPaths()) {
         TableFileSystemView fsView = table.getFileSystemView();
         Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> {
@@ -244,7 +242,7 @@ public class TestCleaner extends TestHoodieClientBase {
           assertNoWriteErrors(statuses);
 
           metaClient = HoodieTableMetaClient.reload(metaClient);
-          table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
+          table = HoodieTable.create(metaClient, getConfig(), jsc);
           HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
 
           TableFileSystemView fsView = table.getFileSystemView();
@@ -376,7 +374,7 @@ public class TestCleaner extends TestHoodieClientBase {
         assertNoWriteErrors(statuses);
 
         metaClient = HoodieTableMetaClient.reload(metaClient);
-        HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+        HoodieTable table1 = HoodieTable.create(metaClient, cfg, jsc);
         HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
         Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
         Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
@@ -412,7 +410,7 @@ public class TestCleaner extends TestHoodieClientBase {
    *
    * @param config HoodieWriteConfig
    */
-  private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) {
+  private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws IOException {
     return runCleaner(config, false);
   }
 
@@ -421,9 +419,8 @@ public class TestCleaner extends TestHoodieClientBase {
    *
    * @param config HoodieWriteConfig
    */
-  private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) {
-    HoodieCleanClient writeClient = getHoodieCleanClient(config);
-
+  private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException {
+    HoodieWriteClient<?> writeClient = getHoodieWriteClient(config);
     String cleanInstantTs = getNextInstant();
     HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
 
@@ -432,18 +429,16 @@ public class TestCleaner extends TestHoodieClientBase {
     }
 
     if (simulateRetryFailure) {
-      metaClient.reloadActiveTimeline()
-          .revertToInflight(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs));
-      final HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-      HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table,
-          HoodieTimeline.getCleanInflightInstant(cleanInstantTs));
+      HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs);
+      metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
+      HoodieCleanMetadata cleanMetadata2 = writeClient.clean(getNextInstant());
       Assert.assertEquals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain());
       Assert.assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted());
-      Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(),
-          cleanMetadata2.getPartitionMetadata().keySet());
+      Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), cleanMetadata2.getPartitionMetadata().keySet());
+      final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant);
       cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> {
         HoodieCleanPartitionMetadata p1 = cleanMetadata1.getPartitionMetadata().get(k);
-        HoodieCleanPartitionMetadata p2 = cleanMetadata2.getPartitionMetadata().get(k);
+        HoodieCleanPartitionMetadata p2 = retriedCleanMetadata.getPartitionMetadata().get(k);
         Assert.assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
         Assert.assertEquals(p1.getSuccessDeleteFiles(), p2.getFailedDeleteFiles());
         Assert.assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
@@ -636,8 +631,11 @@ public class TestCleaner extends TestHoodieClientBase {
             Collections.singletonList(fileName2)));
     newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2));
 
-    HoodieCleanMetadata metadata =
-        CleanerUtils.convertCleanMetadata(metaClient, instantTime, Option.of(0L), Arrays.asList(cleanStat1, cleanStat2));
+    HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
+        instantTime,
+        Option.of(0L),
+        Arrays.asList(cleanStat1, cleanStat2)
+    );
     metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1);
 
     // NOw upgrade and check
@@ -896,7 +894,7 @@ public class TestCleaner extends TestHoodieClientBase {
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
     table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED,
         HoodieTimeline.COMMIT_ACTION, "000"));
     table.getActiveTimeline().transitionRequestedToInflight(
@@ -988,7 +986,7 @@ public class TestCleaner extends TestHoodieClientBase {
    * Test clean previous corrupted cleanFiles.
    */
   @Test
-  public void testCleanPreviousCorruptedCleanFiles() {
+  public void testCleanPreviousCorruptedCleanFiles() throws IOException {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder()
             .withPath(basePath).withAssumeDatePartitioning(true)
@@ -1042,7 +1040,7 @@ public class TestCleaner extends TestHoodieClientBase {
         if (j == i && j <= maxNumFileIdsForCompaction) {
           expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
           metaClient = HoodieTableMetaClient.reload(metaClient);
-          HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+          HoodieTable table = HoodieTable.create(metaClient, config, jsc);
           FileSlice slice =
               table.getSliceView().getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
                   .filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
@@ -1084,7 +1082,7 @@ public class TestCleaner extends TestHoodieClientBase {
 
     // Test for safety
     final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
-    final HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    final HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
 
     expFileIdToPendingCompaction.forEach((fileId, value) -> {
       String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId);
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index f670b86..11e94e4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -98,7 +98,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     String instantTime = HoodieTestUtils.makeNewCommitTime();
     HoodieWriteConfig config = makeHoodieClientConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
 
     Pair<Path, String> newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> {
       HoodieRecord record = mock(HoodieRecord.class);
@@ -132,7 +132,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String partitionPath = "/2016/01/31";
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
 
     // Get some records belong to the same partition (2016/01/31)
     String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -207,7 +207,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     Thread.sleep(1000);
     String newCommitTime = HoodieTestUtils.makeNewCommitTime();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config, jsc);
+    final HoodieCopyOnWriteTable newTable = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
     List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
       return newTable.handleUpdate(newCommitTime, updatedRecord1.getPartitionPath(),
           updatedRecord1.getCurrentLocation().getFileId(), updatedRecords.iterator());
@@ -274,7 +274,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
 
     // Get some records belong to the same partition (2016/01/31)
     String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -309,8 +309,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
   public void testInsertRecords() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfig();
     String instantTime = HoodieTestUtils.makeNewCommitTime();
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
     metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
 
     // Case 1:
     // 10 records for partition 1, 1 record for partition 2.
@@ -363,7 +363,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
         .limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build();
     String instantTime = HoodieTestUtils.makeNewCommitTime();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
 
     List<HoodieRecord> records = new ArrayList<>();
     // Approx 1150 records are written for block size of 64KB
@@ -400,7 +400,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     HoodieClientTestUtils.fakeCommitFile(basePath, "001");
     HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
 
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", numInserts);
@@ -472,7 +472,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder()
             .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    final HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
     String instantTime = "000";
     // Perform inserts of 100 records to test CreateHandle and BufferedExecutor
     final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
@@ -483,9 +483,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     WriteStatus writeStatus = ws.get(0).get(0);
     String fileId = writeStatus.getFileId();
     metaClient.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
-    final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc);
-    final List<HoodieRecord> updates =
-            dataGen.generateUpdatesWithHoodieAvroPayload(instantTime, inserts);
+    final List<HoodieRecord> updates = dataGen.generateUpdatesWithHoodieAvroPayload(instantTime, inserts);
 
     String partitionPath = updates.get(0).getPartitionPath();
     long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index b48ad3f..2c2722c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -120,7 +120,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
@@ -228,7 +228,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
@@ -337,7 +337,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       client.rollback(newCommitTime);
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       HoodieTableFileSystemView roView =
           new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
@@ -369,7 +369,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
@@ -450,7 +450,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
             .filter(file -> file.getPath().getName().contains(commitTime2)).count(), 0);
 
         metaClient = HoodieTableMetaClient.reload(metaClient);
-        hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+        hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
         roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
         dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
         recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
@@ -516,7 +516,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
@@ -679,7 +679,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
@@ -769,7 +769,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
       // Verify that all data file has one log file
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, config, jsc);
       // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
       ((SyncableFileSystemView) (table.getSliceView())).reset();
 
@@ -793,7 +793,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
       // Verify that recently written compacted data file has no log file
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      table = HoodieTable.create(metaClient, config, jsc);
       HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
 
       assertTrue("Compaction commit should be > than last insert", HoodieTimeline
@@ -826,7 +826,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       writeClient.commit(newCommitTime, statuses);
 
       HoodieTable table =
-          HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
+          HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
       SliceView tableRTFileSystemView = table.getSliceView();
 
       long numLogFiles = 0;
@@ -902,7 +902,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       writeClient.rollback(newCommitTime);
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, config, jsc);
       SliceView tableRTFileSystemView = table.getSliceView();
 
       long numLogFiles = 0;
@@ -939,7 +939,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       statuses.collect();
 
       HoodieTable table =
-          HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
+          HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
       SliceView tableRTFileSystemView = table.getSliceView();
 
       long numLogFiles = 0;
@@ -960,7 +960,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
       // Trigger a rollback of compaction
       writeClient.rollback(newCommitTime);
-      table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
+      table = HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
       tableRTFileSystemView = table.getSliceView();
       ((SyncableFileSystemView) tableRTFileSystemView).reset();
       Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
@@ -981,7 +981,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, cfg, jsc);
 
       // Create a commit without rolling stats in metadata to test backwards compatibility
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
@@ -1002,10 +1002,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertTrue("Commit should succeed", client.commit(instantTime, statuses));
 
       // Read from commit file
-      table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      table = HoodieTable.create(cfg, jsc);
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
-          table.getActiveTimeline()
-              .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
+          table.getActiveTimeline().getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
           HoodieCommitMetadata.class);
       HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(
           metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
@@ -1027,7 +1026,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertTrue("Commit should succeed", client.commit(instantTime, statuses));
 
       // Read from commit file
-      table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      table = HoodieTable.create(cfg, jsc);
       metadata = HoodieCommitMetadata.fromBytes(
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
@@ -1051,7 +1050,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       client.rollback(instantTime);
 
       // Read from commit file
-      table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      table = HoodieTable.create(cfg, jsc);
       metadata = HoodieCommitMetadata.fromBytes(
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
@@ -1078,7 +1077,6 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
    */
   @Test
   public void testRollingStatsWithSmallFileHandling() throws Exception {
-
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
@@ -1095,7 +1093,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertTrue("Commit should succeed", client.commit(instantTime, statuses));
 
       // Read from commit file
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable table = HoodieTable.create(cfg, jsc);
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
@@ -1124,7 +1122,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertTrue("Commit should succeed", client.commit(instantTime, statuses));
 
       // Read from commit file
-      table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      table = HoodieTable.create(cfg, jsc);
       metadata = HoodieCommitMetadata.fromBytes(
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
@@ -1155,7 +1153,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       client.commitCompaction(instantTime, statuses, Option.empty());
 
       // Read from commit file
-      table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      table = HoodieTable.create(cfg, jsc);
       metadata = HoodieCommitMetadata.fromBytes(
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(),
@@ -1183,7 +1181,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertTrue("Commit should succeed", client.commit(instantTime, statuses));
 
       // Read from commit file
-      table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      table = HoodieTable.create(cfg, jsc);
       metadata = HoodieCommitMetadata.fromBytes(
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
@@ -1229,7 +1227,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
index 2c706ba..b5151b3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
@@ -119,7 +119,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
 
       // Reload and rollback inflight compaction
       metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
       // hoodieTable.rollback(jsc,
       //    new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false);
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index b335c17..d77d3f8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -100,7 +100,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   @Test(expected = HoodieNotSupportedException.class)
   public void testCompactionOnCopyOnWriteFail() throws Exception {
     metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
+    HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc);
     String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
     table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
   }
@@ -109,7 +109,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   public void testCompactionEmpty() throws Exception {
     HoodieWriteConfig config = getConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
 
       String newCommitTime = writeClient.startCommit();
@@ -138,7 +138,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 
       // Update all the 100 records
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, config, jsc);
 
       newCommitTime = "101";
       writeClient.startCommitWithTime(newCommitTime);
@@ -154,7 +154,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 
       // Verify that all data file has one log file
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      table = HoodieTable.create(metaClient, config, jsc);
       for (String partitionPath : dataGen.getPartitionPaths()) {
         List<FileSlice> groupedLogFiles =
             table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
@@ -165,7 +165,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 
       // Do a compaction
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      table = HoodieTable.create(metaClient, config, jsc);
 
       String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
       JavaRDD<WriteStatus> result =
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 3d76e50..96ac4ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -39,8 +39,9 @@ public class CleanerUtils {
   public static final Integer CLEAN_METADATA_VERSION_2 = CleanV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2;
 
-  public static HoodieCleanMetadata convertCleanMetadata(HoodieTableMetaClient metaClient,
-      String startCleanTime, Option<Long> durationInMs, List<HoodieCleanStat> cleanStats) {
+  public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime,
+                                                         Option<Long> durationInMs,
+                                                         List<HoodieCleanStat> cleanStats) {
     Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = new HashMap<>();
     int totalDeleted = 0;
     String earliestCommitToRetain = null;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
index 4e8ff74..86b2303 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
@@ -328,7 +328,7 @@ public class HoodieTestUtils {
       // Create the clean metadata
 
       HoodieCleanMetadata cleanMetadata =
-          CleanerUtils.convertCleanMetadata(metaClient, instantTime, Option.of(0L), Collections.singletonList(cleanStats));
+          CleanerUtils.convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
       // Write empty clean metadata
       os.write(TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata).get());
     }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index 7a273a2..e6c45a2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -414,8 +414,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
 
     HoodieInstant cleanInflightInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant);
     metaClient.getActiveTimeline().createNewInstant(cleanInflightInstant);
-    HoodieCleanMetadata cleanMetadata = CleanerUtils
-        .convertCleanMetadata(metaClient, cleanInstant, Option.empty(), cleanStats);
+    HoodieCleanMetadata cleanMetadata = CleanerUtils.convertCleanMetadata(cleanInstant, Option.empty(), cleanStats);
     metaClient.getActiveTimeline().saveAsComplete(cleanInflightInstant,
         TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
   }
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index a9e7389..f0891e3 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -54,7 +54,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
     throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
   }
   // TODO : Figure out a valid HoodieWriteConfig
-  private val hoodieTable = HoodieTable.getHoodieTable(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(),
+  private val hoodieTable = HoodieTable.create(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(),
     sqlContext.sparkContext)
   val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
   if (commitTimeline.empty()) {