You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/26 01:26:51 UTC

[incubator-hudi] branch master updated: [HUDI-785] Refactor compaction/savepoint execution based on ActionExector abstraction (#1548)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 19ca0b5  [HUDI-785] Refactor compaction/savepoint execution based on ActionExector abstraction (#1548)
19ca0b5 is described below

commit 19ca0b56296bf89c611406bb86c3626f87d562ee
Author: vinoth chandar <vi...@users.noreply.github.com>
AuthorDate: Sat Apr 25 18:26:44 2020 -0700

    [HUDI-785] Refactor compaction/savepoint execution based on ActionExector abstraction (#1548)
    
    - Savepoint and compaction classes moved to table.action.* packages
     - HoodieWriteClient#savepoint(...) returns void
     - Renamed HoodieCommitArchiveLog -> HoodieTimelineArchiveLog
     - Fixed tests to take into account the additional validation done
     - Moved helper code into CompactHelpers and SavepointHelpers
---
 .../hudi/cli/commands/CompactionCommand.java       |   2 +-
 .../hudi/cli/commands/SavepointsCommand.java       |   8 +-
 .../org/apache/hudi/cli/commands/SparkMain.java    |   7 +-
 .../java/org/apache/hudi/cli/utils/CommitUtil.java |   2 +-
 .../cli/commands/TestArchivedCommitsCommand.java   |   4 +-
 .../hudi/cli/common/HoodieTestCommitUtilities.java |   4 +-
 .../hudi/client/AbstractHoodieWriteClient.java     |  38 +-
 .../apache/hudi/client/CompactionAdminClient.java  |  12 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  | 484 ++++-----------------
 .../apache/hudi/config/HoodieCompactionConfig.java |   4 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   2 +-
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |  18 +-
 .../apache/hudi/table/HoodieMergeOnReadTable.java  |  55 +--
 .../java/org/apache/hudi/table/HoodieTable.java    |  95 ++--
 ...chiveLog.java => HoodieTimelineArchiveLog.java} |   8 +-
 .../action/{commit => }/HoodieWriteMetadata.java   |   2 +-
 .../hudi/table/action/clean/CleanPlanner.java      |  28 +-
 .../action/commit/BaseCommitActionExecutor.java    |   4 +-
 .../commit/BulkInsertCommitActionExecutor.java     |   1 +
 .../hudi/table/action/commit/BulkInsertHelper.java |   1 +
 .../BulkInsertPreppedCommitActionExecutor.java     |   1 +
 .../action/commit/DeleteCommitActionExecutor.java  |   1 +
 .../hudi/table/action/commit/DeleteHelper.java     |   5 +-
 .../action/commit/InsertCommitActionExecutor.java  |   1 +
 .../commit/InsertPreppedCommitActionExecutor.java  |   1 +
 .../action/commit/UpsertCommitActionExecutor.java  |   1 +
 .../commit/UpsertPreppedCommitActionExecutor.java  |   1 +
 .../hudi/table/action/commit/WriteHelper.java      |   7 +-
 .../hudi/table/action/compact/CompactHelpers.java  |  70 +++
 .../{ => action}/compact/HoodieCompactor.java      |   2 +-
 .../compact/HoodieMergeOnReadTableCompactor.java   |   5 +-
 .../{ => action}/compact/OperationResult.java      |   2 +-
 .../compact/RunCompactionActionExecutor.java       |  93 ++++
 .../compact/ScheduleCompactionActionExecutor.java  | 121 ++++++
 .../strategy/BoundedIOCompactionStrategy.java      |   2 +-
 .../BoundedPartitionAwareCompactionStrategy.java   |   2 +-
 .../compact/strategy/CompactionStrategy.java       |   4 +-
 .../strategy/DayBasedCompactionStrategy.java       |   2 +-
 .../LogFileSizeBasedCompactionStrategy.java        |   2 +-
 .../strategy/UnBoundedCompactionStrategy.java      |   2 +-
 .../UnBoundedPartitionAwareCompactionStrategy.java |   2 +-
 .../BulkInsertDeltaCommitActionExecutor.java       |   2 +-
 ...BulkInsertPreppedDeltaCommitActionExecutor.java |   2 +-
 .../DeleteDeltaCommitActionExecutor.java           |   2 +-
 .../InsertDeltaCommitActionExecutor.java           |   2 +-
 .../InsertPreppedDeltaCommitActionExecutor.java    |   2 +-
 .../UpsertDeltaCommitActionExecutor.java           |   2 +-
 .../UpsertPreppedDeltaCommitActionExecutor.java    |   2 +-
 .../MergeOnReadRollbackActionExecutor.java         |   2 +-
 .../action/savepoint/SavepointActionExecutor.java  | 115 +++++
 .../table/action/savepoint/SavepointHelpers.java   |  71 +++
 .../org/apache/hudi/client/TestClientRollback.java |   2 +-
 .../hudi/client/TestCompactionAdminClient.java     |   2 +-
 .../apache/hudi/io/TestHoodieCommitArchiveLog.java |  18 +-
 .../{ => action}/compact/TestAsyncCompaction.java  |   2 +-
 .../{ => action}/compact/TestHoodieCompactor.java  |  42 +-
 .../strategy/TestHoodieCompactionStrategy.java     |   2 +-
 .../apache/hudi/common/model/HoodieTestUtils.java  |  14 +
 58 files changed, 789 insertions(+), 601 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index a4c70da..9b55fe2 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -42,7 +42,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.compact.OperationResult;
+import org.apache.hudi.table.action.compact.OperationResult;
 import org.apache.hudi.utilities.UtilHelpers;
 
 import org.apache.hadoop.fs.FSDataInputStream;
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index dac0021..b5bc349 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -79,12 +80,13 @@ public class SavepointsCommand implements CommandMarker {
     String result;
     try (JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Create Savepoint")) {
       HoodieWriteClient client = createHoodieClient(jsc, metaClient.getBasePath());
-      if (client.savepoint(commitTime, user, comments)) {
+      try {
+        client.savepoint(commitTime, user, comments);
         // Refresh the current
         refreshMetaClient();
         result = String.format("The commit \"%s\" has been savepointed.", commitTime);
-      } else {
-        result = String.format("Failed: Could not savepoint commit \"%s\".", commitTime);
+      } catch (HoodieSavepointException se) {
+        result = String.format("Failed: Could not create savepoint \"%s\".", commitTime);
       }
     }
     return result;
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 3534cc5..5aa2255 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -27,7 +27,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.compact.strategy.UnBoundedCompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 import org.apache.hudi.utilities.HDFSParquetImporter;
 import org.apache.hudi.utilities.HDFSParquetImporter.Config;
 import org.apache.hudi.utilities.HoodieCleaner;
@@ -283,10 +283,11 @@ public class SparkMain {
 
   private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
     HoodieWriteClient client = createHoodieClient(jsc, basePath);
-    if (client.restoreToSavepoint(savepointTime)) {
+    try {
+      client.restoreToSavepoint(savepointTime);
       LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
       return 0;
-    } else {
+    } catch (Exception e) {
       LOG.info(String.format("The commit \"%s\" failed to roll back.", savepointTime));
       return -1;
     }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index 1cbe2e6..5a1c457 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -39,7 +39,7 @@ public class CommitUtil {
 
   public static long countNewRecords(HoodieTableMetaClient target, List<String> commitsToCatchup) throws IOException {
     long totalNew = 0;
-    HoodieTimeline timeline = target.getActiveTimeline().reload().getCommitTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = target.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
     for (String commit : commitsToCatchup) {
       HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes(
           timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get(),
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 56dc5e9..b2261ef 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
 
 import org.junit.After;
 import org.junit.Before;
@@ -91,7 +91,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
     metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
 
     // archive
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
     archiveLog.archiveIfRequired(jsc);
   }
 
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java
index bfd0f0f..75a02c8 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java
@@ -20,7 +20,7 @@ package org.apache.hudi.cli.common;
 
 import org.apache.hudi.avro.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -36,7 +36,7 @@ public class HoodieTestCommitUtilities {
    */
   public static org.apache.hudi.avro.model.HoodieCommitMetadata convertAndOrderCommitMetadata(
       HoodieCommitMetadata hoodieCommitMetadata) {
-    return orderCommitMetadata(HoodieCommitArchiveLog.convertCommitMetadata(hoodieCommitMetadata));
+    return orderCommitMetadata(HoodieTimelineArchiveLog.convertCommitMetadata(hoodieCommitMetadata));
   }
 
   /**
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index d712d57..8d730e2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client;
 
 import com.codahale.metrics.Timer;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieRollingStat;
@@ -57,7 +56,6 @@ import java.util.Map;
 public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
 
   private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
-  private static final String UPDATE_STR = "update";
 
   private final transient HoodieMetrics metrics;
   private final transient HoodieIndex<T> index;
@@ -96,32 +94,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     return commit(instantTime, writeStatuses, extraMetadata, metaClient.getCommitActionType());
   }
 
-  protected JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> table,
-      String instantTime) {
-    // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
-    // RDD actions that are performed after updating the index.
-    writeStatusRDD = writeStatusRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
-    Timer.Context indexTimer = metrics.getIndexCtx();
-    // Update the index back
-    JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
-    metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
-    // Trigger the insert and collect statuses
-    commitOnAutoCommit(instantTime, statuses, table.getMetaClient().getCommitActionType());
-    return statuses;
-  }
-
-  protected void commitOnAutoCommit(String instantTime, JavaRDD<WriteStatus> resultRDD, String actionType) {
-    if (config.shouldAutoCommit()) {
-      LOG.info("Auto commit enabled: Committing " + instantTime);
-      boolean commitResult = commit(instantTime, resultRDD, Option.empty(), actionType);
-      if (!commitResult) {
-        throw new HoodieCommitException("Failed to commit " + instantTime);
-      }
-    } else {
-      LOG.info("Auto commit disabled for " + instantTime);
-    }
-  }
-
   private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata, String actionType) {
 
@@ -131,7 +103,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
 
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-
     List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
 
     updateMetadataAndRollingStats(actionType, metadata, stats);
@@ -149,10 +120,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     try {
       activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-
       postCommit(metadata, instantTime, extraMetadata);
       emitCommitMetrics(instantTime, metadata, actionType);
-
       LOG.info("Committed " + instantTime);
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
@@ -182,8 +151,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
    * @param instantTime   Instant Time
    * @param extraMetadata Additional Metadata passed by user
    */
-  protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata);
+  protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata);
 
   /**
    * Finalize Write operation.
@@ -260,7 +228,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
   protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     if (operationType == WriteOperationType.DELETE) {
-      setWriteSchemaFromLastInstant(metaClient);
+      setWriteSchemaForDeletes(metaClient);
     }
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable table = HoodieTable.create(metaClient, config, jsc);
@@ -275,7 +243,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
   /**
    * Sets write schema from last instant since deletes may not have schema set in the config.
    */
-  private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) {
+  private void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
     try {
       HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
       Option<HoodieInstant> lastInstant =
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 9f9ce12..627348c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -18,9 +18,10 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.FileSlice;
@@ -41,10 +42,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.compact.OperationResult;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.action.compact.OperationResult;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -71,10 +69,6 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
   }
 
-  public CompactionAdminClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineServer) {
-    super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build(), timelineServer);
-  }
-
   /**
    * Validate all compaction operations in a compaction plan. Verifies the file-slices are consistent with corresponding
    * compaction operations.
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index c7de8df..5ae7ca0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -19,64 +19,48 @@
 package org.apache.hudi.client;
 
 import com.codahale.metrics.Timer;
-import org.apache.avro.Schema;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.client.utils.SparkConfigUtils;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
-import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.table.action.savepoint.SavepointHelpers;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -171,9 +155,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
-    validateSchema(table, true);
+    table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT);
-    HoodieWriteMetadata result = table.upsert(jsc,instantTime, records);
+    HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
     }
@@ -191,7 +175,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
-    validateSchema(table, true);
+    table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT_PREPPED);
     HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
@@ -209,7 +193,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
-    validateSchema(table, false);
+    table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT);
     HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
     return postWrite(result, instantTime, table);
@@ -228,7 +212,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
-    validateSchema(table, false);
+    table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_PREPPED);
     HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
@@ -267,6 +251,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String instantTime,
       Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
+    table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT);
     HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
@@ -291,6 +276,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime,
       Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
+    table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
     HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
@@ -344,12 +330,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       // Do an inline compaction if enabled
       if (config.isInlineCompaction()) {
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
-        forceCompact(extraMetadata);
+        inlineCompact(extraMetadata);
       } else {
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
       }
       // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
+      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
       archiveLog.archiveIfRequired(jsc);
       if (config.isAutoClean()) {
         // Call clean to cleanup if there is anything to cleanup after the commit,
@@ -364,35 +350,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   }
 
   /**
-   * Savepoint a specific commit. Latest version of data files as of the passed in commitTime will be referenced in the
-   * savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
-   * <p>
-   * This gives an option to rollback the state to the savepoint anytime. Savepoint needs to be manually created and
-   * deleted.
-   * <p>
-   * Savepoint should be on a commit that could not have been cleaned.
+   * Create a savepoint based on the latest commit action on the timeline.
    *
    * @param user - User creating the savepoint
    * @param comment - Comment for the savepoint
-   * @return true if the savepoint was created successfully
    */
-  public boolean savepoint(String user, String comment) {
+  public void savepoint(String user, String comment) {
     HoodieTable<T> table = HoodieTable.create(config, jsc);
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
     }
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
 
     String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
     LOG.info("Savepointing latest commit " + latestCommit);
-    return savepoint(latestCommit, user, comment);
+    savepoint(latestCommit, user, comment);
   }
 
   /**
-   * Savepoint a specific commit. Latest version of data files as of the passed in instantTime will be referenced in the
-   * savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
+   * Savepoint a specific commit instant time. Latest version of data files as of the passed in instantTime
+   * will be referenced in the savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
    * <p>
    * This gives an option to rollback the state to the savepoint anytime. Savepoint needs to be manually created and
    * deleted.
@@ -402,60 +378,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param instantTime - commit that should be savepointed
    * @param user - User creating the savepoint
    * @param comment - Comment for the savepoint
-   * @return true if the savepoint was created successfully
    */
-  public boolean savepoint(String instantTime, String user, String comment) {
+  public void savepoint(String instantTime, String user, String comment) {
     HoodieTable<T> table = HoodieTable.create(config, jsc);
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
-    Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
-
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
-    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
-      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
-    }
-
-    try {
-      // Check the last commit that was not cleaned and check if savepoint time is > that commit
-      String lastCommitRetained;
-      if (cleanInstant.isPresent()) {
-        HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
-            .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
-        lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
-      } else {
-        lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
-      }
-
-      // Cannot allow savepoint time on a commit that could have been cleaned
-      ValidationUtils.checkArgument(
-          HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
-          "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
-
-      Map<String, List<String>> latestFilesMap = jsc
-          .parallelize(FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(),
-              config.shouldAssumeDatePartitioning()))
-          .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
-            // Scan all partitions files with this commit time
-            LOG.info("Collecting latest files in partition path " + partitionPath);
-            BaseFileOnlyView view = table.getBaseFileOnlyView();
-            List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
-                .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
-            return new Tuple2<>(partitionPath, latestFiles);
-          }).collectAsMap();
-
-      HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
-      // Nothing to save in the savepoint
-      table.getActiveTimeline().createNewInstant(
-          new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime));
-      table.getActiveTimeline()
-          .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime),
-              TimelineMetadataUtils.serializeSavepointMetadata(metadata));
-      LOG.info("Savepoint " + instantTime + " created");
-      return true;
-    } catch (IOException e) {
-      throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
-    }
+    table.savepoint(jsc, instantTime, user, comment);
   }
 
   /**
@@ -467,88 +393,24 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public void deleteSavepoint(String savepointTime) {
     HoodieTable<T> table = HoodieTable.create(config, jsc);
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-
-    HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
-    boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
-    if (!isSavepointPresent) {
-      LOG.warn("No savepoint present " + savepointTime);
-      return;
-    }
-
-    activeTimeline.revertToInflight(savePoint);
-    activeTimeline.deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime));
-    LOG.info("Savepoint " + savepointTime + " deleted");
+    SavepointHelpers.deleteSavepoint(table, savepointTime);
   }
 
   /**
-   * Delete a compaction request that is pending.
+   * Restore the data to the savepoint.
    *
-   * NOTE - This is an Admin operation. With async compaction, this is expected to be called with async compaction and
-   * write shutdown. Otherwise, async compactor could fail with errors
-   *
-   * @param compactionTime - delete the compaction time
-   */
-  private void deleteRequestedCompaction(String compactionTime) {
-    HoodieTable<T> table = HoodieTable.create(config, jsc);
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieInstant compactionRequestedInstant =
-        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime);
-    boolean isCompactionInstantInRequestedState =
-        table.getActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionRequestedInstant);
-    HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
-    if (commitTimeline.empty() && !commitTimeline.findInstantsAfter(compactionTime, Integer.MAX_VALUE).empty()) {
-      throw new HoodieRollbackException(
-          "Found commits after time :" + compactionTime + ", please rollback greater commits first");
-    }
-    if (isCompactionInstantInRequestedState) {
-      activeTimeline.deleteCompactionRequested(compactionRequestedInstant);
-    } else {
-      throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime);
-    }
-    LOG.info("Compaction " + compactionTime + " deleted");
-  }
-
-  /**
-   * Restore the state to the savepoint. WARNING: This rollsback recent commits and deleted data files. Queries
-   * accessing the files will mostly fail. This should be done during a downtime.
+   * WARNING: This rolls back recent commits and deleted data files and also pending compactions after savepoint time.
+   * Queries accessing the files will mostly fail. This is expected to be a manual operation and no concurrent write or
+   * compaction is expected to be running
    *
    * @param savepointTime - savepoint time to rollback to
-   * @return true if the savepoint was rollecback to successfully
+   * @return true if the savepoint was restored to successfully
    */
