You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/04/13 06:11:06 UTC

[incubator-hudi] branch master updated: [HUDI-770] Organize upsert/insert API implementation under a single package (#1495)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 17bf930  [HUDI-770] Organize upsert/insert API implementation under a single package (#1495)
17bf930 is described below

commit 17bf9303423821483f17785f8536b7bfed45e42e
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Sun Apr 12 23:11:00 2020 -0700

    [HUDI-770] Organize upsert/insert API implementation under a single package (#1495)
---
 .../hudi/client/AbstractHoodieWriteClient.java     |  20 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  | 319 ++++---------------
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  | 349 +++------------------
 .../apache/hudi/table/HoodieMergeOnReadTable.java  | 180 +++--------
 .../java/org/apache/hudi/table/HoodieTable.java    |  92 ++++--
 .../hudi/table/action/BaseActionExecutor.java      |   5 +-
 .../action/commit/BaseCommitActionExecutor.java    | 291 +++++++++++++++++
 .../BucketInfo.java}                               |  40 ++-
 .../BucketType.java}                               |  26 +-
 .../commit/BulkInsertCommitActionExecutor.java     |  60 ++++
 .../hudi/table/action/commit/BulkInsertHelper.java |  84 +++++
 .../BulkInsertPreppedCommitActionExecutor.java     |  61 ++++
 .../table/action/commit/CommitActionExecutor.java  | 176 +++++++++++
 .../DeleteCommitActionExecutor.java}               |  32 +-
 .../hudi/table/action/commit/DeleteHelper.java     |  96 ++++++
 .../table/action/commit/HoodieWriteMetadata.java   | 104 ++++++
 .../InsertBucket.java}                             |  42 ++-
 .../InsertCommitActionExecutor.java}               |  34 +-
 .../InsertPreppedCommitActionExecutor.java}        |  32 +-
 .../SmallFile.java}                                |  41 ++-
 .../UpsertCommitActionExecutor.java}               |  34 +-
 .../table/action/commit/UpsertPartitioner.java     | 316 +++++++++++++++++++
 .../UpsertPreppedCommitActionExecutor.java}        |  32 +-
 .../hudi/table/action/commit/WriteHelper.java      | 105 +++++++
 .../BulkInsertDeltaCommitActionExecutor.java       |  62 ++++
 ...BulkInsertPreppedDeltaCommitActionExecutor.java |  63 ++++
 .../DeleteDeltaCommitActionExecutor.java}          |  34 +-
 .../deltacommit/DeltaCommitActionExecutor.java     |  94 ++++++
 .../InsertDeltaCommitActionExecutor.java           |  49 +++
 .../InsertPreppedDeltaCommitActionExecutor.java}   |  32 +-
 .../UpsertDeltaCommitActionExecutor.java           |  49 +++
 .../deltacommit/UpsertDeltaCommitPartitioner.java  | 142 +++++++++
 .../UpsertPreppedDeltaCommitActionExecutor.java}   |  32 +-
 .../apache/hudi/client/TestHoodieClientBase.java   |   3 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  48 +--
 .../apache/hudi/table/TestMergeOnReadTable.java    |   8 +-
 .../commit/TestCopyOnWriteActionExecutor.java}     | 123 ++------
 .../table/action/commit/TestUpsertPartitioner.java | 148 +++++++++
 .../hudi/common/model/WriteOperationType.java      |   4 +
 39 files changed, 2395 insertions(+), 1067 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index ea5ed9f..862a60f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -166,6 +166,18 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
 
       postCommit(metadata, instantTime, extraMetadata);
+      emitCommitMetrics(instantTime, metadata, actionType);
+
+      LOG.info("Committed " + instantTime);
+    } catch (IOException e) {
+      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
+          e);
+    }
+    return true;
+  }
+
+  void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
+    try {
 
       if (writeContext != null) {
         long durationInMs = metrics.getDurationInMs(writeContext.stop());
@@ -173,15 +185,10 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
             metadata, actionType);
         writeContext = null;
       }
-      LOG.info("Committed " + instantTime);
-    } catch (IOException e) {
-      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
-          e);
     } catch (ParseException e) {
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime
           + "Instant time is not of valid format", e);
     }
-    return true;
   }
 
   /**
@@ -189,10 +196,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
    * @param metadata      Commit Metadata corresponding to committed instant
    * @param instantTime   Instant Time
    * @param extraMetadata Additional Metadata passed by user
-   * @throws IOException in case of error
    */
   protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata) throws IOException;
+      Option<Map<String, String>> extraMetadata);
 
   /**
    * Finalize Write operation.
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index e69eef2..6e866f9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -26,7 +26,6 @@ import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
@@ -50,28 +49,22 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
-import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.BulkInsertMapFunction;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.HoodieCommitArchiveLog;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
-import org.apache.hudi.table.WorkloadProfile;
-import org.apache.hudi.table.WorkloadStat;
 
 import com.codahale.metrics.Timer;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -81,7 +74,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import scala.Tuple2;
 
@@ -176,22 +168,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
     setOperationType(WriteOperationType.UPSERT);
-    try {
-      // De-dupe/merge if needed
-      JavaRDD<HoodieRecord<T>> dedupedRecords =
-          combineOnCondition(config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism());
-
-      Timer.Context indexTimer = metrics.getIndexCtx();
-      // perform index loop up to get existing location of records
-      JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table);
-      metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
-      return upsertRecordsInternal(taggedRecords, instantTime, table, true);
-    } catch (Throwable e) {
-      if (e instanceof HoodieUpsertException) {
-        throw (HoodieUpsertException) e;
-      }
-      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
+    HoodieWriteMetadata result = table.upsert(jsc,instantTime, records);
+    if (result.getIndexLookupDuration().isPresent()) {
+      metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
     }
+    return postWrite(result, instantTime, table);
   }
 
   /**
@@ -206,14 +187,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
     setOperationType(WriteOperationType.UPSERT_PREPPED);
-    try {
-      return upsertRecordsInternal(preppedRecords, instantTime, table, true);
-    } catch (Throwable e) {
-      if (e instanceof HoodieUpsertException) {
-        throw (HoodieUpsertException) e;
-      }
-      throw new HoodieUpsertException("Failed to upsert prepared records for commit time " + instantTime, e);
-    }
+    HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
+    return postWrite(result, instantTime, table);
   }
 
   /**
@@ -229,18 +204,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
     setOperationType(WriteOperationType.INSERT);
-    try {
-      // De-dupe/merge if needed
-      JavaRDD<HoodieRecord<T>> dedupedRecords =
-          combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
-
-      return upsertRecordsInternal(dedupedRecords, instantTime, table, false);
-    } catch (Throwable e) {
-      if (e instanceof HoodieInsertException) {
-        throw e;
-      }
-      throw new HoodieInsertException("Failed to insert for commit time " + instantTime, e);
-    }
+    HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
+    return postWrite(result, instantTime, table);
   }
 
   /**
@@ -257,14 +222,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
     setOperationType(WriteOperationType.INSERT_PREPPED);
-    try {
-      return upsertRecordsInternal(preppedRecords, instantTime, table, false);
-    } catch (Throwable e) {
-      if (e instanceof HoodieInsertException) {
-        throw e;
-      }
-      throw new HoodieInsertException("Failed to insert prepared records for commit time " + instantTime, e);
-    }
+    HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
+    return postWrite(result, instantTime, table);
   }
 
   /**
@@ -301,18 +260,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
     setOperationType(WriteOperationType.BULK_INSERT);
-    try {
-      // De-dupe/merge if needed
-      JavaRDD<HoodieRecord<T>> dedupedRecords =
-          combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
-
-      return bulkInsertInternal(dedupedRecords, instantTime, table, bulkInsertPartitioner);
-    } catch (Throwable e) {
-      if (e instanceof HoodieInsertException) {
-        throw e;
-      }
-      throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
-    }
+    HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner);
+    return postWrite(result, instantTime, table);
   }
 
   /**
@@ -335,14 +284,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
     setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
-    try {
-      return bulkInsertInternal(preppedRecords, instantTime, table, bulkInsertPartitioner);
-    } catch (Throwable e) {
-      if (e instanceof HoodieInsertException) {
-        throw e;
-      }
-      throw new HoodieInsertException("Failed to bulk insert prepared records for commit time " + instantTime, e);
-    }
+    HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
+    return postWrite(result, instantTime, table);
   }
 
   /**
@@ -356,170 +299,59 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.DELETE);
     setOperationType(WriteOperationType.DELETE);
-    try {
-      // De-dupe/merge if needed
-      JavaRDD<HoodieKey> dedupedKeys =
-          config.shouldCombineBeforeDelete() ? deduplicateKeys(keys) : keys;
-
-      JavaRDD<HoodieRecord<T>> dedupedRecords =
-          dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
-      Timer.Context indexTimer = metrics.getIndexCtx();
-      // perform index loop up to get existing location of records
-      JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table);
-      // filter out non existant keys/records
-      JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
-      if (!taggedValidRecords.isEmpty()) {
-        metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
-        return upsertRecordsInternal(taggedValidRecords, instantTime, table, true);
-      } else {
-        // if entire set of keys are non existent
-        saveWorkloadProfileMetadataToInflight(new WorkloadProfile(jsc.emptyRDD()), table, instantTime);
-        JavaRDD<WriteStatus> writeStatusRDD = jsc.emptyRDD();
-        commitOnAutoCommit(instantTime, writeStatusRDD, table.getMetaClient().getCommitActionType());
-        return writeStatusRDD;
-      }
-    } catch (Throwable e) {
-      if (e instanceof HoodieUpsertException) {
-        throw (HoodieUpsertException) e;
-      }
-      throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
-    }
-  }
-
-  private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String instantTime,
-      HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
-    final JavaRDD<HoodieRecord<T>> repartitionedRecords;
-    final int parallelism = config.getBulkInsertShuffleParallelism();
-    if (bulkInsertPartitioner.isPresent()) {
-      repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
-    } else {
-      // Now, sort the records and line them up nicely for loading.
-      repartitionedRecords = dedupedRecords.sortBy(record -> {
-        // Let's use "partitionPath + key" as the sort key. Spark, will ensure
-        // the records split evenly across RDD partitions, such that small partitions fit
-        // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
-        return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
-      }, true, parallelism);
-    }
-
-    // generate new file ID prefixes for each output partition
-    final List<String> fileIDPrefixes =
-        IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
-
-    table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
-        table.getMetaClient().getCommitActionType(), instantTime), Option.empty());
-
-    JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
-        .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
-        .flatMap(List::iterator);
-
-    return updateIndexAndCommitIfNeeded(writeStatusRDD, table, instantTime);
-  }
-
-  private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition, JavaRDD<HoodieRecord<T>> records,
-      int parallelism) {
-    return condition ? deduplicateRecords(records, parallelism) : records;
+    HoodieWriteMetadata result = table.delete(jsc,instantTime, keys);
+    return postWrite(result, instantTime, table);
   }
 
   /**
-   * Save the workload profile in an intermediate file (here re-using commit files) This is useful when performing
-   * rollback for MOR tables. Only updates are recorded in the workload profile metadata since updates to log blocks
-   * are unknown across batches Inserts (which are new parquet files) are rolled back based on commit time. // TODO :
-   * Create a new WorkloadProfile metadata file instead of using HoodieCommitMetadata
+   * Common method containing steps to be performed after write (upsert/insert/..) operations including auto-commit.
+   * @param result  Commit Action Result
+   * @param instantTime Instant Time
+   * @param hoodieTable Hoodie Table
+   * @return Write Status
    */
-  private void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, HoodieTable<T> table, String instantTime)
-      throws HoodieCommitException {
-    try {
-      HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-      profile.getPartitionPaths().forEach(path -> {
-        WorkloadStat partitionStat = profile.getWorkloadStat(path.toString());
-        partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
-          HoodieWriteStat writeStat = new HoodieWriteStat();
-          writeStat.setFileId(key);
-          // TODO : Write baseCommitTime is possible here ?
-          writeStat.setPrevCommit(value.getKey());
-          writeStat.setNumUpdateWrites(value.getValue());
-          metadata.addWriteStat(path.toString(), writeStat);
-        });
-      });
-      metadata.setOperationType(getOperationType());
-
-      HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-      String commitActionType = table.getMetaClient().getCommitActionType();
-      HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
-      activeTimeline.transitionRequestedToInflight(requested,
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    } catch (IOException io) {
-      throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
+  private JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata result, String instantTime, HoodieTable<T> hoodieTable) {
+    if (result.getIndexLookupDuration().isPresent()) {
+      metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
     }
-  }
-
-  private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime,
-      HoodieTable<T> hoodieTable, final boolean isUpsert) {
-
-    // Cache the tagged records, so we don't end up computing both
-    // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
-    if (preppedRecords.getStorageLevel() == StorageLevel.NONE()) {
-      preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
-    } else {
-      LOG.info("RDD PreppedRecords was persisted at: " + preppedRecords.getStorageLevel());
-    }
-
-    WorkloadProfile profile = null;
-    if (hoodieTable.isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(preppedRecords);
-      LOG.info("Workload profile :" + profile);
-      saveWorkloadProfileMetadataToInflight(profile, hoodieTable, instantTime);
-    }
-
-    // partition using the insert partitioner
-    final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile);
-    JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
-    JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
-      if (isUpsert) {
-        return hoodieTable.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
-      } else {
-        return hoodieTable.handleInsertPartition(instantTime, partition, recordItr, partitioner);
+    if (result.isCommitted()) {
+      // Perform post commit operations.
+      if (result.getFinalizeDuration().isPresent()) {
+        metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
+            result.getWriteStats().get().size());
       }
-    }, true).flatMap(List::iterator);
 