-  public boolean restoreToSavepoint(String savepointTime) {
+  public void restoreToSavepoint(String savepointTime) {
     HoodieTable<T> table = HoodieTable.create(config, jsc);
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-
-    // Rollback to savepoint is expected to be a manual operation and no concurrent write or compaction is expected
-    // to be running. Rollback to savepoint also removes any pending compaction actions that are generated after
-    // savepoint time. Allowing pending compaction to be retained is not safe as those workload could be referencing
-    // file-slices that will be rolled-back as part of this operation
-    HoodieTimeline instantTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
-
-    HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
-    boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
-    if (!isSavepointPresent) {
-      throw new HoodieRollbackException("No savepoint for instantTime " + savepointTime);
-    }
-
-    List<String> commitsToRollback = instantTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants()
-        .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
-    LOG.info("Rolling back commits " + commitsToRollback);
-
+    SavepointHelpers.validateSavepointPresence(table, savepointTime);
     restoreToInstant(savepointTime);
-
-    // Make sure the rollback was successful
-    Option<HoodieInstant> lastInstant =
-        activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
-    ValidationUtils.checkArgument(lastInstant.isPresent());
-    ValidationUtils.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
-        savepointTime + "is not the last commit after rolling back " + commitsToRollback + ", last commit was "
-            + lastInstant.get().getTimestamp());
-    return true;
+    SavepointHelpers.validateSavepointRestore(table, savepointTime);
   }
 
   /**
@@ -574,7 +436,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
         }
         return true;
       } else {
-        LOG.info("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
+        LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
         return false;
       }
     } catch (Exception e) {
@@ -626,9 +488,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
     LOG.info("Cleaner started");
     final Timer.Context context = metrics.getCleanCtx();
-
     HoodieCleanMetadata metadata = HoodieTable.create(config, jsc).clean(jsc, cleanInstantTime);
-
     if (context != null) {
       long durationMs = metrics.getDurationInMs(context.stop());
       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -636,7 +496,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
           + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
           + " cleanerElaspsedMs" + durationMs);
     }
-
     return metadata;
   }
 
@@ -692,11 +551,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    *
    * @param extraMetadata Extra Metadata to be stored
    */
-  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
+  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    LOG.info("Generate a new instant time " + instantTime);
-    boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata);
-    return notEmpty ? Option.of(instantTime) : Option.empty();
+    return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
   }
 
   /**
@@ -705,35 +562,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param instantTime Compaction Instant Time
    * @param extraMetadata Extra Metadata to be stored
    */
-  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata)
-      throws IOException {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    // if there are inflight writes, their instantTime must not be less than that of compaction instant time
-    metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
-      ValidationUtils.checkArgument(
-          HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
-          "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
-              + ", Compaction scheduled at " + instantTime);
-    });
-    // Committed and pending compaction instants should have strictly lower timestamps
-    List<HoodieInstant> conflictingInstants = metaClient
-        .getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant -> HoodieTimeline
-            .compareTimestamps(instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
-        .collect(Collectors.toList());
-    ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
-        "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
-            + conflictingInstants);
-    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
-    HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
-    if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) {
-      extraMetadata.ifPresent(workload::setExtraMetadata);
-      HoodieInstant compactionInstant =
-          new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
-      metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
-          TimelineMetadataUtils.serializeCompactionPlan(workload));
-      return true;
-    }
-    return false;
+  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    LOG.info("Scheduling compaction at instant time :" + instantTime);
+    Option<HoodieCompactionPlan> plan = HoodieTable.create(config, jsc)
+        .scheduleCompaction(jsc, instantTime, extraMetadata);
+    return plan.isPresent();
   }
 
   /**
@@ -742,7 +575,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param compactionInstantTime Compaction Instant Time
    * @return RDD of WriteStatus to inspect errors and counts
    */