-    return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, instantTime);
-  }
+      postCommit(result.getCommitMetadata().get(), instantTime, Option.empty());
 
-  private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
-    if (isUpsert) {
-      return table.getUpsertPartitioner(profile, jsc);
-    } else {
-      return table.getInsertPartitioner(profile, jsc);
+      emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
+          hoodieTable.getMetaClient().getCommitActionType());
     }
-  }
-
-  private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
-    return dedupedRecords.mapToPair(
-        record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
-        .partitionBy(partitioner).map(Tuple2::_2);
+    return result.getWriteStatuses();
   }
 
   @Override
   protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata) throws IOException {
-
-    // Do an inline compaction if enabled
-    if (config.isInlineCompaction()) {
-      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
-      forceCompact(extraMetadata);
-    } else {
-      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
-    }
-    // We cannot have unbounded commit files. Archive commits if we have to archive
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
-    archiveLog.archiveIfRequired(jsc);
-    if (config.isAutoClean()) {
-      // Call clean to cleanup if there is anything to cleanup after the commit,
-      LOG.info("Auto cleaning is enabled. Running cleaner now");
-      clean(instantTime);
-    } else {
-      LOG.info("Auto cleaning is not enabled. Not running cleaner now");
+      Option<Map<String, String>> extraMetadata) {
+    try {
+      // Do an inline compaction if enabled
+      if (config.isInlineCompaction()) {
+        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
+        forceCompact(extraMetadata);
+      } else {
+        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
+      }
+      // We cannot have unbounded commit files. Archive commits if we have to archive
+      HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
+      archiveLog.archiveIfRequired(jsc);
+      if (config.isAutoClean()) {
+        // Call clean to cleanup if there is anything to cleanup after the commit,
+        LOG.info("Auto cleaning is enabled. Running cleaner now");
+        clean(instantTime);
+      } else {
+        LOG.info("Auto cleaning is not enabled. Not running cleaner now");
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
     }
   }
 
@@ -978,47 +810,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   }
 
   /**
-   * Deduplicate Hoodie records, using the given deduplication function.
-   *
-   * @param records hoodieRecords to deduplicate
-   * @param parallelism parallelism or partitions to be used while reducing/deduplicating
-   * @return RDD of HoodieRecord already be deduplicated
-   */
-  JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) {
-    boolean isIndexingGlobal = getIndex().isGlobal();
-    return records.mapToPair(record -> {
-      HoodieKey hoodieKey = record.getKey();
-      // If index used is global, then records are expected to differ in their partitionPath
-      Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
-      return new Tuple2<>(key, record);
-    }).reduceByKey((rec1, rec2) -> {
-      @SuppressWarnings("unchecked")
-      T reducedData = (T) rec1.getData().preCombine(rec2.getData());
-      // we cannot allow the user to change the key or partitionPath, since that will affect
-      // everything
-      // so pick it from one of the records.
-      return new HoodieRecord<T>(rec1.getKey(), reducedData);
-    }, parallelism).map(Tuple2::_2);
-  }
-
-  /**
-   * Deduplicate Hoodie records, using the given deduplication function.
-   *
-   * @param keys RDD of HoodieKey to deduplicate
-   * @return RDD of HoodieKey already be deduplicated
-   */
-  JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys) {
-    boolean isIndexingGlobal = getIndex().isGlobal();
-    if (isIndexingGlobal) {
-      return keys.keyBy(HoodieKey::getRecordKey)
-          .reduceByKey((key1, key2) -> key1)
-          .values();
-    } else {
-      return keys.distinct();
-    }
-  }
-
-  /**
    * Cleanup all pending commits.
    */
   private void rollbackPendingCommits() {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index f01cdaa..03c014f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -32,26 +32,30 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
 import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.InsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.InsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.rollback.RollbackHelper;
 import org.apache.hudi.table.rollback.RollbackRequest;
 import org.apache.log4j.LogManager;
@@ -59,21 +63,16 @@ import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -94,21 +93,44 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
-    if (profile == null) {
-      throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
-    }
-    return new UpsertPartitioner(profile, jsc);
+  public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+    return new UpsertCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata insert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+    return new InsertCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
+      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+    return new BulkInsertCommitActionExecutor<>(jsc, config,
+        this, instantTime, records, bulkInsertPartitioner).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata delete(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieKey> keys) {
+    return new DeleteCommitActionExecutor<>(jsc, config, this, instantTime, keys).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata upsertPrepped(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> preppedRecords) {
+    return new UpsertPreppedCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
   }
 
   @Override
-  public Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
-    return getUpsertPartitioner(profile, jsc);
+  public HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> preppedRecords) {
+    return new InsertPreppedCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
   }
 
   @Override
-  public boolean isWorkloadProfileNeeded() {
-    return true;
+  public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> preppedRecords,  Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+    return new BulkInsertPreppedCommitActionExecutor<>(jsc, config,
+        this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
   }
 
   @Override
@@ -123,19 +145,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
-                                                  Iterator<HoodieRecord<T>> recordItr)
-      throws IOException {
-    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
-    if (!recordItr.hasNext()) {
-      LOG.info("Empty partition with fileId => " + fileId);
-      return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
-    }
-    // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, recordItr);
-    return handleUpdateInternal(upsertHandle, instantTime, fileId);
-  }
-
-  public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
     // these are updates
     HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
@@ -173,26 +182,12 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
   }
 
-  protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
-  }
-
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
     return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords,
             partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
   }
 
-  public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
-      throws Exception {
-    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
-    if (!recordItr.hasNext()) {
-      LOG.info("Empty partition");
-      return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
-    }
-    return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
-  }
-
   public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
       Iterator<HoodieRecord<T>> recordItr) {
     HoodieCreateHandle createHandle =
@@ -201,34 +196,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
   }
 
-  @SuppressWarnings("unchecked")
-  @Override
-  public Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                           Partitioner partitioner) {
-    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
-    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
-    BucketType btype = binfo.bucketType;
-    try {
-      if (btype.equals(BucketType.INSERT)) {
-        return handleInsert(instantTime, binfo.fileIdPrefix, recordItr);
-      } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(instantTime, binfo.partitionPath, binfo.fileIdPrefix, recordItr);
-      } else {
-        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
-      }
-    } catch (Throwable t) {
-      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
-      LOG.error(msg, t);
-      throw new HoodieUpsertException(msg, t);
-    }
-  }
-
-  @Override
-  public Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                           Partitioner partitioner) {
-    return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
-  }
-
   @Override
   public HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime) {
     return new CleanActionExecutor(jsc, config, this, cleanInstantTime).execute();
@@ -390,242 +357,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   /**
-   * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
-   */
-  class UpsertPartitioner extends Partitioner {
-
-    /**
-     * List of all small files to be corrected.
-     */
-    List<SmallFile> smallFiles = new ArrayList<>();
-    /**
-     * Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into.
-     */
-    private int totalBuckets = 0;
-    /**
-     * Stat for the current workload. Helps in determining total inserts, upserts etc.
-     */
-    private WorkloadStat globalStat;
-    /**
-     * Helps decide which bucket an incoming update should go to.
-     */
-    private HashMap<String, Integer> updateLocationToBucket;
-    /**
-     * Helps us pack inserts into 1 or more buckets depending on number of incoming records.
-     */
-    private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
-    /**
-     * Remembers what type each bucket is for later.
-     */
-    private HashMap<Integer, BucketInfo> bucketInfoMap;
-
-    /**
-     * Rolling stats for files.
-     */
-    protected HoodieRollingStatMetadata rollingStatMetadata;
-
-    UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
-      updateLocationToBucket = new HashMap<>();
-      partitionPathToInsertBuckets = new HashMap<>();
-      bucketInfoMap = new HashMap<>();
-      globalStat = profile.getGlobalStat();
-      rollingStatMetadata = getRollingStats();
-      assignUpdates(profile);
-      assignInserts(profile, jsc);
-
-      LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
-          + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
-          + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
-    }
-
-    private void assignUpdates(WorkloadProfile profile) {
-      // each update location gets a partition
-      Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
-      for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
-        for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
-                partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
-          addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
-        }
-      }
-    }
-
-    private int addUpdateBucket(String partitionPath, String fileIdHint) {
-      int bucket = totalBuckets;
-      updateLocationToBucket.put(fileIdHint, bucket);
-      BucketInfo bucketInfo = new BucketInfo();
-      bucketInfo.bucketType = BucketType.UPDATE;
-      bucketInfo.fileIdPrefix = fileIdHint;
-      bucketInfo.partitionPath = partitionPath;
-      bucketInfoMap.put(totalBuckets, bucketInfo);
-      totalBuckets++;
-      return bucket;
-    }
-
-    private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
-      // for new inserts, compute buckets depending on how many records we have for each partition
-      Set<String> partitionPaths = profile.getPartitionPaths();
-      long averageRecordSize =
-          averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-              config.getCopyOnWriteRecordSizeEstimate());
-      LOG.info("AvgRecordSize => " + averageRecordSize);
-
-      Map<String, List<SmallFile>> partitionSmallFilesMap =
-              getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
-
-      for (String partitionPath : partitionPaths) {
-        WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
-        if (pStat.getNumInserts() > 0) {
-
-          List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
-          this.smallFiles.addAll(smallFiles);
-
-          LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
-
-          long totalUnassignedInserts = pStat.getNumInserts();
-          List<Integer> bucketNumbers = new ArrayList<>();
-          List<Long> recordsPerBucket = new ArrayList<>();
-
-          // first try packing this into one of the smallFiles
-          for (SmallFile smallFile : smallFiles) {
-            long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
-                totalUnassignedInserts);
-            if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
-              // create a new bucket or re-use an existing bucket
-              int bucket;
-              if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
-                bucket = updateLocationToBucket.get(smallFile.location.getFileId());
-                LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
-              } else {
-                bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
-                LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
-              }
-              bucketNumbers.add(bucket);
-              recordsPerBucket.add(recordsToAppend);
-              totalUnassignedInserts -= recordsToAppend;
-            }
-          }
-
-          // if we have anything more, create new insert buckets, like normal
-          if (totalUnassignedInserts > 0) {
-            long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
-            if (config.shouldAutoTuneInsertSplits()) {
-              insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
-            }
-
-            int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
-            LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
-                + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
-            for (int b = 0; b < insertBuckets; b++) {
-              bucketNumbers.add(totalBuckets);
-              recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
-              BucketInfo bucketInfo = new BucketInfo();
-              bucketInfo.bucketType = BucketType.INSERT;
-              bucketInfo.partitionPath = partitionPath;
-              bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
-              bucketInfoMap.put(totalBuckets, bucketInfo);
-              totalBuckets++;
-            }
-          }
-
-          // Go over all such buckets, and assign weights as per amount of incoming inserts.
-          List<InsertBucket> insertBuckets = new ArrayList<>();
-          for (int i = 0; i < bucketNumbers.size(); i++) {
-            InsertBucket bkt = new InsertBucket();
-            bkt.bucketNumber = bucketNumbers.get(i);
-            bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
-            insertBuckets.add(bkt);
-          }
-          LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
-          partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
-        }
-      }
-    }
-
-    private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
-
-      Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
-      if (partitionPaths != null && partitionPaths.size() > 0) {
-        JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
-        partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
-            partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
-      }
-
-      return partitionSmallFilesMap;
-    }
-
-    /**
-     * Returns a list of small files in the given partition path.
-     */
-    protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-      // smallFiles only for partitionPath
-      List<SmallFile> smallFileLocations = new ArrayList<>();
-
-      HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
-
-      if (!commitTimeline.empty()) { // if we have some commits
-        HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-        List<HoodieBaseFile> allFiles = getBaseFileOnlyView()
-            .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
-
-        for (HoodieBaseFile file : allFiles) {
-          if (file.getFileSize() < config.getParquetSmallFileLimit()) {
-            String filename = file.getFileName();
-            SmallFile sf = new SmallFile();
-            sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-            sf.sizeBytes = file.getFileSize();
-            smallFileLocations.add(sf);
-          }
-        }
-      }
-
-      return smallFileLocations;
-    }
-
-    public BucketInfo getBucketInfo(int bucketNumber) {
-      return bucketInfoMap.get(bucketNumber);
-    }
-
-    public List<InsertBucket> getInsertBuckets(String partitionPath) {
-      return partitionPathToInsertBuckets.get(partitionPath);
-    }
-
-    @Override
-    public int numPartitions() {
-      return totalBuckets;
-    }
-
-    @Override
-    public int getPartition(Object key) {
-      Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
-          (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
-      if (keyLocation._2().isPresent()) {
-        HoodieRecordLocation location = keyLocation._2().get();
-        return updateLocationToBucket.get(location.getFileId());
-      } else {
-        List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
-        // pick the target bucket to use based on the weights.
-        double totalWeight = 0.0;
-        final long totalInserts = Math.max(1, globalStat.getNumInserts());
-        final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
-        final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-        for (InsertBucket insertBucket : targetBuckets) {
-          totalWeight += insertBucket.weight;
-          if (r <= totalWeight) {
-            return insertBucket.bucketNumber;
-          }
-        }
-        // return first one, by default
-        return targetBuckets.get(0).bucketNumber;
-      }
-    }
-  }
-
-  protected HoodieRollingStatMetadata getRollingStats() {
-    return null;
-  }
-
-  /**
    * Obtains the average record size based on records written during previous commits. Used for estimating how many
    * records pack into one file.
    */
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index d7783cd..c496df2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -24,9 +24,8 @@ import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -39,24 +38,26 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
-import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.BulkInsertPreppedDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.InsertDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.InsertPreppedDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.UpsertDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.UpsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
 import org.apache.hudi.table.rollback.RollbackHelper;
 import org.apache.hudi.table.rollback.RollbackRequest;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -82,49 +83,49 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
 
   private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class);
 