-  public JavaRDD<WriteStatus> compact(String compactionInstantTime) throws IOException {
+  public JavaRDD<WriteStatus> compact(String compactionInstantTime) {
     return compact(compactionInstantTime, config.shouldAutoCommit());
   }
 
@@ -754,24 +587,47 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param extraMetadata Extra Metadata to be stored
    */
   public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) throws IOException {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
-    HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
-        timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
-    // Merge extra meta-data passed by user with the one already in inflight compaction
-    Option<Map<String, String>> mergedMetaData = extraMetadata.map(m -> {
-      Map<String, String> merged = new HashMap<>();
-      Map<String, String> extraMetaDataFromInstantFile = compactionPlan.getExtraMetadata();
-      if (extraMetaDataFromInstantFile != null) {
-        merged.putAll(extraMetaDataFromInstantFile);
+                               Option<Map<String, String>> extraMetadata) throws IOException {
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
+    HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata(
+        table, compactionInstantTime, writeStatuses, config.getSchema());
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
+  }
+
+  /**
+   * Commit Compaction and track metrics.
+   */
+  protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses, HoodieTable<T> table,
+                                    String compactionCommitTime) {
+
+    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
+    finalizeWrite(table, compactionCommitTime, writeStats);
+    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
+    CompactHelpers.completeInflightCompaction(table, compactionCommitTime, metadata);
+
+    if (compactionTimer != null) {
+      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+      try {
+        metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
+            durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+      } catch (ParseException e) {
+        throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+            + config.getBasePath() + " at time " + compactionCommitTime, e);
       }
-      // Overwrite/Merge with the user-passed meta-data
-      merged.putAll(m);
-      return Option.of(merged);
-    }).orElseGet(() -> Option.ofNullable(compactionPlan.getExtraMetadata()));
-    commitCompaction(writeStatuses, table, compactionInstantTime, true, mergedMetaData);
+    }
+    LOG.info("Compacted successfully on commit " + compactionCommitTime);
+  }
+
+  /**
+   * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
+   *
+   * @param inflightInstant Inflight Compaction Instant
+   * @param table Hoodie Table
+   */
+  public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) {
+    table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
+    table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
   }
 
   /**
@@ -793,192 +649,32 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param compactionInstantTime Compaction Instant Time
    * @return RDD of Write Status
    */
-  private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
-    // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
-    HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+  private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
+    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
       rollbackInflightCompaction(inflightInstant, table);
-      metaClient.reloadActiveTimeline();
-      pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+      table.getMetaClient().reloadActiveTimeline();
     }
-
-    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
-    if (pendingCompactionTimeline.containsInstant(instant)) {
-      return runCompaction(instant, metaClient.getActiveTimeline(), autoCommit);
-    } else {
-      throw new IllegalStateException(
-          "No Compaction request available at " + compactionInstantTime + " to run compaction");
-    }
-  }
-
-  /**
-   * Perform compaction operations as specified in the compaction commit file.
-   *
-   * @param compactionInstant Compaction Instant time
-   * @param activeTimeline Active Timeline
-   * @param autoCommit Commit after compaction
-   * @return RDD of Write Status
-   */
-  private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline,
-      boolean autoCommit) throws IOException {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    HoodieCompactionPlan compactionPlan =
-        CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp());
-    // Mark instant as compaction inflight
-    activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
     compactionTimer = metrics.getCompactionCtx();
-    // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
-    JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
-    // Force compaction action
-    statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
-    // pass extra-metada so that it gets stored in commit file automatically
-    commitCompaction(statuses, table, compactionInstant.getTimestamp(), autoCommit,
-        Option.ofNullable(compactionPlan.getExtraMetadata()));
-    return statuses;
-  }
-
-  /**
-   * Commit Compaction and track metrics.
-   *
-   * @param compactedStatuses Compaction Write status
-   * @param table Hoodie Table
-   * @param compactionCommitTime Compaction Commit Time
-   * @param autoCommit Auto Commit
-   * @param extraMetadata Extra Metadata to store
-   */
-  protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTable<T> table,
-      String compactionCommitTime, boolean autoCommit, Option<Map<String, String>> extraMetadata) {
-    if (autoCommit) {
-      HoodieCommitMetadata metadata = doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata);
-      if (compactionTimer != null) {
-        long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-        try {
-          metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
-              durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
-        } catch (ParseException e) {
-          throw new HoodieCommitException("Commit time is not of valid format.Failed to commit compaction "
-              + config.getBasePath() + " at time " + compactionCommitTime, e);
-        }
-      }
-      LOG.info("Compacted successfully on commit " + compactionCommitTime);
-    } else {
-      LOG.info("Compaction did not run for commit " + compactionCommitTime);
+    HoodieWriteMetadata compactionMetadata = table.compact(jsc, compactionInstantTime);
+    JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
+    if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
+      completeCompaction(compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
     }
-  }
-
-  /**
-   * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
-   *
-   * @param inflightInstant Inflight Compaction Instant
-   * @param table Hoodie Table
-   */
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) {
-    table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
-    table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
-  }
-
-  private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses,
-      String compactionCommitTime, Option<Map<String, String>> extraMetadata) {
-    HoodieTableMetaClient metaClient = table.getMetaClient();
-    List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect();
-
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
-    for (HoodieWriteStat stat : updateStatusMap) {
-      metadata.addWriteStat(stat.getPartitionPath(), stat);
-    }
-
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
-
-    // Finalize write
-    finalizeWrite(table, compactionCommitTime, updateStatusMap);
-
-    // Copy extraMetadata
-    extraMetadata.ifPresent(m -> {
-      m.forEach(metadata::addMetadata);
-    });
-
-    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-
-    try {
-      activeTimeline.transitionCompactionInflightToComplete(
-          new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    } catch (IOException e) {
-      throw new HoodieCompactionException(
-          "Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e);
-    }
-    return metadata;
+    return statuses;
   }
 
   /**
    * Performs a compaction operation on a table, serially before or after an insert/upsert action.
    */
-  private Option<String> forceCompact(Option<Map<String, String>> extraMetadata) throws IOException {
+  private Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
     Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
     compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
-      try {
-        // inline compaction should auto commit as the user is never given control
-        compact(compactionInstantTime, true);
-      } catch (IOException ioe) {
-        throw new HoodieIOException(ioe.getMessage(), ioe);
-      }
+      // inline compaction should auto commit as the user is never given control
+      compact(compactionInstantTime, true);
     });
     return compactionInstantTimeOpt;
   }
-
-  /**
-   * Ensure that the current writerSchema is compatible with the latest schema of this dataset.
-   *
-   * When inserting/updating data, we read records using the last used schema and convert them to the
-   * GenericRecords with writerSchema. Hence, we need to ensure that this conversion can take place without errors.
-   *
-   * @param hoodieTable The Hoodie Table
-   * @param isUpsert If this is a check during upserts
-   * @throws HoodieUpsertException If schema check fails during upserts
-   * @throws HoodieInsertException If schema check fails during inserts
-   */
-  private void validateSchema(HoodieTable<T> hoodieTable, final boolean isUpsert)
-      throws HoodieUpsertException, HoodieInsertException {
-
-    if (!getConfig().getAvroSchemaValidate()) {
-      // Check not required
-      return;
-    }
-
-    boolean isValid = false;
-    String errorMsg = "WriterSchema is not compatible with the schema present in the Table";
-    Throwable internalError = null;
-    Schema tableSchema = null;
-    Schema writerSchema = null;
-    try {
-      TableSchemaResolver schemaUtil = new TableSchemaResolver(hoodieTable.getMetaClient());
-      writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
-      isValid = schemaUtil.isSchemaCompatible(tableSchema, writerSchema);
-    } catch (Exception e) {
-      // Two error cases are possible:
-      // 1. There was no schema as no data has been inserted yet (first time only)
-      // 2. Failure in reading the schema
-      isValid = hoodieTable.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0;
-      errorMsg = "Failed to read latest schema on path " + basePath;
-      internalError = e;
-    }
-
-    if (!isValid) {
-      LOG.error(errorMsg);
-      LOG.warn("WriterSchema: " + writerSchema);
-      LOG.warn("Table latest schema: " + tableSchema);
-      if (isUpsert) {
-        throw new HoodieUpsertException(errorMsg, internalError);
-      } else {
-        throw new HoodieInsertException(errorMsg, internalError);
-      }
-    }
-  }
-
 }
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 910c04c..bb087a2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.config.DefaultHoodieConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.table.compact.strategy.CompactionStrategy;
-import org.apache.hudi.table.compact.strategy.LogFileSizeBasedCompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
 
 import javax.annotation.concurrent.Immutable;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 50af725..2d98edc 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.MetricsReporterType;
-import org.apache.hudi.table.compact.strategy.CompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 087a2b0..73c1b23 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ParquetReaderIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -46,7 +47,7 @@ import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
@@ -56,6 +57,7 @@ import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroParquetReader;
@@ -130,14 +132,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
-    throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
+  public Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc, String instantTime, Option<Map<String, String>> extraMetadata) {
+    throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
   }
 
   @Override
-  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
-      HoodieCompactionPlan compactionPlan) {
-    throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
+  public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstantTime) {
+    throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
   }
 
   public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
@@ -202,6 +203,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     return new CopyOnWriteRollbackActionExecutor(jsc, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
   }
 
+  @Override
+  public HoodieSavepointMetadata savepoint(JavaSparkContext jsc, String instantToSavepoint, String user, String comment) {
+    return new SavepointActionExecutor(jsc, config, this, instantToSavepoint, user, comment).execute();
+  }
+
   public HoodieRestoreMetadata restore(JavaSparkContext jsc, String restoreInstantTime, String instantToRestore) {
     return new CopyOnWriteRestoreActionExecutor(jsc, config, this, restoreInstantTime, instantToRestore).execute();
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 844b9ad..cacc1ef 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -21,19 +21,17 @@ package org.apache.hudi.table;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
 import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.BulkInsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor;
@@ -41,17 +39,16 @@ import org.apache.hudi.table.action.deltacommit.InsertDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.InsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.UpsertDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.UpsertPreppedDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
-import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
 
 /**
  * Implementation of a more real-time Hoodie Table the provides tradeoffs on read and write cost/amplification.
@@ -119,46 +116,16 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
-    LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
-    Option<HoodieInstant> lastCompaction =
-        getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
-    String deltaCommitsSinceTs = "0";
-    if (lastCompaction.isPresent()) {
-      deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
-    }
-
-    int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
-      LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
-          + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
-          + config.getInlineCompactDeltaCommitMax());
-      return new HoodieCompactionPlan();
-    }
-
-    LOG.info("Compacting merge on read table " + config.getBasePath());
-    HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
-    try {
-      return compactor.generateCompactionPlan(jsc, this, config, instantTime,
-          ((SyncableFileSystemView) getSliceView()).getPendingCompactionOperations()
-              .map(instantTimeCompactionopPair -> instantTimeCompactionopPair.getValue().getFileGroupId())
-              .collect(Collectors.toSet()));
-
-    } catch (IOException e) {
-      throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
-    }
+  public Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc, String instantTime, Option<Map<String, String>> extraMetadata) {
+    ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
+        jsc, config, this, instantTime, extraMetadata);
+    return scheduleCompactionExecutor.execute();
   }
 
   @Override
-  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
-      HoodieCompactionPlan compactionPlan) {
-    HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
-    try {
-      return compactor.compact(jsc, compactionPlan, this, config, compactionInstantTime);
-    } catch (IOException e) {
-      throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
-    }
+  public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstantTime) {
+    RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(jsc, config, this, compactionInstantTime);
+    return compactionExecutor.execute();
   }
 
   @Override
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 0275b72..9904411 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,16 +18,17 @@
 
 package org.apache.hudi.table;
 
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.SparkTaskContextSupplier;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuard;
 import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
@@ -38,10 +39,10 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -54,9 +55,10 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieSavepointException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -295,24 +297,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
     return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
   }
 
-  /**
-   * Get the list of data file names savepointed.
-   */
-  public Stream<String> getSavepointedDataFiles(String savepointTime) {
-    if (!getSavepoints().contains(savepointTime)) {
-      throw new HoodieSavepointException(
-          "Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
-    }
-    HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
-    HoodieSavepointMetadata metadata;
-    try {
-      metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(getActiveTimeline().getInstantDetails(instant).get());
-    } catch (IOException e) {
-      throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e);
-    }
-    return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream());
-  }
-
   public HoodieActiveTimeline getActiveTimeline() {
     return metaClient.getActiveTimeline();
   }
@@ -329,19 +313,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
    * 
    * @param jsc Spark Context
    * @param instantTime Instant Time for scheduling compaction
+   * @param extraMetadata additional metadata to write into plan
    * @return
    */
-  public abstract HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime);
+  public abstract Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc,
+                                                                  String instantTime,
+                                                                  Option<Map<String, String>> extraMetadata);
 
   /**
    * Run Compaction on the table. Compaction arranges the data so that it is optimized for data access.
    *
    * @param jsc Spark Context
    * @param compactionInstantTime Instant Time
-   * @param compactionPlan Compaction Plan
    */