-  // UpsertPartitioner for MergeOnRead table type
-  private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner;
-
   HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
     super(config, jsc, metaClient);
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
-    if (profile == null) {
-      throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
-    }
-    mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc);
-    return mergeOnReadUpsertPartitioner;
+  public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+    return new UpsertDeltaCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
   }
 
   @Override
-  public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath,
-                                                  String fileId, Iterator<HoodieRecord<T>> recordItr)
-      throws IOException {
-    LOG.info("Merging updates for commit " + instantTime + " for file " + fileId);
-
-    if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
-      LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
-      return super.handleUpdate(instantTime, partitionPath, fileId, recordItr);
-    } else {
-      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, instantTime, this,
-              partitionPath, fileId, recordItr, sparkTaskContextSupplier);
-      appendHandle.doAppend();
-      appendHandle.close();
-      return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
-    }
+  public HoodieWriteMetadata insert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+    return new InsertDeltaCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
   }
 
   @Override
-  public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
-      throws Exception {
-    // If canIndexLogFiles, write inserts to log files else write inserts to parquet files
-    if (index.canIndexLogFiles()) {
-      return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
-    } else {
-      return super.handleInsert(instantTime, idPfx, recordItr);
-    }
+  public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
+      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+    return new BulkInsertDeltaCommitActionExecutor<>(jsc, config,
+        this, instantTime, records, bulkInsertPartitioner).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata delete(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieKey> keys) {
+    return new DeleteDeltaCommitActionExecutor<>(jsc, config, this, instantTime, keys).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata upsertPrepped(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> preppedRecords) {
+    return new UpsertPreppedDeltaCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> preppedRecords) {
+    return new InsertPreppedDeltaCommitActionExecutor<>(jsc, config, this, instantTime, preppedRecords).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> preppedRecords,  Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+    return new BulkInsertPreppedDeltaCommitActionExecutor<>(jsc, config,
+        this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
   }
 
   @Override
@@ -320,105 +321,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     super.finalizeWrite(jsc, instantTs, stats);
   }
 
-  /**
-   * UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet files to larger ones
-   * without the need for an index in the logFile.
-   */
-  class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner {
-
-    MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
-      super(profile, jsc);
-    }
-
-    @Override
-    protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-      // smallFiles only for partitionPath
-      List<SmallFile> smallFileLocations = new ArrayList<>();
-
-      // Init here since this class (and member variables) might not have been initialized
-      HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
-
-      // Find out all eligible small file slices
-      if (!commitTimeline.empty()) {
-        HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-        // find smallest file in partition and append to it
-        List<FileSlice> allSmallFileSlices = new ArrayList<>();
-        // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
-        // it. Doing this overtime for a partition, we ensure that we handle small file issues
-        if (!index.canIndexLogFiles()) {
-          // TODO : choose last N small files since there can be multiple small files written to a single partition
-          // by different spark partitions in a single batch
-          Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getSliceView()
-                  .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-                  .filter(fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
-                  .min((FileSlice left, FileSlice right) -> left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
-          if (smallFileSlice.isPresent()) {
-            allSmallFileSlices.add(smallFileSlice.get());
-          }
-        } else {
-          // If we can index log files, we can add more inserts to log files for fileIds including those under
-          // pending compaction.
-          List<FileSlice> allFileSlices =
-              getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
-                  .collect(Collectors.toList());
-          for (FileSlice fileSlice : allFileSlices) {
-            if (isSmallFile(fileSlice)) {
-              allSmallFileSlices.add(fileSlice);
-            }
-          }
-        }
-        // Create SmallFiles from the eligible file slices
-        for (FileSlice smallFileSlice : allSmallFileSlices) {
-          SmallFile sf = new SmallFile();
-          if (smallFileSlice.getBaseFile().isPresent()) {
-            // TODO : Move logic of file name, file id, base commit time handling inside file slice
-            String filename = smallFileSlice.getBaseFile().get().getFileName();
-            sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-            sf.sizeBytes = getTotalFileSize(smallFileSlice);
-            smallFileLocations.add(sf);
-          } else {
-            HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-            sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-                FSUtils.getFileIdFromLogPath(logFile.getPath()));
-            sf.sizeBytes = getTotalFileSize(smallFileSlice);
-            smallFileLocations.add(sf);
-          }
-        }
-      }
-      return smallFileLocations;
-    }
-
-    public List<String> getSmallFileIds() {
-      return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
-          .collect(Collectors.toList());
-    }
-
-    private long getTotalFileSize(FileSlice fileSlice) {
-      if (!fileSlice.getBaseFile().isPresent()) {
-        return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
-      } else {
-        return fileSlice.getBaseFile().get().getFileSize()
-            + convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
-      }
-    }
-
-    private boolean isSmallFile(FileSlice fileSlice) {
-      long totalSize = getTotalFileSize(fileSlice);
-      return totalSize < config.getParquetMaxFileSize();
-    }
-
-    // TODO (NA) : Make this static part of utility
-    public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
-      long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
-          .filter(size -> size > 0).reduce(Long::sum).orElse(0L);
-      // Here we assume that if there is no base parquet file, all log files contain only inserts.
-      // We can then just get the parquet equivalent size of these log files, compare that with
-      // {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
-      return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio());
-    }
-  }
-
   private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
       HoodieCommitMetadata commitMetadata) {
     ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 50ec45f..2bf66d3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
 import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -54,16 +55,15 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
@@ -127,19 +127,83 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   }
 
   /**
-   * Provides a partitioner to perform the upsert operation, based on the workload profile.
+   * Upsert a batch of new records into Hoodie table at the supplied instantTime.
+   * @param jsc    Java Spark Context jsc
+   * @param instantTime Instant Time for the action
+   * @param records  JavaRDD of hoodieRecords to upsert
+   * @return HoodieWriteMetadata
    */
-  public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
+  public abstract HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> records);
 
   /**
-   * Provides a partitioner to perform the insert operation, based on the workload profile.
+   * Insert a batch of new records into Hoodie table at the supplied instantTime.
+   * @param jsc    Java Spark Context jsc
+   * @param instantTime Instant Time for the action
+   * @param records  JavaRDD of hoodieRecords to insert
+   * @return HoodieWriteMetadata
    */
-  public abstract Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
+  public abstract HoodieWriteMetadata insert(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> records);
 
   /**
-   * Return whether this HoodieTable implementation can benefit from workload profiling.
+   * Bulk Insert a batch of new records into Hoodie table at the supplied instantTime.
+   * @param jsc    Java Spark Context jsc
+   * @param instantTime Instant Time for the action
+   * @param records  JavaRDD of hoodieRecords to bulk insert
+   * @param bulkInsertPartitioner User Defined Partitioner
+   * @return HoodieWriteMetadata
    */
-  public abstract boolean isWorkloadProfileNeeded();
+  public abstract HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> records, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner);
+
+  /**
+   * Deletes the given {@link HoodieKey}s from the Hoodie table at the supplied instantTime. The keys will be de-duped
+   * and non-existent keys will be removed before deleting.
+   *
+   * @param jsc    Java Spark Context jsc
+   * @param instantTime Instant Time for the action
+   * @param keys   {@link JavaRDD} of {@link HoodieKey}s to be deleted
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata delete(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieKey> keys);
+
+  /**
+   * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
+   * <p>
+   * This implementation requires that the input records are already tagged, and de-duped if needed.
+   * @param jsc    Java Spark Context jsc
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords  JavaRDD of hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata upsertPrepped(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> preppedRecords);
+
+  /**
+   * Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
+   * <p>
+   * This implementation requires that the input records are already tagged, and de-duped if needed.
+   * @param jsc    Java Spark Context jsc
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords  JavaRDD of hoodieRecords to insert
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> preppedRecords);
+
+  /**
+   * Bulk Insert the given prepared records into the Hoodie table, at the supplied instantTime.
+   * <p>
+   * This implementation requires that the input records are already tagged, and de-duped if needed.
+   * @param jsc    Java Spark Context jsc
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords  JavaRDD of hoodieRecords to bulk insert
+   * @param bulkInsertPartitioner User Defined Partitioner
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
+      JavaRDD<HoodieRecord<T>> preppedRecords,  Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner);
 
   public HoodieWriteConfig getConfig() {
     return config;
@@ -260,18 +324,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   }
 
   /**
-   * Perform the ultimate IO for a given upserted (RDD) partition.
-   */
-  public abstract Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition,
-      Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
-
-  /**
-   * Perform the ultimate IO for a given inserted (RDD) partition.
-   */
-  public abstract Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition,
-      Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
-
-  /**
    * Schedule compaction for the instant time.
    * 
    * @param jsc Spark Context
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index e6ac2e9..5f81db7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -18,13 +18,14 @@
 
 package org.apache.hudi.table.action;
 
+import java.io.Serializable;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaSparkContext;
 
-public abstract class BaseActionExecutor<R> {
+public abstract class BaseActionExecutor<R> implements Serializable {
 
-  protected final JavaSparkContext jsc;
+  protected final transient JavaSparkContext jsc;
 
   protected final HoodieWriteConfig config;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
new file mode 100644
index 0000000..e72c801
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkConfigUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends BaseActionExecutor<HoodieWriteMetadata> {
+
+  private static final Logger LOG = LogManager.getLogger(BaseCommitActionExecutor.class);
+
+  private final WriteOperationType operationType;
+  protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
+
+  public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
+      HoodieTable table, String instantTime, WriteOperationType operationType) {
+    this(jsc, config, table, instantTime, operationType, null); // delegate with no input RDD
+  }
+
+  public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
+      HoodieTable table, String instantTime, WriteOperationType operationType,
+      JavaRDD<HoodieRecord<T>> inputRecordsRDD) { // NOTE(review): inputRecordsRDD is never read or stored here
+    super(jsc, config, table, instantTime);
+    this.operationType = operationType;
+  }
+
+  public HoodieWriteMetadata execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+    // Persist the input records unless the caller already did: they feed both the workload profile and the write
+    // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
+    if (inputRecordsRDD.getStorageLevel() == StorageLevel.NONE()) {
+      inputRecordsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+    } else {
+      LOG.info("RDD PreppedRecords was persisted at: " + inputRecordsRDD.getStorageLevel());
+    }
+
+    WorkloadProfile profile = null; // stays null when isWorkloadProfileNeeded() returns false
+    if (isWorkloadProfileNeeded()) {
+      profile = new WorkloadProfile(inputRecordsRDD);
+      LOG.info("Workload profile :" + profile);
+      saveWorkloadProfileMetadataToInflight(profile, instantTime);
+    }
+
+    // partition using the upsert or insert partitioner, whichever getPartitioner picks for the operation type
+    final Partitioner partitioner = getPartitioner(profile);
+    JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDD, partitioner);
+    JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator); // true = preservesPartitioning
+
+    updateIndexAndCommitIfNeeded(writeStatusRDD, result);
+    return result;
+  }
+
+  /**
+   * Save the workload profile in an intermediate file (here re-using commit files). This is useful when performing
+   * rollback for MOR tables. Only updates are recorded in the workload profile metadata, since updates to log blocks
+   * are unknown across batches. Inserts (which are new parquet files) are rolled back based on commit time.
+   * TODO: Create a new WorkloadProfile metadata file instead of using HoodieCommitMetadata.
+   */
+  void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
+      throws HoodieCommitException {
+    try {
+      HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+      profile.getPartitionPaths().forEach(path -> {
+        WorkloadStat partitionStat = profile.getWorkloadStat(path.toString());
+        partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
+          HoodieWriteStat writeStat = new HoodieWriteStat();
+          writeStat.setFileId(key);
+          // TODO: Is it possible to write baseCommitTime here?
+          writeStat.setPrevCommit(value.getKey());
+          writeStat.setNumUpdateWrites(value.getValue());
+          metadata.addWriteStat(path.toString(), writeStat);
+        });
+      });
+      metadata.setOperationType(operationType);
+
+      HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+      String commitActionType = table.getMetaClient().getCommitActionType();
+      HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
+      activeTimeline.transitionRequestedToInflight(requested,
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    } catch (IOException io) {
+      throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
+    }
+  }
+
+  private Partitioner getPartitioner(WorkloadProfile profile) {
+    if (WriteOperationType.isChangingRecords(operationType)) { // same test execute() uses to pick the handler
+      return getUpsertPartitioner(profile);
+    } else {
+      return getInsertPartitioner(profile);
+    }
+  }
+
+  private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    return dedupedRecords.mapToPair( // key each record by (record key, optional current location)
+        record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
+        .partitionBy(partitioner).map(Tuple2::_2); // after partitioning, drop the key and keep the record
+  }
+
+  protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
+    // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
+    // RDD actions that are performed after updating the index.
+    writeStatusRDD = writeStatusRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+    Instant indexStartTime = Instant.now();
+    // Update the index back; the call is timed and the duration stored on the result
+    JavaRDD<WriteStatus> statuses = ((HoodieTable<T>)table).getIndex().updateLocation(writeStatusRDD, jsc,
+        (HoodieTable<T>)table);
+    result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
+    result.setWriteStatuses(statuses);
+
+    // Commit right away when auto-commit is enabled; otherwise commitOnAutoCommit only logs
+    commitOnAutoCommit(result);
+  }
+
+  protected void commitOnAutoCommit(HoodieWriteMetadata result) {
+    if (config.shouldAutoCommit()) {
+      LOG.info("Auto commit enabled: Committing " + instantTime);
+      commit(Option.empty(), result); // no extra metadata on auto-commit
+    } else {
+      LOG.info("Auto commit disabled for " + instantTime);
+    }
+  }
+
+  private void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
+    String actionType = table.getMetaClient().getCommitActionType();
+    LOG.info("Committing " + instantTime + ", action Type " + actionType);
+    // Create a fresh Hoodie table; NOTE(review): this local shadows the inherited 'table' field from here on
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
+
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    result.setCommitted(true); // NOTE(review): set before saveAsComplete below has actually run
+    List<HoodieWriteStat> stats = result.getWriteStatuses().map(WriteStatus::getStat).collect();
+    result.setWriteStats(stats);
+
+    updateMetadataAndRollingStats(metadata, stats);
+
+    // Finalize write
+    finalizeWrite(instantTime, stats, result);
+
+    // add in extra metadata, then the schema and operation type
+    if (extraMetadata.isPresent()) {
+      extraMetadata.get().forEach(metadata::addMetadata);
+    }
+    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+    metadata.setOperationType(operationType);
+
+    try {
+      activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+      LOG.info("Committed " + instantTime);
+    } catch (IOException e) {
+      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
+          e);
+    }
+    result.setCommitMetadata(Option.of(metadata));
+  }
+
+  /**
+   * Finalize Write operation; the elapsed time is stored on {@code result} via setFinalizeDuration.
+   * @param instantTime Instant Time
+   * @param stats Hoodie Write Stat
+   */
+  protected void finalizeWrite(String instantTime, List<HoodieWriteStat> stats, HoodieWriteMetadata result) {
+    try {
+      Instant start = Instant.now();
+      table.finalizeWrite(jsc, instantTime, stats);
+      result.setFinalizeDuration(Duration.between(start, Instant.now()));
+    } catch (HoodieIOException ioe) { // surfaced to callers as a commit failure
+      throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+    }
+  }
+
+  private void updateMetadataAndRollingStats(HoodieCommitMetadata metadata, List<HoodieWriteStat> writeStats) {
+    // NOTE(review): despite the method name, no rolling stats are read or merged in this body;
+    // each write stat is only added to the commit metadata under its partition path.
+
+    // Need to do this on every commit (delta or commit) to support COW and MOR.
+    for (HoodieWriteStat stat : writeStats) {
+      String partitionPath = stat.getPartitionPath();
+      // TODO: why is stat.getPartitionPath() null at times here.
+      metadata.addWriteStat(partitionPath, stat);
+    }
+  }
+
+  protected boolean isWorkloadProfileNeeded() {
+    return true; // default; when an override returns false, execute() builds no profile and passes null on
+  }
+
+  @SuppressWarnings("unchecked")
+  protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
+      Partitioner partitioner) {
+    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; // unchecked cast: must be an UpsertPartitioner
+    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
+    BucketType btype = binfo.bucketType;
+    try {
+      if (btype.equals(BucketType.INSERT)) {
+        return handleInsert(binfo.fileIdPrefix, recordItr);
+      } else if (btype.equals(BucketType.UPDATE)) {
+        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
+      } else {
+        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
+      }
+    } catch (Throwable t) { // NOTE(review): also catches the HoodieUpsertException thrown above and wraps it again
+      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
+      LOG.error(msg, t);
+      throw new HoodieUpsertException(msg, t);
+    }
+  }
+
+  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
+      Partitioner partitioner) {
+    return handleUpsertPartition(instantTime, partition, recordItr, partitioner); // default: same path as upserts
+  }
+
+  /**
+   * Provides the partitioner for the upsert operation; profile is null when isWorkloadProfileNeeded() is false.
+   */
+  protected abstract Partitioner getUpsertPartitioner(WorkloadProfile profile);
+
+  /**
+   * Provides the partitioner for the insert operation; profile is null when isWorkloadProfileNeeded() is false.
+   */
+  protected abstract Partitioner getInsertPartitioner(WorkloadProfile profile);
+
+  protected abstract Iterator<List<WriteStatus>> handleInsert(String idPfx, // the bucket's fileIdPrefix
+      Iterator<HoodieRecord<T>> recordItr) throws Exception; // called by handleUpsertPartition for INSERT buckets
+
+  protected abstract Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+      Iterator<HoodieRecord<T>> recordItr) throws IOException; // for UPDATE buckets; fileId = bucket's fileIdPrefix
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
similarity index 55%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
index e6ac2e9..1d98ad4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
@@ -16,28 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
+import java.io.Serializable;
 
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
-
-  protected final HoodieWriteConfig config;
-
-  protected final HoodieTable<?> table;
-
-  protected final String instantTime;
-
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+/**
+ * Helper class for a bucket's type (INSERT or UPDATE) and its file location.
+ */
+public class BucketInfo implements Serializable {
+
+  BucketType bucketType;
+  String fileIdPrefix; // handed to handleInsert as the id prefix and to handleUpdate as the file id
+  String partitionPath;
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("BucketInfo {");
+    sb.append("bucketType=").append(bucketType).append(", ");
+    sb.append("fileIdPrefix=").append(fileIdPrefix).append(", ");
+    sb.append("partitionPath=").append(partitionPath);
+    sb.append('}');
+    return sb.toString();
   }
-
-  public abstract R execute();
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
similarity index 55%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
index e6ac2e9..70ee473 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
@@ -16,28 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
-
-  protected final HoodieWriteConfig config;
-
-  protected final HoodieTable<?> table;
-
-  protected final String instantTime;
-
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
-  }
-
-  public abstract R execute();
+public enum BucketType {
+  UPDATE, INSERT // handleUpsertPartition routes UPDATE to handleUpdate and INSERT to handleInsert
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
new file mode 100644
index 0000000..e0182da
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class BulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+  private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
+
+  public BulkInsertCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
+      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+    super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
+    this.inputRecordsRDD = inputRecordsRDD;
+    this.bulkInsertPartitioner = bulkInsertPartitioner;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    try {
+      return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable<T>) table, config,
+          this, true, bulkInsertPartitioner); // true = performDedupe
+    } catch (Throwable e) {
+      if (e instanceof HoodieInsertException) { // already the right type: rethrow without re-wrapping
+        throw e;
+      }
+      throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
new file mode 100644
index 0000000..fbc8dbb
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.BulkInsertMapFunction;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class BulkInsertHelper<T extends HoodieRecordPayload<T>> { // NOTE(review): class-level T is unused
+
+  public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata bulkInsert(
+      JavaRDD<HoodieRecord<T>> inputRecords, String instantTime,
+      HoodieTable<T> table, HoodieWriteConfig config,
+      CommitActionExecutor<T> executor, boolean performDedupe,
+      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+
+    // De-dupe/merge only when performDedupe is set; combineOnCondition also gets shouldCombineBeforeInsert()
+    JavaRDD<HoodieRecord<T>> dedupedRecords = inputRecords;
+
+    if (performDedupe) {
+      dedupedRecords = WriteHelper.combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
+          config.getInsertShuffleParallelism(), ((HoodieTable<T>)table)); // NOTE(review): insert, not bulk-insert, parallelism
+    }
+
+    final JavaRDD<HoodieRecord<T>> repartitionedRecords;
+    final int parallelism = config.getBulkInsertShuffleParallelism();
+    if (bulkInsertPartitioner.isPresent()) {
+      repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
+    } else {
+      // Now, sort the records and line them up nicely for loading.
+      repartitionedRecords = dedupedRecords.sortBy(record -> {
+        // Let's use "partitionPath + key" as the sort key. Spark will ensure
+        // the records split evenly across RDD partitions, such that small partitions fit
+        // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
+        return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
+      }, true, parallelism);
+    }
+
+    // generate one new file ID prefix per output partition ('parallelism' of them)
+    final List<String> fileIDPrefixes =
+        IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
+
+    table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
+        table.getMetaClient().getCommitActionType(), instantTime), Option.empty()); // no metadata payload
+
+    JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
+        .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
+        .flatMap(List::iterator);
+
+    executor.updateIndexAndCommitIfNeeded(writeStatusRDD, result);
+    return result;
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
new file mode 100644
index 0000000..0b8e75f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class BulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
+  private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
+
+  public BulkInsertPreppedCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
+      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+    super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
+    this.preppedInputRecordRdd = preppedInputRecordRdd;
+    this.bulkInsertPartitioner = bulkInsertPartitioner;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    try {
+      return BulkInsertHelper.bulkInsert(preppedInputRecordRdd, instantTime, (HoodieTable<T>) table, config,
+          this, false, bulkInsertPartitioner); // false = performDedupe off for prepped records
+    } catch (Throwable e) {
+      if (e instanceof HoodieInsertException) { // already the right type: rethrow without re-wrapping
+        throw e;
+      }
+      throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
new file mode 100644
index 0000000..b958837
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ParquetReaderIterator;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
+import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for commit action executors that write through {@link HoodieMergeHandle} (updates) and
+ * {@link CopyOnWriteLazyInsertIterable} (inserts).
+ *
+ * @param <T> payload type of the records being written
+ */
+public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends BaseCommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(CommitActionExecutor.class);
+
+  public CommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, WriteOperationType operationType) {
+    super(jsc, config, table, instantTime, operationType);
+  }
+
+  /**
+   * Merges the given records into the file identified by {@code fileId}.
+   *
+   * @param partitionPath partition the file belongs to
+   * @param fileId id of the file being updated
+   * @param recordItr incoming records for that file
+   * @return iterator over a single list of write statuses; the list is empty when {@code recordItr} has no records
+   * @throws IOException declared by {@link #handleUpdateInternal(HoodieMergeHandle, String)}
+   */
+  @Override
+  public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+      Iterator<HoodieRecord<T>> recordItr)
+      throws IOException {
+    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
+    if (!recordItr.hasNext()) {
+      LOG.info("Empty partition with fileId => " + fileId);
+      return Collections.singletonList(Collections.<WriteStatus>emptyList()).iterator();
+    }
+    // these are updates
+    HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
+    return handleUpdateInternal(upsertHandle, fileId);
+  }
+
+  /**
+   * Variant of the update path that takes the new records keyed by record key together with the base file to merge
+   * them into.
+   *
+   * @param partitionPath partition the file belongs to
+   * @param fileId id of the file being updated
+   * @param keyToNewRecords new records, keyed by record key
+   * @param oldDataFile existing base file to merge against
+   * @return iterator over a single list holding the write status of the merge
+   * @throws IOException declared by {@link #handleUpdateInternal(HoodieMergeHandle, String)}
+   */
+  public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
+    // these are updates
+    HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, keyToNewRecords, oldDataFile);
+    return handleUpdateInternal(upsertHandle, fileId);
+  }
+
+  /**
+   * Streams every record of the handle's old parquet file through a bounded in-memory executor into the merge
+   * handle, then closes the handle and reports its write status.
+   *
+   * @param upsertHandle merge handle performing the write
+   * @param fileId id of the file being updated, used for error reporting only
+   * @return iterator over a single list holding the handle's write status
+   * @throws HoodieUpsertException if the handle has no old file path
+   * @throws HoodieException wrapping any exception raised while reading or merging
+   */
+  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
+      throws IOException {
+    if (upsertHandle.getOldFilePath() == null) {
+      throw new HoodieUpsertException(
+          "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
+    } else {
+      // Read the old file with the handle's writer schema
+      AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), upsertHandle.getWriterSchema());
+      BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+      try (ParquetReader<IndexedRecord> reader =
+          AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(table.getHadoopConf()).build()) {
+        wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
+            new UpdateHandler(upsertHandle), x -> x);
+        wrapper.execute();
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      } finally {
+        try {
+          upsertHandle.close();
+        } finally {
+          // Shut the executor down even when closing the merge handle throws, so it is never left running
+          if (null != wrapper) {
+            wrapper.shutdownNow();
+          }
+        }
+      }
+    }
+
+    // TODO(vc): This needs to be revisited
+    if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
+      LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+          + upsertHandle.getWriteStatus());
+    }
+    return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
+  }
+
+  /**
+   * Creates the merge handle used to apply an iterator of incoming records to the given file.
+   */
+  protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
+    return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+  }
+
+  /**
+   * Creates the merge handle used to apply keyed records to an explicitly supplied base file.
+   */
+  protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
+      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+    return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, keyToNewRecords,
+        partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
+  }
+
+  /**
+   * Writes the given records as inserts.
+   *
+   * @param idPfx prefix handed to the insert iterable
+   * @param recordItr records to insert
+   * @return iterator over lists of write statuses; a single empty list when {@code recordItr} has no records
+   */
+  @Override
+  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
+      throws Exception {
+    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
+    if (!recordItr.hasNext()) {
+      LOG.info("Empty partition");
+      return Collections.singletonList(Collections.<WriteStatus>emptyList()).iterator();
+    }
+    return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+        sparkTaskContextSupplier);
+  }
+
+  /**
+   * Builds the partitioner used for upserts.
+   *
+   * @param profile workload profile of the incoming records, must not be null
+   * @return a new {@link UpsertPartitioner}
+   * @throws HoodieUpsertException if {@code profile} is null
+   */
+  @Override
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+    if (profile == null) {
+      throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
+    }
+    return new UpsertPartitioner(profile, jsc, table, config);
+  }
+
+  /**
+   * Inserts use the same partitioner as upserts.
+   */
+  @Override
+  public Partitioner getInsertPartitioner(WorkloadProfile profile) {
+    return getUpsertPartitioner(profile);
+  }
+
+  /**
+   * Consumer that dequeues records from queue and sends to Merge Handle.
+   */
+  private static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
+
+    private final HoodieMergeHandle upsertHandle;
+
+    private UpdateHandler(HoodieMergeHandle upsertHandle) {
+      this.upsertHandle = upsertHandle;
+    }
+
+    @Override
+    protected void consumeOneRecord(GenericRecord record) {
+      upsertHandle.write(record);
+    }
+
+    @Override
+    protected void finish() {}
+
+    @Override
+    protected Void getResult() {
+      return null;
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
similarity index 54%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
index e6ac2e9..ba25a97 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
@@ -16,28 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
 
-  protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 
-  protected final HoodieTable<?> table;
+public class DeleteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
 
-  protected final String instantTime;
+  private final JavaRDD<HoodieKey> keys;
 
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+  /**
+   * Creates an executor registered under {@link WriteOperationType#DELETE}.
+   *
+   * @param jsc Spark context used by the action
+   * @param config write configuration
+   * @param table table the keys are deleted from
+   * @param instantTime instant time of this commit
+   * @param keys keys of the records to delete
+   */
+  public DeleteCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieKey> keys) {
+    super(jsc, config, table, instantTime, WriteOperationType.DELETE);
+    this.keys = keys;
   }
 