-  public abstract JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
-      HoodieCompactionPlan compactionPlan);
+  public abstract HoodieWriteMetadata compact(JavaSparkContext jsc,
+                                              String compactionInstantTime);
 
   /**
    * Executes a new clean action.
@@ -366,6 +352,15 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
                                                   boolean deleteInstants);
 
   /**
+   * Create a savepoint at the specified instant, so that the table can be restored
+   * to this point-in-timeline later if needed.
+   */
+  public abstract HoodieSavepointMetadata savepoint(JavaSparkContext jsc,
+                                                    String instantToSavepoint,
+                                                    String user,
+                                                    String comment);
+
+  /**
    * Restore the table to the given instant. Note that this is a admin table recovery operation
    * that would cause any running queries that are accessing file slices written after the instant to fail.
    */
@@ -519,4 +514,52 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   public SparkTaskContextSupplier getSparkTaskContextSupplier() {
     return sparkTaskContextSupplier;
   }
+
+  /**
+   * Ensure that the current writerSchema is compatible with the latest schema of this dataset.
+   *
+   * When inserting/updating data, we read records using the last used schema and convert them to the
+   * GenericRecords with writerSchema. Hence, we need to ensure that this conversion can take place without errors.
+   *
+   */
+  private void validateSchema() throws HoodieUpsertException, HoodieInsertException {
+
+    if (!config.getAvroSchemaValidate() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
+      // Check not required
+      return;
+    }
+
+    Schema tableSchema;
+    Schema writerSchema;
+    boolean isValid;
+    try {
+      TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
+      writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
+      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
+      isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
+    }
+
+    if (!isValid) {
+      throw new HoodieException("Failed schema compatibility check for writerSchema :" + writerSchema
+          + ", table schema :" + tableSchema + ", base path :" + metaClient.getBasePath());
+    }
+  }
+
+  public void validateUpsertSchema() throws HoodieUpsertException {
+    try {
+      validateSchema();
+    } catch (HoodieException e) {
+      throw new HoodieUpsertException("Failed upsert schema compatibility check.", e);
+    }
+  }
+
+  public void validateInsertSchema() throws HoodieInsertException {
+    try {
+      validateSchema();
+    } catch (HoodieException e) {
+      throw new HoodieInsertException("Failed insert schema compability check.", e);
+    }
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
rename to hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 5bd81ee..bfe9c9d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -68,18 +68,18 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
- * Archiver to bound the growth of <action>.commit files.
+ * Archiver to bound the growth of files under .hoodie meta path.
  */
-public class HoodieCommitArchiveLog {
+public class HoodieTimelineArchiveLog {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieCommitArchiveLog.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiveLog.class);
 
   private final Path archiveFilePath;
   private final HoodieTableMetaClient metaClient;
   private final HoodieWriteConfig config;
   private Writer writer;
 
-  public HoodieCommitArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
+  public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
     this.config = config;
     this.metaClient = metaClient;
     this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java b/hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
index 64fd4df..c46102a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action.commit;
+package org.apache.hudi.table.action;
 
 import java.util.List;
 import org.apache.hudi.client.WriteStatus;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 62203a3..9750162 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.clean;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.FileSlice;
@@ -39,6 +40,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 
+import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -50,6 +52,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Cleaner is responsible for garbage collecting older files in a given partition path. Such that
@@ -82,6 +85,25 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
   }
 
   /**
+   * Get the list of data file names savepointed.
+   */
+  public Stream<String> getSavepointedDataFiles(String savepointTime) {
+    if (!hoodieTable.getSavepoints().contains(savepointTime)) {
+      throw new HoodieSavepointException(
+          "Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
+    }
+    HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
+    HoodieSavepointMetadata metadata;
+    try {
+      metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
+          hoodieTable.getActiveTimeline().getInstantDetails(instant).get());
+    } catch (IOException e) {
+      throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e);
+    }
+    return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream());
+  }
+
+  /**
    * Returns list of partitions where clean operations needs to be performed.
    *
    * @param newInstantToRetain New instant to be retained after this cleanup operation
@@ -131,7 +153,8 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
     List<String> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
-        .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
+        .flatMap(this::getSavepointedDataFiles)
+        .collect(Collectors.toList());
 
     for (HoodieFileGroup fileGroup : fileGroups) {
       int keepVersions = config.getCleanerFileVersionsRetained();
@@ -190,7 +213,8 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
 
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
-        .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
+        .flatMap(this::getSavepointedDataFiles)
+        .collect(Collectors.toList());
 
     // determine if we have enough commits, to start cleaning.
     if (commitTimeline.countInstants() > commitsRetained) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index e72c801..93a655e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.BaseActionExecutor;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -165,8 +166,6 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
         (HoodieTable<T>)table);
     result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
     result.setWriteStatuses(statuses);
-
-    // Trigger the insert and collect statuses
     commitOnAutoCommit(result);
   }
 
@@ -207,7 +206,6 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
     try {
       activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-
       LOG.info("Committed " + instantTime);
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
index e0182da..9f5468e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
index fbc8dbb..4755664 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
@@ -30,6 +30,7 @@ import org.apache.hudi.execution.BulkInsertMapFunction;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
index 0b8e75f..3d80a07 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
index ba25a97..8fa1cb7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
index 7ee891f..8c0b75f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -58,8 +59,8 @@ public class DeleteHelper<T extends HoodieRecordPayload<T>> {
   }
 
   public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata execute(String instantTime,
-      JavaRDD<HoodieKey> keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<T> table,
-      CommitActionExecutor<T> deleteExecutor) {
+                                                                               JavaRDD<HoodieKey> keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<T> table,
+                                                                               CommitActionExecutor<T> deleteExecutor) {
     try {
       HoodieWriteMetadata result = null;
       // De-dupe/merge if needed
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
index d08dab2..d8944e3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
index acc9902..b7d64b1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
index efdcae1..0c4d08e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
index 5999104..d8470ea 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
index 7faee1c..92dcbb6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
@@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -35,9 +36,9 @@ import scala.Tuple2;
 public class WriteHelper<T extends HoodieRecordPayload<T>> {
 
   public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata write(String instantTime,
-      JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
-      HoodieTable<T> table, boolean shouldCombine,
-      int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
+                                                                             JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
+                                                                             HoodieTable<T> table, boolean shouldCombine,
+                                                                             int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
     try {
       // De-dupe/merge if needed
       JavaRDD<HoodieRecord<T>> dedupedRecords =
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
new file mode 100644
index 0000000..97fdd0f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class CompactHelpers {
+
+  public static HoodieCommitMetadata createCompactionMetadata(HoodieTable<?> table,
+                                                              String compactionInstantTime,
+                                                              JavaRDD<WriteStatus> writeStatuses,
+                                                              String schema) throws IOException {
+    byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes(
+        HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get();
+    HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes);
+    List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect();
+    org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true);
+    for (HoodieWriteStat stat : updateStatusMap) {
+      metadata.addWriteStat(stat.getPartitionPath(), stat);
+    }
+    metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema);
+    if (compactionPlan.getExtraMetadata() != null) {
+      compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
+    }
+    return metadata;
+  }
+
+  public static void completeInflightCompaction(HoodieTable<?> table, String compactionCommitTime, HoodieCommitMetadata commitMetadata) {
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    try {
+      activeTimeline.transitionCompactionInflightToComplete(
+          new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
+          Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    } catch (IOException e) {
+      throw new HoodieCompactionException(
+          "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + compactionCommitTime, e);
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieCompactor.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index f5579dd..c81b028 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.WriteStatus;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
index f71ab95..2c0afa9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
@@ -42,7 +42,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieCopyOnWriteTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.compact.strategy.CompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +71,6 @@ import static java.util.stream.Collectors.toList;
  * passes it through a CompactionFilter and executes all the compactions and writes a new version of base files and make
  * a normal commit
  *
- * @see HoodieCompactor
  */
 public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/OperationResult.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/OperationResult.java
similarity index 97%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/OperationResult.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/OperationResult.java
index 53bc48a..cd00cb9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/OperationResult.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/OperationResult.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.common.util.Option;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
new file mode 100644
index 0000000..2f99fa1
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkConfigUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.List;
+
+public class RunCompactionActionExecutor extends BaseActionExecutor<HoodieWriteMetadata> {
+
+  private static final Logger LOG = LogManager.getLogger(RunCompactionActionExecutor.class);
+
+  public RunCompactionActionExecutor(JavaSparkContext jsc,
+                                     HoodieWriteConfig config,
+                                     HoodieTable<?> table,
+                                     String instantTime) {
+    super(jsc, config, table, instantTime);
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime);
+    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+    if (!pendingCompactionTimeline.containsInstant(instant)) {
+      throw new IllegalStateException(
+          "No Compaction request available at " + instantTime + " to run compaction");
+    }
+
+    HoodieWriteMetadata compactionMetadata = new HoodieWriteMetadata();
+    try {
+      HoodieActiveTimeline timeline = table.getActiveTimeline();
+      HoodieCompactionPlan compactionPlan =
+          CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);
+      // Mark instant as compaction inflight
+      timeline.transitionCompactionRequestedToInflight(instant);
+      table.getMetaClient().reloadActiveTimeline();
+
+      HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
+      JavaRDD<WriteStatus> statuses = compactor.compact(jsc, compactionPlan, table, config, instantTime);
+
+      statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+      List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collect();
+      HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
+      for (HoodieWriteStat stat : updateStatusMap) {
+        metadata.addWriteStat(stat.getPartitionPath(), stat);
+      }
+      metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+
+      compactionMetadata.setWriteStatuses(statuses);
+      compactionMetadata.setCommitted(false);
+      compactionMetadata.setCommitMetadata(Option.of(metadata));
+    } catch (IOException e) {
+      throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
+    }
+
+    return compactionMetadata;
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
new file mode 100644
index 0000000..586b5b3
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<HoodieCompactionPlan>> {
+
+  private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class);
+
+  private final Option<Map<String, String>> extraMetadata;
+
+  public ScheduleCompactionActionExecutor(JavaSparkContext jsc,
+                                          HoodieWriteConfig config,
+                                          HoodieTable<?> table,
+                                          String instantTime,
+                                          Option<Map<String, String>> extraMetadata) {
+    super(jsc, config, table, instantTime);
+    this.extraMetadata = extraMetadata;
+  }
+
+  private HoodieCompactionPlan scheduleCompaction() {
+    LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
+    Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
+    String deltaCommitsSinceTs = "0";
+    if (lastCompaction.isPresent()) {
+      deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
+    }
+
+    int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
+        .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
+    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
+      LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+          + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
+          + config.getInlineCompactDeltaCommitMax());
+      return new HoodieCompactionPlan();
+    }
+
+    LOG.info("Compacting merge on read table " + config.getBasePath());
+    HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
+    try {
+      return compactor.generateCompactionPlan(jsc, table, config, instantTime,
+          ((SyncableFileSystemView) table.getSliceView()).getPendingCompactionOperations()
+              .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+              .collect(Collectors.toSet()));
+
+    } catch (IOException e) {
+      throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
+    }
+  }
+
+  @Override
+  public Option<HoodieCompactionPlan> execute() {
+    // if there are inflight writes, their instantTime must not be less than that of compaction instant time
+    table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
+        .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
+            HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
+            "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+                + ", Compaction scheduled at " + instantTime));
+
+    // Committed and pending compaction instants should have strictly lower timestamps
+    List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
+        .getCommitsAndCompactionTimeline().getInstants()
+        .filter(instant -> HoodieTimeline.compareTimestamps(
+            instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
+        .collect(Collectors.toList());
+    ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
+        "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+            + conflictingInstants);
+
+    HoodieCompactionPlan plan = scheduleCompaction();
+    if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
+      extraMetadata.ifPresent(plan::setExtraMetadata);
+      HoodieInstant compactionInstant =
+          new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
+      try {
+        table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
+            TimelineMetadataUtils.serializeCompactionPlan(plan));
+      } catch (IOException ioe) {
+        throw new HoodieIOException("Exception scheduling compaction", ioe);
+      }
+      return Option.of(plan);
+    }
+    return Option.empty();
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedIOCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
similarity index 97%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedIOCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
index f77a015..d93a50f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedIOCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedPartitionAwareCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
index 24da226..597348f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/CompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
similarity index 97%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/CompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
index 928ae3a..0e8e4c6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/CompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
+import org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor;
 
 import java.io.Serializable;
 import java.util.HashMap;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/DayBasedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/DayBasedCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
index ea29fdd..4a12bb8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/DayBasedCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/LogFileSizeBasedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/LogFileSizeBasedCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
index 83f1cca..c9a811a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/LogFileSizeBasedCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
similarity index 96%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
index 84f4ee0..ffc437b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
index 9c00e5e..66b5612 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
index 95779a7..5e4b915 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
@@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
 
 import org.apache.hudi.table.action.commit.BulkInsertHelper;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
index 7b6e146..5a3fe7a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
@@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
 
 import org.apache.hudi.table.action.commit.BulkInsertHelper;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
index 1575406..53d4d84 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
@@ -25,7 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hudi.table.action.commit.DeleteHelper;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
index f76f7fc..2124165 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.WriteHelper;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
index 55031ea..0fb787e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
index 1fdf433..1809078 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.WriteHelper;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
index 413d2e2..d1773f9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index 8b95198..498d7b7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -118,7 +118,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
         config.shouldAssumeDatePartitioning());
     int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
     return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