-  public abstract R execute();
+  /**
+   * Deletes the configured keys by delegating to {@link DeleteHelper}, passing this executor along to perform the
+   * write.
+   *
+   * @return metadata returned by the helper
+   */
+  @Override
+  public HoodieWriteMetadata execute() {
+    return DeleteHelper.execute(instantTime, keys, jsc, config, (HoodieTable<T>)table, this);
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
new file mode 100644
index 0000000..7ee891f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * Helper class to perform delete keys on hoodie table.
+ *
+ * @param <T> payload type of the records stored in the table
+ */
+public class DeleteHelper<T extends HoodieRecordPayload<T>> {
+
+  /**
+   * Deduplicates the keys of a delete request.
+   *
+   * <p>When the table's index is global, keys are collapsed on record key alone and one key is retained per record
+   * key; otherwise duplicates are dropped with {@code distinct()} on the whole {@link HoodieKey}.
+   *
+   * @param keys RDD of HoodieKey to deduplicate
+   * @param table table whose index decides which of the two strategies applies
+   * @return RDD of HoodieKey with duplicates removed
+   */
+  private static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys,
+      HoodieTable<T> table) {
+    boolean isIndexingGlobal = table.getIndex().isGlobal();
+    if (isIndexingGlobal) {
+      return keys.keyBy(HoodieKey::getRecordKey)
+          .reduceByKey((key1, key2) -> key1)
+          .values();
+    } else {
+      return keys.distinct();
+    }
+  }
+
+  /**
+   * Deletes the given keys from the table.
+   *
+   * <p>Keys are optionally deduplicated, wrapped into records with an empty payload and tagged through the table's
+   * index. Records whose current location is known are handed to {@code deleteExecutor}; when none remain, an empty
+   * workload profile is saved to the inflight and an empty result is passed to the executor's auto-commit hook.
+   *
+   * @param instantTime instant time of this commit
+   * @param keys keys of the records to delete
+   * @param jsc Spark context
+   * @param config write configuration, consulted for whether to deduplicate before deleting
+   * @param table table to delete from
+   * @param deleteExecutor executor performing the write of the tagged records
+   * @return metadata of the delete; the index lookup duration is set only when there were records to delete
+   * @throws HoodieUpsertException on any failure; non-upsert exceptions are wrapped together with the commit time
+   */
+  public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata execute(String instantTime,
+      JavaRDD<HoodieKey> keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<T> table,
+      CommitActionExecutor<T> deleteExecutor) {
+    try {
+      HoodieWriteMetadata result = null;
+      // De-dupe/merge if needed
+      JavaRDD<HoodieKey> dedupedKeys = config.shouldCombineBeforeDelete() ? deduplicateKeys(keys, table) : keys;
+
+      JavaRDD<HoodieRecord<T>> dedupedRecords =
+          dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+      Instant beginTag = Instant.now();
+      // perform index look up to get existing location of records
+      JavaRDD<HoodieRecord<T>> taggedRecords = table.getIndex().tagLocation(dedupedRecords, jsc, table);
+      Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
+
+      // filter out non-existent keys/records
+      JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
+      if (!taggedValidRecords.isEmpty()) {
+        result = deleteExecutor.execute(taggedValidRecords);
+        result.setIndexLookupDuration(tagLocationDuration);
+      } else {
+        // if entire set of keys are non existent
+        deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(jsc.emptyRDD()), instantTime);
+        result = new HoodieWriteMetadata();
+        result.setWriteStatuses(jsc.emptyRDD());
+        deleteExecutor.commitOnAutoCommit(result);
+      }
+      return result;
+    } catch (Throwable e) {
+      if (e instanceof HoodieUpsertException) {
+        throw (HoodieUpsertException) e;
+      }
+      throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java
new file mode 100644
index 0000000..64fd4df
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import java.util.List;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.time.Duration;
+
+/**
+ * Contains metadata, write-statuses and latency times corresponding to a commit/delta-commit action.
+ *
+ * <p>All optional values start out empty. Setters that take a plain value accept {@code null}, which resets the
+ * corresponding value to empty.
+ */
+public class HoodieWriteMetadata {
+
+  private JavaRDD<WriteStatus> writeStatuses;
+  private Option<Duration> indexLookupDuration = Option.empty();
+
+  // Will be set when auto-commit happens
+  private boolean isCommitted;
+  private Option<HoodieCommitMetadata> commitMetadata = Option.empty();
+  private Option<List<HoodieWriteStat>> writeStats = Option.empty();
+  private Option<Duration> indexUpdateDuration = Option.empty();
+  private Option<Duration> finalizeDuration = Option.empty();
+
+  public HoodieWriteMetadata() {
+  }
+
+  /** @return the write statuses of the action, or {@code null} if none have been set */
+  public JavaRDD<WriteStatus> getWriteStatuses() {
+    return writeStatuses;
+  }
+
+  /** @return the commit metadata, empty until set */
+  public Option<HoodieCommitMetadata> getCommitMetadata() {
+    return commitMetadata;
+  }
+
+  public void setWriteStatuses(JavaRDD<WriteStatus> writeStatuses) {
+    this.writeStatuses = writeStatuses;
+  }
+
+  public void setCommitMetadata(Option<HoodieCommitMetadata> commitMetadata) {
+    this.commitMetadata = commitMetadata;
+  }
+
+  /** @return the finalize duration, empty until set */
+  public Option<Duration> getFinalizeDuration() {
+    return finalizeDuration;
+  }
+
+  public void setFinalizeDuration(Duration finalizeDuration) {
+    this.finalizeDuration = Option.ofNullable(finalizeDuration);
+  }
+
+  /** @return the index update duration, empty until set */
+  public Option<Duration> getIndexUpdateDuration() {
+    return indexUpdateDuration;
+  }
+
+  public void setIndexUpdateDuration(Duration indexUpdateDuration) {
+    this.indexUpdateDuration = Option.ofNullable(indexUpdateDuration);
+  }
+
+  /** @return whether the action has been committed */
+  public boolean isCommitted() {
+    return isCommitted;
+  }
+
+  public void setCommitted(boolean committed) {
+    isCommitted = committed;
+  }
+
+  /** @return the write stats, empty until set */
+  public Option<List<HoodieWriteStat>> getWriteStats() {
+    return writeStats;
+  }
+
+  public void setWriteStats(List<HoodieWriteStat> writeStats) {
+    // ofNullable, like the other setters of this class, so that a null argument resets the value to empty
+    this.writeStats = Option.ofNullable(writeStats);
+  }
+
+  /** @return the index lookup duration, empty until set */
+  public Option<Duration> getIndexLookupDuration() {
+    return indexLookupDuration;
+  }
+
+  public void setIndexLookupDuration(Duration indexLookupDuration) {
+    this.indexLookupDuration = Option.ofNullable(indexLookupDuration);
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
similarity index 55%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
index e6ac2e9..d5dcec8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
@@ -16,28 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
+import java.io.Serializable;
 
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
-
-  protected final HoodieWriteConfig config;
-
-  protected final HoodieTable<?> table;
-
-  protected final String instantTime;
-
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+/**
+ * Helper class for an insert bucket along with the weight [0.0, 1.0] that defines the amount of incoming inserts that
+ * should be allocated to the bucket.
+ */
+public class InsertBucket implements Serializable {
+
+  int bucketNumber;
+  // fraction of total inserts, that should go into this bucket
+  double weight;
+
+  /**
+   * Renders the bucket number and weight, labelled with this class's own name.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("InsertBucket {");
+    sb.append("bucketNumber=").append(bucketNumber).append(", ");
+    sb.append("weight=").append(weight);
+    sb.append('}');
+    return sb.toString();
   }
-
-  public abstract R execute();
-}
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
similarity index 50%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
index e6ac2e9..d08dab2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
@@ -16,28 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
 
-  protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 
-  protected final HoodieTable<?> table;
+public class InsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
 
-  protected final String instantTime;
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
 
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+  /**
+   * Creates an executor registered under {@link WriteOperationType#INSERT}.
+   *
+   * @param jsc Spark context used by the action
+   * @param config write configuration
+   * @param table table receiving the records
+   * @param instantTime instant time of this commit
+   * @param inputRecordsRDD records to insert
+   */
+  public InsertCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT);
+    this.inputRecordsRDD = inputRecordsRDD;
   }
 
-  public abstract R execute();
+  /**
+   * Inserts the configured records through {@link WriteHelper}, using the insert-specific combine flag and shuffle
+   * parallelism from the write config.
+   *
+   * @return metadata returned by the helper
+   */
+  @Override
+  public HoodieWriteMetadata execute() {
+    final HoodieTable<T> typedTable = (HoodieTable<T>) table;
+    final boolean combineBeforeInsert = config.shouldCombineBeforeInsert();
+    final int insertParallelism = config.getInsertShuffleParallelism();
+    // NOTE(review): the meaning of the trailing false flag is defined by WriteHelper.write - confirm there
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, typedTable,
+        combineBeforeInsert, insertParallelism, this, false);
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
similarity index 54%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
index e6ac2e9..acc9902 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
@@ -16,28 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
 
-  protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 
-  protected final HoodieTable<?> table;
+public class InsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
 
-  protected final String instantTime;
+  private final JavaRDD<HoodieRecord<T>> preppedRecords;
 
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+  /**
+   * Creates an executor registered under {@link WriteOperationType#INSERT_PREPPED}.
+   *
+   * @param jsc Spark context used by the action
+   * @param config write configuration
+   * @param table table receiving the records
+   * @param instantTime instant time of this commit
+   * @param preppedRecords records to insert
+   */
+  public InsertPreppedCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
+    this.preppedRecords = preppedRecords;
   }
 
-  public abstract R execute();
+  /**
+   * Writes the configured records by passing them straight to the inherited {@code execute(records)}.
+   *
+   * @return metadata returned by the inherited write
+   */
+  @Override
+  public HoodieWriteMetadata execute() {
+    return super.execute(preppedRecords);
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/SmallFile.java
similarity index 55%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/SmallFile.java
index e6ac2e9..ccea6af 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/SmallFile.java
@@ -16,28 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
+import java.io.Serializable;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
-
-  protected final HoodieWriteConfig config;
-
-  protected final HoodieTable<?> table;
-
-  protected final String instantTime;
-
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+/**
+ * Helper class for a small file's location and its actual size on disk.
+ */
+public class SmallFile implements Serializable {
+
+  public HoodieRecordLocation location;
+  public long sizeBytes;
+
+  /**
+   * Renders the location and size in bytes of this small file.
+   */
+  @Override
+  public String toString() {
+    return "SmallFile {" + "location=" + location + ", " + "sizeBytes=" + sizeBytes + '}';
   }
-
-  public abstract R execute();
-}
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
similarity index 50%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
index e6ac2e9..efdcae1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
@@ -16,28 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
 
-  protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 
-  protected final HoodieTable<?> table;
+public class UpsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
 
-  protected final String instantTime;
+  private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
 
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+  /**
+   * Creates an executor registered under {@link WriteOperationType#UPSERT}.
+   *
+   * @param jsc Spark context used by the action
+   * @param config write configuration
+   * @param table table receiving the records
+   * @param instantTime instant time of this commit
+   * @param inputRecordsRDD records to upsert
+   */
+  public UpsertCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
+    this.inputRecordsRDD = inputRecordsRDD;
   }
 
-  public abstract R execute();
+  /**
+   * Upserts the configured records through {@link WriteHelper}, using the upsert-specific combine flag and shuffle
+   * parallelism from the write config.
+   *
+   * @return metadata returned by the helper
+   */
+  @Override
+  public HoodieWriteMetadata execute() {
+    final HoodieTable<T> typedTable = (HoodieTable<T>)table;
+    final boolean combineBeforeUpsert = config.shouldCombineBeforeUpsert();
+    final int upsertParallelism = config.getUpsertShuffleParallelism();
+    // NOTE(review): the meaning of the trailing true flag is defined by WriteHelper.write - confirm there
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, typedTable,
+        combineBeforeUpsert, upsertParallelism, this, true);
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
new file mode 100644
index 0000000..745388c
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.NumericUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+/**
+ * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
+ */
+public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partitioner {
+
+  private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);
+
+  /**
+   * List of all small files to be corrected.
+   */
+  protected List<SmallFile> smallFiles = new ArrayList<>();
+  /**
+   * Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into.
+   */
+  private int totalBuckets = 0;
+  /**
+   * Stat for the current workload. Helps in determining total inserts, upserts etc.
+   */
+  private WorkloadStat globalStat;
+  /**
+   * Helps decide which bucket an incoming update should go to.
+   */
+  private HashMap<String, Integer> updateLocationToBucket;
+  /**
+   * Helps us pack inserts into 1 or more buckets depending on number of incoming records.
+   */
+  private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
+  /**
+   * Remembers what type each bucket is for later.
+   */
+  private HashMap<Integer, BucketInfo> bucketInfoMap;
+
+  protected final HoodieTable<T> table;
+
+  protected final HoodieWriteConfig config;
+
+  public UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table,
+      HoodieWriteConfig config) {
+    updateLocationToBucket = new HashMap<>();
+    partitionPathToInsertBuckets = new HashMap<>();
+    bucketInfoMap = new HashMap<>();
+    globalStat = profile.getGlobalStat();
+    this.table = table;
+    this.config = config;
+    assignUpdates(profile);
+    assignInserts(profile, jsc);
+
+    LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+        + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+        + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
+  }
+
+  private void assignUpdates(WorkloadProfile profile) {
+    // each update location gets a partition
+    Set<Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
+    for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
+      for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
+          partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
+        addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
+      }
+    }
+  }
+
+  private int addUpdateBucket(String partitionPath, String fileIdHint) {
+    int bucket = totalBuckets;
+    updateLocationToBucket.put(fileIdHint, bucket);
+    BucketInfo bucketInfo = new BucketInfo();
+    bucketInfo.bucketType = BucketType.UPDATE;
+    bucketInfo.fileIdPrefix = fileIdHint;
+    bucketInfo.partitionPath = partitionPath;
+    bucketInfoMap.put(totalBuckets, bucketInfo);
+    totalBuckets++;
+    return bucket;
+  }
+
+  private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
+    // for new inserts, compute buckets per partition path from the number of incoming insert records
+    Set<String> partitionPaths = profile.getPartitionPaths();
+    long averageRecordSize =
+        averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+            config.getCopyOnWriteRecordSizeEstimate());
+    LOG.info("AvgRecordSize => " + averageRecordSize);
+
+    Map<String, List<SmallFile>> partitionSmallFilesMap =
+        getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
+
+    for (String partitionPath : partitionPaths) {
+      WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
+      if (pStat.getNumInserts() > 0) { // partitions without inserts get no insert buckets
+
+        List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath); // local; shadows the field
+        this.smallFiles.addAll(smallFiles);
+
+        LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
+
+        long totalUnassignedInserts = pStat.getNumInserts();
+        List<Integer> bucketNumbers = new ArrayList<>();
+        List<Long> recordsPerBucket = new ArrayList<>();
+
+        // first try packing inserts into the small files: each takes what fits below the max file size
+        for (SmallFile smallFile : smallFiles) {
+          long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
+              totalUnassignedInserts);
+          if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
+            // re-use the update bucket already mapped to this file id, or create a new one for it
+            int bucket;
+            if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
+              bucket = updateLocationToBucket.get(smallFile.location.getFileId());
+              LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
+            } else {
+              bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
+              LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
+            }
+            bucketNumbers.add(bucket);
+            recordsPerBucket.add(recordsToAppend);
+            totalUnassignedInserts -= recordsToAppend;
+          }
+        }
+
+        // if inserts are still unassigned, spread them evenly over brand new insert buckets
+        if (totalUnassignedInserts > 0) {
+          long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
+          if (config.shouldAutoTuneInsertSplits()) {
+            insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
+          }
+
+          int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
+          LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
+              + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
+          for (int b = 0; b < insertBuckets; b++) {
+            bucketNumbers.add(totalBuckets);
+            recordsPerBucket.add(totalUnassignedInserts / insertBuckets); // integer division: remainder is dropped
+            BucketInfo bucketInfo = new BucketInfo();
+            bucketInfo.bucketType = BucketType.INSERT;
+            bucketInfo.partitionPath = partitionPath;
+            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+            bucketInfoMap.put(totalBuckets, bucketInfo);
+            totalBuckets++;
+          }
+        }
+
+        // Go over all such buckets, and weight each by its share of this partition's incoming inserts.
+        List<InsertBucket> insertBuckets = new ArrayList<>();
+        for (int i = 0; i < bucketNumbers.size(); i++) {
+          InsertBucket bkt = new InsertBucket();
+          bkt.bucketNumber = bucketNumbers.get(i);
+          bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
+          insertBuckets.add(bkt);
+        }
+        LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
+        partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
+      }
+    }
+  }
+
+  private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
+
+    Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
+    if (partitionPaths != null && partitionPaths.size() > 0) {
+      JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
+      partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
+          partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
+    }
+
+    return partitionSmallFilesMap;
+  }
+
+  /**
+   * Returns a list of small files in the given partition path.
+   */
+  protected List<SmallFile> getSmallFiles(String partitionPath) {
+
+    // smallFiles only for partitionPath
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+
+    if (!commitTimeline.empty()) { // if we have some commits
+      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+      List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
+          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
+
+      for (HoodieBaseFile file : allFiles) {
+        if (file.getFileSize() < config.getParquetSmallFileLimit()) {
+          String filename = file.getFileName();
+          SmallFile sf = new SmallFile();
+          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+          sf.sizeBytes = file.getFileSize();
+          smallFileLocations.add(sf);
+        }
+      }
+    }
+
+    return smallFileLocations;
+  }
+
+  public BucketInfo getBucketInfo(int bucketNumber) {
+    return bucketInfoMap.get(bucketNumber);
+  }
+
+  public List<InsertBucket> getInsertBuckets(String partitionPath) {
+    return partitionPathToInsertBuckets.get(partitionPath);
+  }
+
+  @Override
+  public int numPartitions() {
+    return totalBuckets;
+  }
+
+  @Override
+  public int getPartition(Object key) {
+    Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
+        (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
+    if (keyLocation._2().isPresent()) { // a known location: route to the bucket mapped to its file id
+      HoodieRecordLocation location = keyLocation._2().get();
+      return updateLocationToBucket.get(location.getFileId()); // unboxes: NPE if the file id has no bucket
+    } else {
+      List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
+      // pick the target bucket: compare a key-derived fraction r in [0, 1) against the cumulative bucket weights.
+      double totalWeight = 0.0;
+      final long totalInserts = Math.max(1, globalStat.getNumInserts()); // at least 1, so never divide by zero
+      final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
+      final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
+      for (InsertBucket insertBucket : targetBuckets) {
+        totalWeight += insertBucket.weight;
+        if (r <= totalWeight) {
+          return insertBucket.bucketNumber;
+        }
+      }
+      // the cumulative weight never reached r: fall back to the first bucket
+      return targetBuckets.get(0).bucketNumber;
+    }
+  }
+
+  /**
+   * Obtains the average record size based on records written during previous commits. Used for estimating how many
+   * records pack into one file. Returns {@code defaultRecordSizeEstimate} when no commit yields a usable size.
+   */
+  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) {
+    long avgSize = defaultRecordSizeEstimate;
+    try {
+      if (!commitTimeline.empty()) {
+        // Walk the commits newest first and stop at the first one that reports both bytes and records written.
+        Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
+        while (instants.hasNext()) {
+          HoodieInstant instant = instants.next();
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+          long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+          long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+          if (totalBytesWritten > 0 && totalRecordsWritten > 0) { // otherwise try the next older commit
+            avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
+            break;
+          }
+        }
+      }
+    } catch (Throwable t) {
+      // make this fail safe: log the error and return the estimate held so far.
+      LOG.error("Error trying to compute average bytes/record ", t);
+    }
+    return avgSize;
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
similarity index 54%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
index e6ac2e9..5999104 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
@@ -16,28 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
 
-  protected final HoodieWriteConfig config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 
-  protected final HoodieTable<?> table;
+public class UpsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
 
-  protected final String instantTime;
+  private final JavaRDD<HoodieRecord<T>> preppedRecords;
 
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+  public UpsertPreppedCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+    super(jsc, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+    this.preppedRecords = preppedRecords;
   }
 
-  public abstract R execute();
+  @Override public HoodieWriteMetadata execute() { // hands the prepped records to the parent's execute(records)
+    return super.execute(preppedRecords);
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
new file mode 100644
index 0000000..7faee1c
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.time.Duration;
+import java.time.Instant;
+import scala.Tuple2;
+
+public class WriteHelper<T extends HoodieRecordPayload<T>> {
+
+  public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata write(String instantTime,
+      JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
+      HoodieTable<T> table, boolean shouldCombine,
+      int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
+    try {
+      // De-dupe/merge if needed
+      JavaRDD<HoodieRecord<T>> dedupedRecords =
+          combineOnCondition(shouldCombine, inputRecordsRDD, shuffleParallelism, table);
+
+      Instant lookupBegin = Instant.now();
+      JavaRDD<HoodieRecord<T>> taggedRecords = dedupedRecords;
+      if (performTagging) {
+        // perform index lookup to get existing location of records
+        taggedRecords = tag(dedupedRecords, jsc, table);
+      }
+      Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now()); // NOTE(review): spans tag() only
+
+      HoodieWriteMetadata result = executor.execute(taggedRecords);
+      result.setIndexLookupDuration(indexLookupDuration);
+      return result;
+    } catch (Throwable e) {
+      if (e instanceof HoodieUpsertException) { // already the right type: rethrow without wrapping
+        throw (HoodieUpsertException) e;
+      }
+      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
+    }
+  }
+
+  private static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> tag(
+      JavaRDD<HoodieRecord<T>> dedupedRecords, JavaSparkContext jsc, HoodieTable<T> table) {
+    // perform index lookup to get existing location of records
+    return table.getIndex().tagLocation(dedupedRecords, jsc, table);
+  }
+
+  public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> combineOnCondition(
+      boolean condition, JavaRDD<HoodieRecord<T>> records, int parallelism, HoodieTable<T> table) {
+    return condition ? deduplicateRecords(records, table, parallelism) : records;
+  }
+
+  /**
+   * Deduplicate Hoodie records, merging records that share a key via the payload's {@code preCombine}.
+   *
+   * @param records hoodieRecords to deduplicate
+   * @param parallelism parallelism or partitions to be used while reducing/deduplicating
+   * @return RDD of deduplicated HoodieRecords
+   */
+  public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> deduplicateRecords(
+      JavaRDD<HoodieRecord<T>> records, HoodieTable<T> table, int parallelism) {
+    return deduplicateRecords(records, table.getIndex(), parallelism);
+  }
+
+  public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> deduplicateRecords(
+      JavaRDD<HoodieRecord<T>> records, HoodieIndex<T> index, int parallelism) {
+    boolean isIndexingGlobal = index.isGlobal();
+    return records.mapToPair(record -> {
+      HoodieKey hoodieKey = record.getKey();
+      // If the index is global, records are expected to differ in partitionPath, so key on the record key alone
+      Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
+      return new Tuple2<>(key, record);
+    }).reduceByKey((rec1, rec2) -> {
+      @SuppressWarnings("unchecked")
+      T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+      // we cannot allow the user to change the key or partitionPath, since that will affect
+      // everything, so the key is taken from one of the input records (rec1) rather than
+      // from the combined payload.
+      return new HoodieRecord<T>(rec1.getKey(), reducedData);
+    }, parallelism).map(Tuple2::_2);
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..95779a7
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.hudi.table.action.commit.BulkInsertHelper;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class BulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends DeltaCommitActionExecutor<T> {
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+  private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
+
+  public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
+      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+    super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
+    this.inputRecordsRDD = inputRecordsRDD;
+    this.bulkInsertPartitioner = bulkInsertPartitioner;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    try {
+      return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable<T>) table, config,
+          this, true, bulkInsertPartitioner); // NOTE(review): 'true' presumably enables de-duplication - confirm
+    } catch (Throwable e) {
+      if (e instanceof HoodieInsertException) {
+        throw e; // already the right type: rethrow without wrapping
+      }
+      throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..7b6e146
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+import org.apache.hudi.table.action.commit.BulkInsertHelper;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class BulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends DeltaCommitActionExecutor<T> {
+
+  private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
+  private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner;
+
+  public BulkInsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
+      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+    super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT); // NOTE(review): no prepped type - confirm
+    this.preppedInputRecordRdd = preppedInputRecordRdd;
+    this.bulkInsertPartitioner = bulkInsertPartitioner;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    try {
+      return BulkInsertHelper.bulkInsert(preppedInputRecordRdd, instantTime, (HoodieTable<T>) table, config,
+          this, false, bulkInsertPartitioner); // NOTE(review): 'false' presumably skips de-duplication - confirm
+    } catch (Throwable e) {
+      if (e instanceof HoodieInsertException) {
+        throw e; // already the right type: rethrow without wrapping
+      }
+      throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
similarity index 50%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
index e6ac2e9..1575406 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
@@ -16,28 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.deltacommit;
 
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaSparkContext;
-
-public abstract class BaseActionExecutor<R> {
-
-  protected final JavaSparkContext jsc;
 
-  protected final HoodieWriteConfig config;
+import org.apache.hudi.table.action.commit.DeleteHelper;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 
-  protected final HoodieTable<?> table;
+public class DeleteDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends DeltaCommitActionExecutor<T> {
 
-  protected final String instantTime;
+  private final JavaRDD<HoodieKey> keys;
 
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+  public DeleteDeltaCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieKey> keys) {
+    super(jsc, config, table, instantTime, WriteOperationType.DELETE);
+    this.keys = keys;
   }
 
-  public abstract R execute();
+  @Override public HoodieWriteMetadata execute() { // delegates the whole delete flow to DeleteHelper
+    return DeleteHelper.execute(instantTime, keys, jsc, config, (HoodieTable<T>)table, this);
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
new file mode 100644
index 0000000..775580e
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
+import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+
+import org.apache.hudi.table.action.commit.CommitActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+  private static final Logger LOG = LogManager.getLogger(DeltaCommitActionExecutor.class);
+
+  // UpsertPartitioner for MergeOnRead table type; assigned by getUpsertPartitioner() and read by handleUpdate()
+  private UpsertDeltaCommitPartitioner<T> mergeOnReadUpsertPartitioner;
+
+  public DeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
+      String instantTime, WriteOperationType operationType) {
+    super(jsc, config, table, instantTime, operationType);
+  }
+
+  /** Creates the delta-commit partitioner for {@code profile}, remembers it, and throws if the profile is null. */
+  @Override
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+    if (profile == null) {
+      throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
+    }
+    mergeOnReadUpsertPartitioner = new UpsertDeltaCommitPartitioner<>(profile, jsc, (HoodieTable<T>) table, config);
+    return mergeOnReadUpsertPartitioner;
+  }
+
+  /** Delegates to the parent for tracked small files when logs are not indexable; otherwise appends via a handle. */
+  @Override
+  public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+      Iterator<HoodieRecord<T>> recordItr) throws IOException {
+    LOG.info("Merging updates for commit " + instantTime + " for file " + fileId);
+    // The partitioner is only dereferenced when log files cannot be indexed (short-circuit &&)
+    if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
+      LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
+      return super.handleUpdate(partitionPath, fileId, recordItr);
+    } else {
+      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, instantTime, (HoodieTable<T>)table,
+          partitionPath, fileId, recordItr, sparkTaskContextSupplier);
+      appendHandle.doAppend();
+      appendHandle.close();
+      return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
+    }
+  }
+
+  @Override
+  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
+      throws Exception {
+    // If canIndexLogFiles, write inserts to log files else write inserts to parquet files
+    if (table.getIndex().canIndexLogFiles()) {
+      return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+          sparkTaskContextSupplier);
+    } else {
+      return super.handleInsert(idPfx, recordItr);
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..f76f7fc
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.WriteHelper;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class InsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends DeltaCommitActionExecutor<T> {
+  // Records handed to WriteHelper when this INSERT action executes
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertDeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    // Insert-specific combine flag and shuffle parallelism are read from the write config
+    final HoodieTable<T> typedTable = (HoodieTable<T>) table;
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, typedTable,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
similarity index 51%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
index e6ac2e9..55031ea 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
@@ -16,28 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.deltacommit;
 
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-public abstract class BaseActionExecutor<R> {
+public class InsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends DeltaCommitActionExecutor<T> {
 
-  protected final JavaSparkContext jsc;
+  private final JavaRDD<HoodieRecord<T>> preppedRecords;
 
-  protected final HoodieWriteConfig config;
-
-  protected final HoodieTable<?> table;
-
-  protected final String instantTime;
-
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+  public InsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
+    this.preppedRecords = preppedRecords;
   }
 
-  public abstract R execute();
+  public HoodieWriteMetadata execute() {
+    return super.execute(preppedRecords);
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..1fdf433
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.WriteHelper;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class UpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends DeltaCommitActionExecutor<T> {
+  // Records to upsert; assigned exactly once in the constructor
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public UpsertDeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  /** Runs the upsert through WriteHelper using the upsert combine flag and shuffle parallelism from the config. */
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitPartitioner.java
new file mode 100644
index 0000000..f5a4370
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitPartitioner.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+
+import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.table.action.commit.UpsertPartitioner;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet files to larger ones
+ * without the need for an index in the logFile.
+ */
+public class UpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>> extends UpsertPartitioner<T> {
+
+  UpsertDeltaCommitPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table,
+      HoodieWriteConfig config) {
+    super(profile, jsc, table, config);
+  }
+
+  /**
+   * Collects the small files of {@code partitionPath}; the result is empty when no completed commit exists.
+   */
+  @Override
+  protected List<SmallFile> getSmallFiles(String partitionPath) {
+    // smallFiles only for partitionPath
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Init here since this class (and member variables) might not have been initialized
+    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+
+    // Find out all eligible small file slices
+    if (!commitTimeline.empty()) {
+      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+      // find smallest file in partition and append to it
+      List<FileSlice> allSmallFileSlices = new ArrayList<>();
+      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
+      // it. Doing this over time for a partition, we ensure that we handle small file issues
+      if (!table.getIndex().canIndexLogFiles()) {
+        // TODO : choose last N small files since there can be multiple small files written to a single partition
+        // by different spark partitions in a single batch
+        // Long.compare returns 0 for equal sizes, which keeps the ordering consistent as Comparator requires
+        Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
+            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
+            .filter(
+                fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
+                    .getParquetSmallFileLimit())
+            .min((FileSlice left, FileSlice right) -> Long.compare(
+                left.getBaseFile().get().getFileSize(), right.getBaseFile().get().getFileSize())));
+        if (smallFileSlice.isPresent()) {
+          allSmallFileSlices.add(smallFileSlice.get());
+        }
+      } else {
+        // If we can index log files, we can add more inserts to log files for fileIds including those under
+        // pending compaction.
+        List<FileSlice> allFileSlices =
+            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+                .collect(Collectors.toList());
+        for (FileSlice fileSlice : allFileSlices) {
+          if (isSmallFile(fileSlice)) {
+            allSmallFileSlices.add(fileSlice);
+          }
+        }
+      }
+      // Create SmallFiles from the eligible file slices
+      for (FileSlice smallFileSlice : allSmallFileSlices) {
+        SmallFile sf = new SmallFile();
+        if (smallFileSlice.getBaseFile().isPresent()) {
+          // TODO : Move logic of file name, file id, base commit time handling inside file slice
+          String filename = smallFileSlice.getBaseFile().get().getFileName();
+          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        } else {
+          HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+              FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        }
+        // Size and registration are the same whether the location came from the base file or a log file
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
+      }
+    }
+    return smallFileLocations;
+  }
+
+  /** Returns the file id of every entry in the inherited {@code smallFiles} collection. */
+  public List<String> getSmallFileIds() {
+    return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
+        .collect(Collectors.toList());
+  }
+
+  /** Base file size, when a base file is present, plus the parquet-equivalent size of the slice's log files. */
+  private long getTotalFileSize(FileSlice fileSlice) {
+    long logSize = convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
+    return fileSlice.getBaseFile().isPresent() ? fileSlice.getBaseFile().get().getFileSize() + logSize : logSize;
+  }
+
+  /** True while the slice's total size is below {@code config.getParquetMaxFileSize()}. */
+  private boolean isSmallFile(FileSlice fileSlice) {
+    return getTotalFileSize(fileSlice) < config.getParquetMaxFileSize();
+  }
+
+  // TODO (NA) : Make this static part of utility
+  public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
+    long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
+        .filter(size -> size > 0).reduce(Long::sum).orElse(0L);
+    // Here we assume that if there is no base parquet file, all log files contain only inserts.
+    // We can then just get the parquet equivalent size of these log files, compare that with
+    // config.getParquetMaxFileSize() and decide if there is scope to insert more rows
+    return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio());
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
similarity index 51%
copy from hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
copy to hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
index e6ac2e9..413d2e2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
@@ -16,28 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action;
+package org.apache.hudi.table.action.deltacommit;
 
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-public abstract class BaseActionExecutor<R> {
+public class UpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends DeltaCommitActionExecutor<T> {
 
-  protected final JavaSparkContext jsc;
+  private final JavaRDD<HoodieRecord<T>> preppedRecords;
 
-  protected final HoodieWriteConfig config;
-
-  protected final HoodieTable<?> table;
-
-  protected final String instantTime;
-
-  public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
-    this.jsc = jsc;
-    this.config = config;
-    this.table = table;
-    this.instantTime = instantTime;
+  public UpsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc,
+      HoodieWriteConfig config, HoodieTable table,
+      String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+    super(jsc, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+    this.preppedRecords = preppedRecords;
   }
 
-  public abstract R execute();
+  public HoodieWriteMetadata execute() {
+    return super.execute(preppedRecords);
+  }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index c5b52fa..ee76ed3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
@@ -222,7 +223,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
    *
    * @param records List of Hoodie records
    */
-  void assertNodupesWithinPartition(List<HoodieRecord> records) {
+  void assertNodupesWithinPartition(List<HoodieRecord<TestRawTripPayload>> records) {
     Map<String, Set<String>> partitionToKeys = new HashMap<>();
     for (HoodieRecord r : records) {
       String key = r.getRecordKey();
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 49311f7..7382b7e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.client;
 
+import java.util.HashSet;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -50,6 +52,7 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.action.commit.WriteHelper;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -195,7 +198,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
 
     String recordKey = UUID.randomUUID().toString();
     HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
-    HoodieRecord recordOne =
+    HoodieRecord<TestRawTripPayload> recordOne =
         new HoodieRecord(keyOne, HoodieTestDataGenerator.generateRandomValue(keyOne, newCommitTime));
 
     HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
@@ -206,42 +209,51 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     HoodieRecord recordThree =
         new HoodieRecord(keyTwo, HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime));
 
-    JavaRDD<HoodieRecord> records = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
+    JavaRDD<HoodieRecord<TestRawTripPayload>> records =
+        jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
 
-    // dedup should be done based on recordKey only
-    HoodieWriteClient clientWithDummyGlobalIndex = getWriteClientWithDummyIndex(true);
-    List<HoodieRecord> dedupedRecs = clientWithDummyGlobalIndex.deduplicateRecords(records, 1).collect();
+    // Global dedup should be done based on recordKey only
+    HoodieIndex index = mock(HoodieIndex.class);
+    when(index.isGlobal()).thenReturn(true);
+    List<HoodieRecord<TestRawTripPayload>> dedupedRecs = WriteHelper.deduplicateRecords(records, index, 1).collect();
     assertEquals(1, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
 
-    // dedup should be done based on both recordKey and partitionPath
-    HoodieWriteClient clientWithDummyNonGlobalIndex = getWriteClientWithDummyIndex(false);
-    dedupedRecs = clientWithDummyNonGlobalIndex.deduplicateRecords(records, 1).collect();
+    // non-Global dedup should be done based on both recordKey and partitionPath
+    index = mock(HoodieIndex.class);
+    when(index.isGlobal()).thenReturn(false);
+    dedupedRecs = WriteHelper.deduplicateRecords(records, index, 1).collect();
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
 
     // Perform write-action and check
+    JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
     try (HoodieWriteClient client = getHoodieWriteClient(getConfigBuilder().combineInput(true, true).build(), false);) {
       client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> statuses = writeFn.apply(client, records, newCommitTime).collect();
+      List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
       assertNoWriteErrors(statuses);
       assertEquals(2, statuses.size());
-      assertNodupesWithinPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
+      assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
           .collect(Collectors.toList()));
     }
   }
 
   /**
-   * Build a test Hoodie WriteClient with dummy index to configure isGlobal flag.
+   * Assert that there is no duplicate key at the partition level.
    *
-   * @param isGlobal Flag to control HoodieIndex.isGlobal
-   * @return Hoodie Write Client
-   * @throws Exception in case of error
+   * @param records List of Hoodie records
    */
-  private HoodieWriteClient getWriteClientWithDummyIndex(final boolean isGlobal) {
-    HoodieIndex index = mock(HoodieIndex.class);
-    when(index.isGlobal()).thenReturn(isGlobal);
-    return getHoodieWriteClient(getConfigBuilder().build(), false, index);
+  void assertNodupesInPartition(List<HoodieRecord> records) {
+    // Record keys seen so far, grouped by partition path
+    final Map<String, Set<String>> keysByPartition = new HashMap<>();
+    for (HoodieRecord record : records) {
+      final String key = record.getRecordKey();
+      final String partitionPath = record.getPartitionPath();
+      // Set.add reports false when the key was already recorded for this partition
+      final boolean firstOccurrence =
+          keysByPartition.computeIfAbsent(partitionPath, path -> new HashSet<>()).add(key);
+      assertFalse("key " + key + " is duplicate within partition " + partitionPath, !firstOccurrence);
+    }
   }
 
   /**
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 8a726c7..fb66ddb 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -61,6 +61,8 @@ import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.action.deltacommit.DeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.After;
 import org.junit.Assert;
@@ -1346,9 +1348,11 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       JavaRDD<HoodieRecord> deleteRDD = jsc.parallelize(fewRecordsForDelete, 1);
 
       // initialize partitioner
-      hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD), jsc);
+      DeltaCommitActionExecutor actionExecutor = new DeleteDeltaCommitActionExecutor(jsc, cfg, hoodieTable,
+          newDeleteTime, deleteRDD);
+      actionExecutor.getUpsertPartitioner(new WorkloadProfile(deleteRDD));
       final List<List<WriteStatus>> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
-        return hoodieTable.handleUpdate(newDeleteTime, partitionPath, fileId, fewRecordsForDelete.iterator());
+        return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator());
       }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
 
       // Verify there are  errors because records are from multiple partitions (but handleUpdate is invoked for
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
similarity index 77%
rename from hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
rename to hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 7fd1be5..169ad31 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -16,34 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table;
+package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
-import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hadoop.HoodieHiveUtil;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.io.HoodieCreateHandle;
-import org.apache.hudi.table.HoodieCopyOnWriteTable.UpsertPartitioner;
+import org.apache.hudi.table.HoodieCopyOnWriteTable;
+import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileStatus;
@@ -68,20 +65,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import scala.Tuple2;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestCopyOnWriteTable extends HoodieClientTestHarness {
+public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
 
-  private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteTable.class);
+  private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class);
 
   @Before
   public void setUp() throws Exception {
-    initSparkContexts("TestCopyOnWriteTable");
+    initSparkContexts("TestCopyOnWriteActionExecutor");
     initPath();
     initMetaClient();
     initTestDataGenerator();
@@ -179,7 +174,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     GenericRecord newRecord;
     int index = 0;
     for (GenericRecord record : fileRecords) {
-      assertTrue(record.get("_row_key").toString().equals(records.get(index).getRecordKey()));
+      System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey());
+      assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
       index++;
     }
 
@@ -300,8 +296,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
 
     // Insert new records
+    CommitActionExecutor actionExecutor = new InsertCommitActionExecutor(jsc, config, table,
+        firstCommitTime, jsc.parallelize(records));
     List<WriteStatus> writeStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return table.handleInsert(firstCommitTime, FSUtils.createNewFileIdPfx(), records.iterator());
+      return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator());
     }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
 
     Map<String, String> allWriteStatusMergedMetadataMap =
@@ -326,8 +324,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
 
     // Insert new records
     final List<HoodieRecord> recs2 = records;
+    CommitActionExecutor actionExecutor = new InsertPreppedCommitActionExecutor(jsc, config, table,
+        instantTime, jsc.parallelize(recs2));
     List<WriteStatus> returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return table.handleInsert(instantTime, FSUtils.createNewFileIdPfx(), recs2.iterator());
+      return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs2.iterator());
     }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
 
     // TODO: check the actual files and make sure 11 records, total were written.
@@ -347,9 +347,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
 
     // Insert new records
     final List<HoodieRecord> recs3 = records;
-
+    CommitActionExecutor newActionExecutor = new UpsertPreppedCommitActionExecutor(jsc, config, table,
+        instantTime, jsc.parallelize(recs3));
     returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return table.handleInsert(instantTime, FSUtils.createNewFileIdPfx(), recs3.iterator());
+      return newActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs3.iterator());
     }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
 
     assertEquals(3, returnedStatuses.size());
@@ -361,7 +362,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
 
     assertEquals("2016/02/02", returnedStatuses.get(2).getPartitionPath());
     assertEquals(1, returnedStatuses.get(2).getTotalRecords());
-
   }
 
   @Test
@@ -382,8 +382,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     }
 
     // Insert new records
+    CommitActionExecutor actionExecutor = new UpsertCommitActionExecutor(jsc, config, table,
+        instantTime, jsc.parallelize(records));
     jsc.parallelize(Arrays.asList(1))
-        .map(i -> table.handleInsert(instantTime, FSUtils.createNewFileIdPfx(), records.iterator()))
+        .map(i -> actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator()))
         .map(x -> HoodieClientTestUtils.collectStatuses(x)).collect();
 
     // Check the updated file
@@ -397,83 +399,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     assertEquals("If the number of records are more than 1150, then there should be a new file", 3, counts);
   }
 
-  private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize,
-      String testPartitionPath, boolean autoSplitInserts) throws Exception {
-    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
-            .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
-
-    HoodieClientTestUtils.fakeCommitFile(basePath, "001");
-    HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
-
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
-    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", numInserts);
-    List<HoodieRecord> updateRecords = dataGenerator.generateUpdates("001", numUpdates);
-    for (HoodieRecord updateRec : updateRecords) {
-      updateRec.unseal();
-      updateRec.setCurrentLocation(new HoodieRecordLocation("001", "file1"));
-      updateRec.seal();
-    }
-    List<HoodieRecord> records = new ArrayList<>();
-    records.addAll(insertRecords);
-    records.addAll(updateRecords);
-    WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
-    HoodieCopyOnWriteTable.UpsertPartitioner partitioner =
-        (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile, jsc);
-    assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition(
-        new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))));
-    return partitioner;
-  }
-
-  @Test
-  public void testUpsertPartitioner() throws Exception {
-    final String testPartitionPath = "2016/09/26";
-    // Inserts + Updates... Check all updates go together & inserts subsplit
-    UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false);
-    List<HoodieCopyOnWriteTable.InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
-    assertEquals("Total of 2 insert buckets", 2, insertBuckets.size());
-  }
-
-  @Test
-  public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
-    final String testPartitionPath = "2016/09/26";
-    // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
-    // smallest file
-    UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false);
-    List<HoodieCopyOnWriteTable.InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
-
-    assertEquals("Should have 3 partitions", 3, partitioner.numPartitions());
-    assertEquals("Bucket 0 is UPDATE", HoodieCopyOnWriteTable.BucketType.UPDATE,
-        partitioner.getBucketInfo(0).bucketType);
-    assertEquals("Bucket 1 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
-        partitioner.getBucketInfo(1).bucketType);
-    assertEquals("Bucket 2 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
-        partitioner.getBucketInfo(2).bucketType);
-    assertEquals("Total of 3 insert buckets", 3, insertBuckets.size());
-    assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
-    assertEquals("First insert bucket should have weight 0.5", 0.5, insertBuckets.get(0).weight, 0.01);
-
-    // Now with insert split size auto tuned
-    partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true);
-    insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
-
-    assertEquals("Should have 4 partitions", 4, partitioner.numPartitions());
-    assertEquals("Bucket 0 is UPDATE", HoodieCopyOnWriteTable.BucketType.UPDATE,
-        partitioner.getBucketInfo(0).bucketType);
-    assertEquals("Bucket 1 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
-        partitioner.getBucketInfo(1).bucketType);
-    assertEquals("Bucket 2 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
-        partitioner.getBucketInfo(2).bucketType);
-    assertEquals("Bucket 3 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
-        partitioner.getBucketInfo(3).bucketType);
-    assertEquals("Total of 4 insert buckets", 4, insertBuckets.size());
-    assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
-    assertEquals("First insert bucket should have weight 0.5", 200.0 / 2400, insertBuckets.get(0).weight, 0.01);
-  }
-
   @Test
   public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder()
@@ -483,8 +408,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     String instantTime = "000";
     // Perform inserts of 100 records to test CreateHandle and BufferedExecutor
     final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
+    CommitActionExecutor actionExecutor = new InsertCommitActionExecutor(jsc, config, table,
+        instantTime, jsc.parallelize(inserts));
     final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return table.handleInsert(instantTime, UUID.randomUUID().toString(), inserts.iterator());
+      return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator());
     }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
 
     WriteStatus writeStatus = ws.get(0).get(0);
@@ -494,8 +421,10 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
 
     String partitionPath = updates.get(0).getPartitionPath();
     long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
+    CommitActionExecutor newActionExecutor = new UpsertCommitActionExecutor(jsc, config, table,
+        instantTime, jsc.parallelize(updates));
     final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return table.handleUpdate(instantTime, partitionPath, fileId, updates.iterator());
+      return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
     }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
     assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
new file mode 100644
index 0000000..818d765
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hudi.common.HoodieClientTestHarness;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieCopyOnWriteTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Tuple2;
+
+/**
+ * Unit tests for {@link UpsertPartitioner}: checks the bucket types, bucket counts and insert-bucket weights
+ * the partitioner produces for a generated workload of inserts and updates.
+ */
+public class TestUpsertPartitioner extends HoodieClientTestHarness {
+
+  private static final Logger LOG = LogManager.getLogger(TestUpsertPartitioner.class);
+
+  @Before
+  public void setUp() throws Exception {
+    initSparkContexts("TestUpsertPartitioner");
+    initPath();
+    initMetaClient();
+    initTestDataGenerator();
+    initFileSystem();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cleanupSparkContexts();
+    cleanupMetaClient();
+    cleanupFileSystem();
+    cleanupTestDataGenerator();
+  }
+
+  /**
+   * Builds an {@link UpsertPartitioner} over a {@link HoodieCopyOnWriteTable} that holds one fake commit ("001")
+   * and one fake data file ("file1") in the given partition, for a workload of generated inserts and updates.
+   *
+   * <p>Every update record is tagged with location ("001", "file1"). Before returning, the helper asserts that
+   * the first update record is routed to partition 0.
+   *
+   * @param smallFileSize value passed to {@code compactionSmallFileSize}
+   * @param numInserts number of insert records to generate
+   * @param numUpdates number of update records to generate
+   * @param fileSize size passed to {@code fakeDataFile} for "file1"
+   * @param testPartitionPath the single partition path used by the data generator and the fake data file
+   * @param autoSplitInserts value passed to {@code autoTuneInsertSplits}
+   * @return the partitioner built for the generated workload
+   * @throws Exception if the fixture cannot be created
+   */
+  private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize,
+      String testPartitionPath, boolean autoSplitInserts) throws Exception {
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize)
+            .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
+
+    HoodieClientTestUtils.fakeCommitFile(basePath, "001");
+    HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", numInserts);
+    List<HoodieRecord> updateRecords = dataGenerator.generateUpdates("001", numUpdates);
+    for (HoodieRecord updateRec : updateRecords) {
+      updateRec.unseal();
+      updateRec.setCurrentLocation(new HoodieRecordLocation("001", "file1"));
+      updateRec.seal();
+    }
+    List<HoodieRecord> records = new ArrayList<>();
+    records.addAll(insertRecords);
+    records.addAll(updateRecords);
+    WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
+    UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);
+    assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition(
+        new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))));
+    return partitioner;
+  }
+
+  /**
+   * With {@code compactionSmallFileSize} set to 0, a workload of 200 inserts and 100 updates is expected to
+   * produce exactly 2 insert buckets for the partition.
+   */
+  @Test
+  public void testUpsertPartitioner() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    // Inserts + Updates... Check all updates go together & inserts subsplit
+    UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false);
+    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    assertEquals("Total of 2 insert buckets", 2, insertBuckets.size());
+  }
+
+  /**
+   * With a small-file threshold larger than the existing file, the first insert bucket is expected to share
+   * bucket 0 with the updates; checked for a fixed insert split size and for auto-tuned insert splits.
+   */
+  @Test
+  public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
+    // smallest file
+    UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false);
+    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals("Should have 3 partitions", 3, partitioner.numPartitions());
+    assertEquals("Bucket 0 is UPDATE", BucketType.UPDATE,
+        partitioner.getBucketInfo(0).bucketType);
+    assertEquals("Bucket 1 is INSERT", BucketType.INSERT,
+        partitioner.getBucketInfo(1).bucketType);
+    assertEquals("Bucket 2 is INSERT", BucketType.INSERT,
+        partitioner.getBucketInfo(2).bucketType);
+    assertEquals("Total of 3 insert buckets", 3, insertBuckets.size());
+    assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
+    assertEquals("First insert bucket should have weight 0.5", 0.5, insertBuckets.get(0).weight, 0.01);
+
+    // Now with insert split size auto tuned
+    partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true);
+    insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+
+    assertEquals("Should have 4 partitions", 4, partitioner.numPartitions());
+    assertEquals("Bucket 0 is UPDATE", BucketType.UPDATE,
+        partitioner.getBucketInfo(0).bucketType);
+    assertEquals("Bucket 1 is INSERT", BucketType.INSERT,
+        partitioner.getBucketInfo(1).bucketType);
+    assertEquals("Bucket 2 is INSERT", BucketType.INSERT,
+        partitioner.getBucketInfo(2).bucketType);
+    assertEquals("Bucket 3 is INSERT", BucketType.INSERT,
+        partitioner.getBucketInfo(3).bucketType);
+    assertEquals("Total of 4 insert buckets", 4, insertBuckets.size());
+    assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
+    assertEquals("First insert bucket should have weight 200/2400", 200.0 / 2400, insertBuckets.get(0).weight, 0.01);
+  }
+
+  /**
+   * Returns a write-config builder preset with the test base path and the schema text read from the
+   * {@code /exampleSchema.txt} classpath resource.
+   */
+  private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() throws Exception {
+    // Load the schema string that the write config is built with
+    String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 868af20..23ba7f7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -69,4 +69,24 @@ public enum WriteOperationType {
         throw new HoodieException("Invalid value of Type.");
     }
   }
+
+  /**
+   * Tells whether the given operation type is one of {@code UPSERT}, {@code UPSERT_PREPPED} or {@code DELETE}.
+   *
+   * @param operationType the write operation to classify; {@code null} yields {@code false}
+   * @return {@code true} for the three types listed above, {@code false} otherwise
+   */
+  public static boolean isChangingRecords(WriteOperationType operationType) {
+    if (operationType == null) {
+      return false;
+    }
+    switch (operationType) {
+      case UPSERT:
+      case UPSERT_PREPPED:
+      case DELETE:
+        return true;
+      default:
+        return false;
+    }
+  }
 }
\ No newline at end of file