-      HoodieActiveTimeline activeTimeline = table.getActiveTimeline().reload();
+      HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
       List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
       switch (instantToRollback.getAction()) {
         case HoodieTimeline.COMMIT_ACTION:
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
new file mode 100644
index 0000000..d467074
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.savepoint;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieSavepointException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SavepointActionExecutor extends BaseActionExecutor<HoodieSavepointMetadata> {
+
+  private static final Logger LOG = LogManager.getLogger(SavepointActionExecutor.class);
+
+  private final String user;
+  private final String comment;
+
+  public SavepointActionExecutor(JavaSparkContext jsc,
+                                 HoodieWriteConfig config,
+                                 HoodieTable<?> table,
+                                 String instantTime,
+                                 String user,
+                                 String comment) {
+    super(jsc, config, table, instantTime);
+    this.user = user;
+    this.comment = comment;
+  }
+
+  @Override
+  public HoodieSavepointMetadata execute() {
+    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
+    }
+    Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
+    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
+    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
+      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
+    }
+
+    try {
+      // Check the last commit that was not cleaned and check if savepoint time is > that commit
+      String lastCommitRetained;
+      if (cleanInstant.isPresent()) {
+        HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
+            .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
+        lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
+      } else {
+        lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
+      }
+
+      // Cannot allow savepoint time on a commit that could have been cleaned
+      ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
+          "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
+
+      Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+          table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
+          .mapToPair(partitionPath -> {
+            // Scan all partitions files with this commit time
+            LOG.info("Collecting latest files in partition path " + partitionPath);
+            TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+            List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
+                .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+            return new Tuple2<>(partitionPath, latestFiles);
+          })
+          .collectAsMap();
+
+      HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
+      // Nothing to save in the savepoint
+      table.getActiveTimeline().createNewInstant(
+          new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime));
+      table.getActiveTimeline()
+          .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime),
+              TimelineMetadataUtils.serializeSavepointMetadata(metadata));
+      LOG.info("Savepoint " + instantTime + " created");
+      return metadata;
+    } catch (IOException e) {
+      throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
new file mode 100644
index 0000000..06acd46
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.savepoint;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class SavepointHelpers {
+
+  private static final Logger LOG = LogManager.getLogger(SavepointHelpers.class);
+
+  public static void deleteSavepoint(HoodieTable<?> table, String savepointTime) {
+    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
+    }
+    HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
+    boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
+    if (!isSavepointPresent) {
+      LOG.warn("No savepoint present " + savepointTime);
+      return;
+    }
+
+    table.getActiveTimeline().revertToInflight(savePoint);
+    table.getActiveTimeline().deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime));
+    LOG.info("Savepoint " + savepointTime + " deleted");
+  }
+
+  public static void validateSavepointRestore(HoodieTable<?> table, String savepointTime) {
+    // Make sure the restore was successful
+    table.getMetaClient().reloadActiveTimeline();
+    Option<HoodieInstant> lastInstant = table.getActiveTimeline()
+        .getCommitsAndCompactionTimeline()
+        .filterCompletedAndCompactionInstants()
+        .lastInstant();
+    ValidationUtils.checkArgument(lastInstant.isPresent());
+    ValidationUtils.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
+        savepointTime + " is not the last commit after restoring to savepoint, last commit was "
+            + lastInstant.get().getTimestamp());
+  }
+
+  public static void validateSavepointPresence(HoodieTable<?> table, String savepointTime) {
+    HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
+    boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
+    if (!isSavepointPresent) {
+      throw new HoodieRollbackException("No savepoint for instantTime " + savepointTime);
+    }
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index 16ccb05..7fc0dee 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -58,7 +58,7 @@ public class TestClientRollback extends TestHoodieClientBase {
   public void testSavepointAndRollback() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder()
         .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {
       HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
 
       /**
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 383fd74..9726465 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.compact.OperationResult;
+import org.apache.hudi.table.action.compact.OperationResult;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 7bfd768..841a09d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -79,7 +79,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
             .withParallelism(2, 2).forTable("test-trip-table").build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
     boolean result = archiveLog.archiveIfRequired(jsc);
     assertTrue(result);
   }
@@ -157,7 +157,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     verifyInflightInstants(metaClient, 2);
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
 
     assertTrue(archiveLog.archiveIfRequired(jsc));
 
@@ -216,7 +216,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
     // Requested Compaction
     HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "100"), dfs.getConf());
@@ -281,7 +281,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
@@ -307,7 +307,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
     HoodieTestDataGenerator.createSavepointFile(basePath, "101", dfs.getConf());
@@ -339,7 +339,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
     HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf());
     HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -386,7 +386,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
                     .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
                     .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
 
     HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf());
     HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
@@ -426,7 +426,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
 
     org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata);
     assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
similarity index 99%
rename from hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
rename to hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index b5151b3..142e63a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
similarity index 82%
rename from hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
rename to hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index d77d3f8..819a41c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
 
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestHarness;
@@ -29,6 +30,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
@@ -50,6 +52,7 @@ import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 public class TestHoodieCompactor extends HoodieClientTestHarness {
 
@@ -100,9 +103,10 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   @Test(expected = HoodieNotSupportedException.class)
   public void testCompactionOnCopyOnWriteFail() throws Exception {
     metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
-    HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc);
+    HoodieTable<?> table = HoodieTable.create(metaClient, getConfig(), jsc);
     String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
-    table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
+    table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
+    table.compact(jsc, compactionInstantTime);
   }
 
   @Test
@@ -118,9 +122,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
       writeClient.insert(recordsRDD, newCommitTime).collect();
 
       String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
-      JavaRDD<WriteStatus> result =
-          table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
-      assertTrue("If there is nothing to compact, result will be empty", result.isEmpty());
+      Option<HoodieCompactionPlan> plan = table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
+      assertFalse("If there is nothing to compact, result will be empty", plan.isPresent());
     }
   }
 
@@ -128,18 +131,16 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getWriteClient(config)) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
-      List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();
+      writeClient.insert(recordsRDD, newCommitTime).collect();
 
       // Update all the 100 records
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieTable.create(metaClient, config, jsc);
-
+      HoodieTable table = HoodieTable.create(config, jsc);
       newCommitTime = "101";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -153,8 +154,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
           HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
 
       // Verify that all data file has one log file
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.create(metaClient, config, jsc);
+      table = HoodieTable.create(config, jsc);
       for (String partitionPath : dataGen.getPartitionPaths()) {
         List<FileSlice> groupedLogFiles =
             table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
@@ -162,14 +162,14 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
           assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
         }
       }
+      HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime);
 
       // Do a compaction
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.create(metaClient, config, jsc);
-
-      String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
-      JavaRDD<WriteStatus> result =
-          table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
+      table = HoodieTable.create(config, jsc);
+      String compactionInstantTime = "102";
+      table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
+      table.getMetaClient().reloadActiveTimeline();
+      JavaRDD<WriteStatus> result = table.compact(jsc, compactionInstantTime).getWriteStatuses();
 
       // Verify that all partition paths are present in the WriteStatus result
       for (String partitionPath : dataGen.getPartitionPaths()) {
@@ -184,8 +184,4 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
   }
-
-  // TODO - after modifying HoodieReadClient to support mor tables - add more tests to make
-  // sure the data read is the updated data (compaction correctness)
-  // TODO - add more test cases for compactions after a failed commit/compaction
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
similarity index 99%
rename from hudi-client/src/test/java/org/apache/hudi/table/compact/strategy/TestHoodieCompactionStrategy.java
rename to hudi-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index 88da442..e49aafa 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/strategy/TestHoodieCompactionStrategy.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.common.model.HoodieBaseFile;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
index 86b2303..060b5a3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
@@ -138,6 +138,20 @@ public class HoodieTestUtils {
     }
   }
 
+  public static void createDeltaCommitFiles(String basePath, String... instantTimes) throws IOException {
+    for (String instantTime : instantTimes) {
+      new File(
+          basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+              + HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile();
+      new File(
+          basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+              + HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile();
+      new File(
+          basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(instantTime))
+          .createNewFile();
+    }
+  }
+
   public static void createMetadataFolder(String basePath) {
     new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs();
   }