Posted to commits@hudi.apache.org by xu...@apache.org on 2022/03/17 11:18:26 UTC

[hudi] branch master updated: [HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commit executors (#4856)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7446ff9  [HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commit executors (#4856)
7446ff9 is described below

commit 7446ff95a7b099d8016b7ca48fdeed9b290d710f
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Thu Mar 17 19:17:56 2022 +0800

    [HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commit executors (#4856)
    
    - Adopt HoodieData in Spark action commit executors
    - Make DeleteHelper, WriteHelper, and MergeHelper Spark-independent and move them to hudi-client-common
    - Use the raw HoodieTable type in WriteClient APIs to decouple them from the client's generic types (a sketch of the resulting engine boundary follows)
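    
    A minimal sketch of the boundary this draws, using only the wrapper calls
    visible in the hunks below (HoodieJavaRDD.of / HoodieJavaRDD.getJavaRDD);
    the helper class itself is illustrative and not part of this commit:
    
        import org.apache.hudi.common.data.HoodieData;
        import org.apache.hudi.common.model.HoodieRecord;
        import org.apache.hudi.common.model.HoodieRecordPayload;
        import org.apache.hudi.data.HoodieJavaRDD;
        import org.apache.spark.api.java.JavaRDD;
    
        // Illustrative only: Spark code wraps its RDDs once at the edge, so the
        // shared executors and helpers in hudi-client-common can operate on
        // HoodieData without knowing which engine produced it.
        final class EngineBoundarySketch {
          static <T extends HoodieRecordPayload> HoodieData<HoodieRecord<T>> toEngineAgnostic(
              JavaRDD<HoodieRecord<T>> rdd) {
            return HoodieJavaRDD.of(rdd);          // wrap at the Spark boundary
          }
    
          static <T extends HoodieRecordPayload> JavaRDD<HoodieRecord<T>> backToSpark(
              HoodieData<HoodieRecord<T>> data) {
            return HoodieJavaRDD.getJavaRDD(data); // unwrap only where Spark APIs are required
          }
        }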
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  41 ++---
 .../java/org/apache/hudi/table/HoodieTable.java    |   4 +-
 .../action/cluster/strategy/UpdateStrategy.java    |   2 +-
 .../table/action/commit/BaseBulkInsertHelper.java  |   4 +-
 .../table/action/commit/HoodieDeleteHelper.java}   |  68 ++++----
 .../table/action/commit/HoodieMergeHelper.java}    |  21 ++-
 .../table/action/commit/HoodieWriteHelper.java}    |  40 ++---
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  24 +--
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   5 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |  21 +--
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   4 +-
 .../hudi/table/HoodieJavaMergeOnReadTable.java     |   2 +-
 .../commit/JavaBulkInsertCommitActionExecutor.java |   6 +-
 .../table/action/commit/JavaBulkInsertHelper.java  |   5 +-
 .../JavaBulkInsertPreppedCommitActionExecutor.java |   4 +-
 .../org/apache/hudi/client/HoodieReadClient.java   |   2 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    | 111 ++++++------
 .../MultipleSparkJobExecutionStrategy.java         |  33 ++--
 .../SparkSingleFileSortExecutionStrategy.java      |   6 +-
 .../SparkSortAndSizeExecutionStrategy.java         |   6 +-
 .../update/strategy/BaseSparkUpdateStrategy.java   |   9 +-
 .../update/strategy/SparkAllowUpdateStrategy.java  |   5 +-
 .../update/strategy/SparkRejectUpdateStrategy.java |   4 +-
 .../hudi/client/utils/SparkValidatorUtils.java     |  24 +--
 .../client/validator/SparkPreCommitValidator.java  |   7 +-
 .../SqlQueryEqualityPreCommitValidator.java        |   4 +-
 .../SqlQueryInequalityPreCommitValidator.java      |   4 +-
 .../validator/SqlQueryPreCommitValidator.java      |   5 +-
 .../SqlQuerySingleResultPreCommitValidator.java    |   4 +-
 .../org/apache/hudi/data/HoodieJavaPairRDD.java    |   6 +
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |  22 +++
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  56 +++---
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |  54 +++---
 .../org/apache/hudi/table/HoodieSparkTable.java    |  13 +-
 .../SparkBootstrapCommitActionExecutor.java        |  53 +++---
 .../SparkBootstrapDeltaCommitActionExecutor.java   |   8 +-
 ...SparkExecuteClusteringCommitActionExecutor.java |   9 +-
 .../commit/BaseSparkCommitActionExecutor.java      | 105 +++++------
 .../SparkBulkInsertCommitActionExecutor.java       |  17 +-
 .../table/action/commit/SparkBulkInsertHelper.java |  59 ++++---
 ...SparkBulkInsertPreppedCommitActionExecutor.java |  13 +-
 .../commit/SparkDeleteCommitActionExecutor.java    |  11 +-
 .../SparkDeletePartitionCommitActionExecutor.java  |  19 +-
 .../commit/SparkInsertCommitActionExecutor.java    |  11 +-
 .../SparkInsertOverwriteCommitActionExecutor.java  |  25 +--
 ...rkInsertOverwriteTableCommitActionExecutor.java |  26 ++-
 .../SparkInsertPreppedCommitActionExecutor.java    |   9 +-
 .../commit/SparkUpsertCommitActionExecutor.java    |  11 +-
 .../SparkUpsertPreppedCommitActionExecutor.java    |   9 +-
 .../HoodieSparkMergeOnReadTableCompactor.java      |   8 +-
 .../BaseSparkDeltaCommitActionExecutor.java        |   8 +-
 .../SparkBulkInsertDeltaCommitActionExecutor.java  |  17 +-
 ...BulkInsertPreppedDeltaCommitActionExecutor.java |  13 +-
 .../SparkDeleteDeltaCommitActionExecutor.java      |  13 +-
 .../SparkInsertDeltaCommitActionExecutor.java      |  13 +-
 ...parkInsertPreppedDeltaCommitActionExecutor.java |   9 +-
 .../SparkUpsertDeltaCommitActionExecutor.java      |  14 +-
 ...parkUpsertPreppedDeltaCommitActionExecutor.java |   9 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  30 +++-
 .../hudi/table/TestHoodieMergeOnReadTable.java     |   3 +-
 .../commit/TestCopyOnWriteActionExecutor.java      |  18 +-
 .../hudi/table/action/commit/TestDeleteHelper.java | 194 ---------------------
 .../table/action/compact/TestHoodieCompactor.java  |   5 +-
 .../org/apache/hudi/common/data/HoodieData.java    |   8 +
 .../org/apache/hudi/common/data/HoodieList.java    |  26 +++
 .../org/apache/hudi/common/data/HoodieMapPair.java |  11 ++
 .../apache/hudi/common/data/HoodiePairData.java    |   3 +
 .../apache/hudi/common/data/TestHoodieList.java    |  50 ++++++
 .../apache/hudi/common/data/TestHoodieMapPair.java |  29 +++
 69 files changed, 723 insertions(+), 769 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 9bad2e3..a6a7e18 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.client;
 
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.async.AsyncArchiveService;
 import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -78,6 +76,9 @@ import org.apache.hudi.table.action.savepoint.SavepointHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -242,11 +243,11 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
         Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
   }
 
-  protected HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
     return createTable(config, hadoopConf, false);
   }
 
-  protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
+  protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
 
   void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
     try {
@@ -397,7 +398,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @return Collection of WriteStatus to inspect errors and counts
    */
   public abstract O bulkInsert(I records, final String instantTime,
-                               Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
+                               Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);
 
 
   /**
@@ -417,7 +418,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @return Collection of WriteStatus to inspect errors and counts
    */
   public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instantTime,
-                                             Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+                                             Option<BulkInsertPartitioner> bulkInsertPartitioner);
 
   /**
    * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
@@ -458,7 +459,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param hoodieTable Hoodie Table
    * @return Write Status
    */
-  protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable<T, I, K, O> hoodieTable);
+  protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable hoodieTable);
 
   /**
    * Post Commit Hook. Derived classes use this method to perform post-commit processing
@@ -468,7 +469,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param instantTime   Instant Time
    * @param extraMetadata Additional Metadata passed by user
    */
-  protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
+  protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
@@ -480,7 +481,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     }
   }
 
-  protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+  protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
     if (!tableServicesEnabled(config)) {
       return;
     }
@@ -524,7 +525,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     }
   }
 
-  protected void runAnyPendingCompactions(HoodieTable<T, I, K, O> table) {
+  protected void runAnyPendingCompactions(HoodieTable table) {
     table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
         .forEach(instant -> {
           LOG.info("Running previously failed inflight compaction at instant " + instant);
@@ -532,7 +533,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
         });
   }
 
-  protected void runAnyPendingClustering(HoodieTable<T, I, K, O> table) {
+  protected void runAnyPendingClustering(HoodieTable table) {
     table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
       Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
       if (instantPlan.isPresent()) {
@@ -558,7 +559,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     }
   }
 
-  protected void autoArchiveOnCommit(HoodieTable<T, I, K, O> table) {
+  protected void autoArchiveOnCommit(HoodieTable table) {
     if (!config.isAutoArchive()) {
       return;
     }
@@ -808,7 +809,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * and keep increasing unbounded over time.
    * @param table table to commit on.
    */
-  protected void archive(HoodieTable<T, I, K, O> table) {
+  protected void archive(HoodieTable table) {
     if (!tableServicesEnabled(config)) {
       return;
     }
@@ -937,7 +938,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   /**
    * Commit Compaction and track metrics.
    */
-  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable<T, I, K, O> table, String compactionCommitTime);
+  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
 
   /**
   * Get the inflight timeline, excluding compaction and clustering.
@@ -1223,7 +1224,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     return scheduleClustering(extraMetadata);
   }
 
-  protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
+  protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
     Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
     String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
     table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
@@ -1238,7 +1239,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param instantTime Instant Time
    * @param stats Hoodie Write Stat
    */
-  protected void finalizeWrite(HoodieTable<T, I, K, O> table, String instantTime, List<HoodieWriteStat> stats) {
+  protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
     try {
       final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
       table.finalizeWrite(context, instantTime, stats);
@@ -1273,7 +1274,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param instantTime current inflight instant time
    * @return instantiated {@link HoodieTable}
    */
-  protected abstract HoodieTable<T, I, K, O> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
+  protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
 
   /**
    * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
@@ -1288,14 +1289,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    *   <li>Initializing metrics contexts</li>
    * </ul>
    */
-  protected final HoodieTable<T, I, K, O> initTable(WriteOperationType operationType, Option<String> instantTime) {
+  protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     // Setup write schemas for deletes
     if (operationType == WriteOperationType.DELETE) {
       setWriteSchemaForDeletes(metaClient);
     }
 
-    HoodieTable<T, I, K, O> table;
+    HoodieTable table;
 
     this.txnManager.beginTransaction();
     try {
@@ -1381,7 +1382,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     this.txnManager.close();
   }
 
-  private void setWriteTimer(HoodieTable<T, I, K, O> table) {
+  private void setWriteTimer(HoodieTable table) {
     String commitType = table.getMetaClient().getCommitActionType();
     if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
       writeTimer = metrics.getCommitCtx();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index e5262ad..62a4f08 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -180,7 +180,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
    * @return HoodieWriteMetadata
    */
   public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
-      I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+      I records, Option<BulkInsertPartitioner> bulkInsertPartitioner);
 
   /**
    * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
@@ -237,7 +237,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
    * @return HoodieWriteMetadata
    */
   public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
-      I preppedRecords,  Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+      I preppedRecords,  Option<BulkInsertPartitioner> bulkInsertPartitioner);
 
   /**
    * Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
index 0097908..4e33eb0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -28,7 +28,7 @@ import java.util.Set;
 /**
  * When file groups are in pending clustering, records written to these file groups need to be checked.
  */
-public abstract class UpdateStrategy<T extends HoodieRecordPayload<T>, I> {
+public abstract class UpdateStrategy<T extends HoodieRecordPayload, I> {
 
   protected final HoodieEngineContext engineContext;
   protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index dffd926..ad2145c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -34,7 +34,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
   public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime,
                                                     HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
                                                     BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe,
-                                                    Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
+                                                    Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);
 
   /**
    * Only write input records. Does not change timeline/index. Return information about new files created.
@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
   public abstract O bulkInsert(I inputRecords, String instantTime,
                                HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
                                boolean performDedupe,
-                               Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner,
+                               Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
                                boolean addMetadataFields,
                                int parallelism,
                                WriteHandleFactory writeHandleFactory);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
similarity index 53%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteHelper.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
index 381c115..fff52eb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
@@ -7,19 +7,20 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.  
  */
 
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -28,16 +29,12 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
@@ -48,69 +45,64 @@ import java.util.HashMap;
  * @param <T>
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class SparkDeleteHelper<T extends HoodieRecordPayload,R> extends
-    BaseDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
-  private SparkDeleteHelper() {
+public class HoodieDeleteHelper<T extends HoodieRecordPayload, R> extends
+    BaseDeleteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
+  private HoodieDeleteHelper() {
   }
 
   private static class DeleteHelperHolder {
-    private static final SparkDeleteHelper SPARK_DELETE_HELPER = new SparkDeleteHelper();
+    private static final HoodieDeleteHelper HOODIE_DELETE_HELPER = new HoodieDeleteHelper<>();
   }
 
-  public static SparkDeleteHelper newInstance() {
-    return DeleteHelperHolder.SPARK_DELETE_HELPER;
+  public static HoodieDeleteHelper newInstance() {
+    return DeleteHelperHolder.HOODIE_DELETE_HELPER;
   }
 
   @Override
-  public JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, int parallelism) {
+  public HoodieData<HoodieKey> deduplicateKeys(HoodieData<HoodieKey> keys, HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, int parallelism) {
     boolean isIndexingGlobal = table.getIndex().isGlobal();
     if (isIndexingGlobal) {
-      return keys.keyBy(HoodieKey::getRecordKey)
-          .reduceByKey((key1, key2) -> key1, parallelism)
-          .values();
+      return keys.distinctWithKey(HoodieKey::getRecordKey, parallelism);
     } else {
       return keys.distinct(parallelism);
     }
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(String instantTime,
-                                                           JavaRDD<HoodieKey> keys,
-                                                           HoodieEngineContext context,
-                                                           HoodieWriteConfig config,
-                                                           HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                                           BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> deleteExecutor) {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(String instantTime,
+                                                              HoodieData<HoodieKey> keys,
+                                                              HoodieEngineContext context,
+                                                              HoodieWriteConfig config,
+                                                              HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+                                                              BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> deleteExecutor) {
     try {
-      HoodieWriteMetadata result = null;
-      JavaRDD<HoodieKey> dedupedKeys = keys;
+      HoodieData<HoodieKey> dedupedKeys = keys;
       final int parallelism = config.getDeleteShuffleParallelism();
       if (config.shouldCombineBeforeDelete()) {
         // De-dupe/merge if needed
         dedupedKeys = deduplicateKeys(keys, table, parallelism);
-      } else if (!keys.partitions().isEmpty()) {
+      } else if (!keys.isEmpty()) {
         dedupedKeys = keys.repartition(parallelism);
       }
 
-      JavaRDD<HoodieRecord<T>> dedupedRecords =
+      HoodieData<HoodieRecord<T>> dedupedRecords =
           dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
       Instant beginTag = Instant.now();
       // perform index lookup to get existing location of records
-      JavaRDD<HoodieRecord<T>> taggedRecords = HoodieJavaRDD.getJavaRDD(
-          table.getIndex().tagLocation(HoodieJavaRDD.of(dedupedRecords), context, table));
+      HoodieData<HoodieRecord<T>> taggedRecords = table.getIndex().tagLocation(dedupedRecords, context, table);
       Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
 
       // filter out non-existent keys/records
-      JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
+      HoodieData<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
+      HoodieWriteMetadata<HoodieData<WriteStatus>> result;
       if (!taggedValidRecords.isEmpty()) {
         result = deleteExecutor.execute(taggedValidRecords);
         result.setIndexLookupDuration(tagLocationDuration);
       } else {
         // if the entire set of keys is non-existent
         deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
-        result = new HoodieWriteMetadata();
-        result.setWriteStatuses(jsc.emptyRDD());
+        result = new HoodieWriteMetadata<>();
+        result.setWriteStatuses(context.emptyHoodieData());
         deleteExecutor.commitOnAutoCommit(result);
       }
       return result;
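
The functional core of this rename: the Spark-specific keyBy/reduceByKey/values
chain for global-index de-duplication collapses into a single engine-agnostic
call. A hedged restatement of that logic (method names and signatures as in the
hunk above; the wrapper class is illustrative):

    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.model.HoodieKey;

    final class DedupeKeysSketch {
      // Global index: record keys are unique table-wide, so de-dupe by record key.
      // Non-global index: the same record key may legitimately appear in several
      // partitions, so de-dupe on the full HoodieKey (record key + partition path).
      static HoodieData<HoodieKey> dedupe(HoodieData<HoodieKey> keys,
                                          boolean isIndexingGlobal, int parallelism) {
        return isIndexingGlobal
            ? keys.distinctWithKey(HoodieKey::getRecordKey, parallelism)
            : keys.distinct(parallelism);
      }
    }
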
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
similarity index 81%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index e87c3ef..2b4a5d1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -37,31 +38,29 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
 import java.util.Iterator;
 
-public class SparkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, JavaRDD<HoodieRecord<T>>,
-    JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
+    BaseMergeHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
 
-  private SparkMergeHelper() {
+  private HoodieMergeHelper() {
   }
 
   private static class MergeHelperHolder {
-    private static final SparkMergeHelper SPARK_MERGE_HELPER = new SparkMergeHelper();
+    private static final HoodieMergeHelper HOODIE_MERGE_HELPER = new HoodieMergeHelper<>();
   }
 
-  public static SparkMergeHelper newInstance() {
-    return SparkMergeHelper.MergeHelperHolder.SPARK_MERGE_HELPER;
+  public static HoodieMergeHelper newInstance() {
+    return MergeHelperHolder.HOODIE_MERGE_HELPER;
   }
 
   @Override
-  public void runMerge(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                       HoodieMergeHandle<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> upsertHandle) throws IOException {
+  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException {
     final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
     Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
-    HoodieMergeHandle<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> mergeHandle = upsertHandle;
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
     final GenericDatumWriter<GenericRecord> gWriter;
@@ -78,7 +77,7 @@ public class SparkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe
     }
 
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
     try {
       final Iterator<GenericRecord> readerIterator;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
similarity index 60%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 23dceb1..b56d39b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -19,60 +19,52 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.spark.api.java.JavaRDD;
+public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
+    HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
 
-import scala.Tuple2;
-
-/**
- * A spark implementation of {@link BaseWriteHelper}.
- *
- * @param <T>
- */
-public class SparkWriteHelper<T extends HoodieRecordPayload,R> extends BaseWriteHelper<T, JavaRDD<HoodieRecord<T>>,
-    JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
-  private SparkWriteHelper() {
+  private HoodieWriteHelper() {
   }
 
   private static class WriteHelperHolder {
-    private static final SparkWriteHelper SPARK_WRITE_HELPER = new SparkWriteHelper();
+    private static final HoodieWriteHelper HOODIE_WRITE_HELPER = new HoodieWriteHelper<>();
   }
 
-  public static SparkWriteHelper newInstance() {
-    return WriteHelperHolder.SPARK_WRITE_HELPER;
+  public static HoodieWriteHelper newInstance() {
+    return WriteHelperHolder.HOODIE_WRITE_HELPER;
   }
 
   @Override
-  protected JavaRDD<HoodieRecord<T>> tag(JavaRDD<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context,
-                                         HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table) {
-    return HoodieJavaRDD.getJavaRDD(
-        table.getIndex().tagLocation(HoodieJavaRDD.of(dedupedRecords), context, table));
+  protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context,
+                                            HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table) {
+    return table.getIndex().tagLocation(dedupedRecords, context, table);
   }
 
   @Override
-  public JavaRDD<HoodieRecord<T>> deduplicateRecords(
-      JavaRDD<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+  public HoodieData<HoodieRecord<T>> deduplicateRecords(
+      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
     boolean isIndexingGlobal = index.isGlobal();
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
       Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
-      return new Tuple2<>(key, record);
+      return Pair.of(key, record);
     }).reduceByKey((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
       T reducedData = (T) rec2.getData().preCombine(rec1.getData());
       HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
 
-      return new HoodieAvroRecord<T>(reducedKey, reducedData);
-    }, parallelism).map(Tuple2::_2);
+      return new HoodieAvroRecord<>(reducedKey, reducedData);
+    }, parallelism).map(Pair::getRight);
   }
 
 }
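
With scala.Tuple2 replaced by Hudi's own Pair, this helper carries no Spark
imports at all. A hedged caller-side example (the wrapper class and its name
are assumptions for illustration; the call shape follows the signatures above):

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.commit.HoodieWriteHelper;

    // Hypothetical call site: de-duplicate upsert input before index tagging.
    final class DedupeUsageSketch {
      static <T extends HoodieRecordPayload> HoodieData<HoodieRecord<T>> dedupe(
          HoodieData<HoodieRecord<T>> records,
          HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
          int parallelism) {
        // Records sharing a key are merged via preCombine, as deduplicateRecords
        // above shows; newInstance() returns the raw-typed singleton, mirroring
        // the raw HoodieTable usage this commit introduces (unchecked assignment).
        return HoodieWriteHelper.newInstance()
            .deduplicateRecords(records, table.getIndex(), parallelism);
      }
    }
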
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index fb61330..4523705 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.client;
 
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieList;
@@ -55,6 +53,7 @@ import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.MiniBatchHandle;
 import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -64,6 +63,9 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.util.FlinkClientUtil;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,8 +113,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf,
-                                                                                                  boolean refreshTimeline) {
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf,
+                                    boolean refreshTimeline) {
     return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
   }
 
@@ -226,12 +228,12 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+  public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
     throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
   }
 
   @Override
-  public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+  public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     throw new HoodieNotSupportedException("BulkInsertPrepped operation is not supported yet");
   }
 
@@ -304,7 +306,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   @Override
   protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
                                         String instantTime,
-                                        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+                                        HoodieTable hoodieTable) {
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
     }
@@ -324,7 +326,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
    * @param extraMetadata Additional Metadata passed by user
    */
   @Override
-  protected void postCommit(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+  protected void postCommit(HoodieTable table,
                             HoodieCommitMetadata metadata,
                             String instantTime,
                             Option<Map<String, String>> extraMetadata) {
@@ -351,7 +353,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   @Override
   public void completeCompaction(
       HoodieCommitMetadata metadata,
-      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+      HoodieTable table,
       String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
@@ -363,7 +365,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
       // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to the metadata table happen within a
       // single lock (single writer), because more than one write to the metadata table would result in conflicts since all of them update the same partition.
       table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
-          w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
+          w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
       LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
     } finally {
@@ -396,7 +398,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
     // Create a Hoodie table which encapsulates the commits and files visible
     return getHoodieTable();
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 7e41ab1..14937d6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -68,6 +68,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
@@ -231,7 +232,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
   public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(HoodieEngineContext context,
                                                            String instantTime,
                                                            List<HoodieRecord<T>> records,
-                                                           Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+                                                           Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     throw new HoodieNotSupportedException("BulkInsert is not supported yet");
   }
 
@@ -264,7 +265,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
   public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
                                                                   String instantTime,
                                                                   List<HoodieRecord<T>> preppedRecords,
-                                                                  Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+                                                                  Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     throw new HoodieNotSupportedException("BulkInsertPrepped is not supported yet");
   }
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 9de9298..faf46e0 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.client;
 
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.data.HoodieList;
@@ -43,6 +41,9 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.upgrade.JavaUpgradeDowngradeHelper;
 
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -88,9 +89,9 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config,
-                                                                                                  Configuration hadoopConf,
-                                                                                                  boolean refreshTimeline) {
+  protected HoodieTable createTable(HoodieWriteConfig config,
+                                    Configuration hadoopConf,
+                                    boolean refreshTimeline) {
     return HoodieJavaTable.create(config, context);
   }
 
@@ -152,7 +153,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
   @Override
   public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records,
                                       String instantTime,
-                                      Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+                                      Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
     throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
   }
 
@@ -166,7 +167,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
   @Override
   public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
                                                     String instantTime,
-                                                    Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+                                                    Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
         initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime));
     table.validateInsertSchema();
@@ -188,7 +189,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
   @Override
   protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
                                         String instantTime,
-                                        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+                                        HoodieTable hoodieTable) {
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
     }
@@ -215,7 +216,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
 
   @Override
   protected void completeCompaction(HoodieCommitMetadata metadata,
-                                    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                    HoodieTable table,
                                     String compactionCommitTime) {
     throw new HoodieNotSupportedException("CompleteCompaction is not supported in HoodieJavaClient");
   }
@@ -232,7 +233,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
     // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
 
    // Create a Hoodie table which encapsulates the commits and files visible
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 447ed3e..06c2304 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -114,7 +114,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
   public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(HoodieEngineContext context,
                                                            String instantTime,
                                                            List<HoodieRecord<T>> records,
-                                                           Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+                                                           Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     return new JavaBulkInsertCommitActionExecutor((HoodieJavaEngineContext) context, config,
         this, instantTime, records, bulkInsertPartitioner).execute();
   }
@@ -152,7 +152,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
   public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
                                                                   String instantTime,
                                                                   List<HoodieRecord<T>> preppedRecords,
-                                                                  Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+                                                                  Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
         this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
   }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 136c25b..32d30f7 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -61,7 +61,7 @@ public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends H
   public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
                                                                   String instantTime,
                                                                   List<HoodieRecord<T>> preppedRecords,
-                                                                  Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+                                                                  Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
         this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
   }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java
index 9780262..d5c7a0b 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java
@@ -36,17 +36,17 @@ import java.util.Map;
 public class JavaBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
 
   private final List<HoodieRecord<T>> inputRecords;
-  private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
 
   public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
                                             String instantTime, List<HoodieRecord<T>> inputRecords,
-                                            Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+                                            Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     this(context, config, table, instantTime, inputRecords, bulkInsertPartitioner, Option.empty());
   }
 
   public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
                                             String instantTime, List<HoodieRecord<T>> inputRecords,
-                                            Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner,
+                                            Option<BulkInsertPartitioner> bulkInsertPartitioner,
                                             Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
     this.inputRecords = inputRecords;
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index de7afdf..30f1d93 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -65,7 +65,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
                                                            final HoodieWriteConfig config,
                                                            final BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor,
                                                            final boolean performDedupe,
-                                                           final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+                                                           final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
     HoodieWriteMetadata result = new HoodieWriteMetadata();
 
     // It's possible the transition to inflight could have already happened.
@@ -89,7 +89,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
                                       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                                       HoodieWriteConfig config,
                                       boolean performDedupe,
-                                      Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
+                                      Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
                                       boolean useWriterSchema,
                                       int parallelism,
                                       WriteHandleFactory writeHandleFactory) {
@@ -106,6 +106,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
     BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
         ? userDefinedBulkInsertPartitioner.get()
         : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
+    // Only List is supported by the Java partitioner, but the BulkInsertPartitioner API does not enforce this. TODO(HUDI-3463): improve this.
     repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
 
     FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
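
Since the helper above now holds a raw BulkInsertPartitioner, the result of repartitionRecords() comes back untyped and has to be cast to List, hence the HUDI-3463 TODO. For reference, a minimal sketch of a List-based partitioner for the Java engine, assuming only the two-method shape of the BulkInsertPartitioner interface; the class name and sort key are illustrative and not part of this commit:

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.table.BulkInsertPartitioner;

    public class RecordKeySortListPartitioner<T extends HoodieRecordPayload>
        implements BulkInsertPartitioner<List<HoodieRecord<T>>> {

      @Override
      public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records, int outputPartitions) {
        // The Java engine runs in a single process, so "repartitioning" reduces to an in-memory sort.
        return records.stream()
            .sorted(Comparator.comparing(HoodieRecord::getRecordKey))
            .collect(Collectors.toList());
      }

      @Override
      public boolean arePartitionRecordsSorted() {
        return true; // records leave this partitioner sorted by record key
      }
    }
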
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
index ed72fbe..14c4c8a 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
@@ -36,12 +36,12 @@ public class JavaBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPay
     extends BaseJavaCommitActionExecutor<T> {
 
   private final List<HoodieRecord<T>> preppedInputRecord;
-  private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner;
 
   public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
                                                    HoodieWriteConfig config, HoodieTable table,
                                                    String instantTime, List<HoodieRecord<T>> preppedInputRecord,
-                                                   Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+                                                   Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
     this.preppedInputRecord = preppedInputRecord;
     this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index e9bdc42..37a78a4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -68,7 +68,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Seria
   * base path pointing to the table. Until then, just always assume a BloomIndex
    */
   private final transient HoodieIndex<?, ?> index;
-  private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable;
+  private HoodieTable hoodieTable;
   private transient Option<SQLContext> sqlContextOpt;
   private final transient HoodieSparkEngineContext context;
   private final transient Configuration hadoopConf;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index ec1ecd6..ac9259c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -18,11 +18,10 @@
 
 package org.apache.hudi.client;
 
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.TransactionUtils;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.metrics.Registry;
@@ -45,6 +44,7 @@ import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -54,6 +54,9 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
@@ -120,9 +123,9 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> createTable(HoodieWriteConfig config,
-                                                                                                           Configuration hadoopConf,
-                                                                                                           boolean refreshTimeline) {
+  protected HoodieTable createTable(HoodieWriteConfig config,
+                                    Configuration hadoopConf,
+                                    boolean refreshTimeline) {
     return HoodieSparkTable.create(config, context, refreshTimeline);
   }
 
@@ -147,45 +150,49 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
 
   @Override
   public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
     table.validateUpsertSchema();
     preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
     }
-    return postWrite(result, instantTime, table);
+    return postWrite(resultRDD, instantTime, table);
   }
 
   @Override
   public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime));
     table.validateUpsertSchema();
     preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
-    return postWrite(result, instantTime, table);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords));
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return postWrite(resultRDD, instantTime, table);
   }
 
   @Override
   public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insert(context,instantTime, records);
-    return postWrite(result, instantTime, table);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insert(context, instantTime, HoodieJavaRDD.of(records));
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return postWrite(resultRDD, instantTime, table);
   }
 
   @Override
   public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.INSERT_PREPPED, Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.INSERT_PREPPED, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
-    return postWrite(result, instantTime, table);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords));
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return postWrite(resultRDD, instantTime, table);
   }
 
   /**
@@ -196,11 +203,12 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
    */
   public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
-    HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
-    HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records);
-    return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insertOverwrite(context, instantTime, HoodieJavaRDD.of(records));
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), result.getPartitionToReplaceFileIds());
   }
 
   /**
@@ -211,11 +219,12 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
    */
   public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
-    HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
-    HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
-    return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insertOverwriteTable(context, instantTime, HoodieJavaRDD.of(records));
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), result.getPartitionToReplaceFileIds());
   }
 
   @Override
@@ -224,44 +233,48 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+  public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.BULK_INSERT, Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.BULK_INSERT, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner);
-    return postWrite(result, instantTime, table);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.bulkInsert(context, instantTime, HoodieJavaRDD.of(records), userDefinedBulkInsertPartitioner);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return postWrite(resultRDD, instantTime, table);
   }
 
   @Override
-  public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+  public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner);
-    return postWrite(result, instantTime, table);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.bulkInsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords), bulkInsertPartitioner);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return postWrite(resultRDD, instantTime, table);
   }
 
   @Override
   public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
     preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.delete(context,instantTime, keys);
-    return postWrite(result, instantTime, table);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.delete(context, instantTime, HoodieJavaRDD.of(keys));
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return postWrite(resultRDD, instantTime, table);
   }
 
   public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
-    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
     preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
-    return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), result.getPartitionToReplaceFileIds());
   }
 
   @Override
   protected JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata<JavaRDD<WriteStatus>> result,
                                            String instantTime,
-                                           HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
+                                           HoodieTable hoodieTable) {
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
     }
@@ -288,7 +301,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
 
   @Override
   protected void completeCompaction(HoodieCommitMetadata metadata,
-                                    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
+                                    HoodieTable table,
                                     String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
@@ -329,8 +342,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
       table.getMetaClient().reloadActiveTimeline();
     }
     compactionTimer = metrics.getCompactionCtx();
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
-        table.compact(context, compactionInstantTime);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.compact(context, compactionInstantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
     if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
       completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
     }
@@ -349,7 +362,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     }
     clusteringTimer = metrics.getClusteringCtx();
     LOG.info("Starting clustering at " + clusteringInstant);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = table.cluster(context, clusteringInstant);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
     // TODO : Where is shouldComplete used ?
     if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
       completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
@@ -358,9 +372,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   }
 
   private void completeClustering(HoodieReplaceCommitMetadata metadata,
-                                    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                    String clusteringCommitTime) {
-
+                                  HoodieTable table,
+                                  String clusteringCommitTime) {
     List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
         e.getValue().stream()).collect(Collectors.toList());
 
@@ -405,17 +418,17 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);
   }
 
-  private void updateTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
+  private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
                                    HoodieInstant hoodieInstant) {
     boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
     // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
     // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
     table.getMetadataWriter(hoodieInstant.getTimestamp())
-        .ifPresent(writer -> writer.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
+        .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
   }
 
   @Override
-  protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
     // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
     // if it didn't exist before
     // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
@@ -440,7 +453,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
 
   // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
   private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,
-                                    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
+                                    HoodieTable table,
                                     String commitInstant) {
 
     switch (tableServiceType) {
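
Every write API in this client now follows the same three-step bridge; condensed below as a sketch (upsert shown, the other operations are identical in shape):

    // 1. Wrap the caller's JavaRDD into engine-neutral HoodieData.
    // 2. Run the table action, which now speaks HoodieData end to end.
    // 3. Unwrap the statuses and re-type the metadata for the JavaRDD-based public API.
    HoodieWriteMetadata<HoodieData<WriteStatus>> result =
        table.upsert(context, instantTime, HoodieJavaRDD.of(records));
    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
        result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
    return postWrite(resultRDD, instantTime, table);
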
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 54c1c9f..5a03cdf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -62,6 +58,11 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -94,7 +95,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
   public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     // execute clustering for each group async and collect WriteStatus
-    Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = FutureUtils.allOf(
+    Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
         clusteringPlan.getInputGroups().stream()
         .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
             clusteringPlan.getStrategy().getStrategyParams(),
@@ -103,7 +104,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
             .collect(Collectors.toList()))
         .join()
         .stream();
-    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusRDDStream);
+    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
     JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
 
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
@@ -125,7 +126,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
    * @param preserveHoodieMetadata Whether to preserve commit metadata while clustering.
    * @return RDD of {@link WriteStatus}.
    */
-  public abstract JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime,
+  public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime,
                                                                        final Map<String, String> strategyParams, final Schema schema,
                                                                        final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);
 
@@ -164,11 +165,11 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
   /**
    * Submit job to execute clustering for the group.
    */
-  private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
+  private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
                                                                              boolean preserveHoodieMetadata, String instantTime) {
     return CompletableFuture.supplyAsync(() -> {
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
-      JavaRDD<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
+      HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
       Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
       List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
           .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
@@ -180,7 +181,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
   /**
    * Get RDD of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
    */
-  private JavaRDD<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
+  private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
     List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
     boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
     if (hasLogFiles) {
@@ -195,12 +196,12 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
   /**
    * Read records from baseFiles, apply updates and convert to RDD.
    */
-  private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
+  private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
                                                                List<ClusteringOperation> clusteringOps,
                                                                String instantTime) {
     HoodieWriteConfig config = getWriteConfig();
     HoodieTable table = getHoodieTable();
-    return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
+    return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
       List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
       clusteringOpsPartition.forEachRemaining(clusteringOp -> {
         long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
@@ -237,20 +238,20 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
       });
 
       return new ConcatenatingIterator<>(recordIterators);
-    });
+    }));
   }
 
   /**
    * Read records from baseFiles and convert to RDD.
    */
-  private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
+  private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                 List<ClusteringOperation> clusteringOps) {
     SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
     HoodieWriteConfig writeConfig = getWriteConfig();
 
     // NOTE: It's crucial to make sure that we don't capture whole "this" object into the
     //       closure, as this might lead to issues attempting to serialize its nested fields
-    return jsc.parallelize(clusteringOps, clusteringOps.size())
+    return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size())
         .mapPartitions(clusteringOpsPartition -> {
           List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
           clusteringOpsPartition.forEachRemaining(clusteringOp -> {
@@ -266,7 +267,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
 
           return new ConcatenatingIterator<>(iteratorsForPartition);
         })
-        .map(record -> transform(record, writeConfig));
+        .map(record -> transform(record, writeConfig)));
   }
 
   /**
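
With this change the clustering driver crosses between the engine-neutral and Spark worlds in exactly one place: each group's async job returns HoodieData<WriteStatus>, and only the final union unwraps to Spark. Condensed from the hunks above (the setWriteStatuses call is a sketch of the wrap-back step):

    // Unwrap each group's engine-neutral result just before the Spark-specific union,
    // then wrap the unioned RDD back so callers keep seeing HoodieData.
    JavaRDD<WriteStatus>[] writeStatuses =
        convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
    writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
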
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 2a50393..4dedaba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.client.clustering.run.strategy;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -34,7 +35,6 @@ import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
 import java.util.Map;
@@ -56,7 +56,7 @@ public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload<
   }
 
   @Override
-  public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(JavaRDD<HoodieRecord<T>> inputRecords,
+  public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords,
                                                               int numOutputGroups,
                                                               String instantTime,
                                                               Map<String, String> strategyParams,
@@ -74,7 +74,7 @@ public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload<
     // Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
     props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(Long.MAX_VALUE));
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
-    return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
+    return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
   }
 }
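
The single-file sort strategy hinges on one override, restated here as a sketch (props is the properties map built in the method above):

    // Cluster a group into exactly one file: push the max parquet file size so high
    // that size-based rollover can never trigger for the unbounded create handle.
    props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(Long.MAX_VALUE));
    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
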
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 22d5300..d664c83 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.clustering.run.strategy;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -32,7 +33,6 @@ import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
 import java.util.Map;
@@ -54,7 +54,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
   }
 
   @Override
-  public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
+  public HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<HoodieRecord<T>> inputRecords, final int numOutputGroups,
                                                               final String instantTime, final Map<String, String> strategyParams, final Schema schema,
                                                               final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
     LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
@@ -64,7 +64,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
     props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
     props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
-    return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance()
+    return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
         .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
   }
 }
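
The sort-and-size strategy applies the same config-override idiom, but caps output files at the clustering target size instead. A sketch of the override, assuming the builder accepts a plain map of properties:

    Map<String, String> props = new HashMap<>(); // the real code extends the base config's props
    props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
    props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(),
        String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
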
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java
index 655c119..3eadba2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java
@@ -19,13 +19,12 @@
 package org.apache.hudi.client.clustering.update.strategy;
 
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 
-import org.apache.spark.api.java.JavaRDD;
-
 import java.util.List;
 import java.util.Set;
 
@@ -33,7 +32,7 @@ import java.util.Set;
  * Spark base update strategy: records written to file groups under pending clustering
  * need to be checked. Spark-specific implementations should extend this base class.
  */
-public abstract class BaseSparkUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
+public abstract class BaseSparkUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, HoodieData<HoodieRecord<T>>> {
 
   public BaseSparkUpdateStrategy(HoodieSparkEngineContext engineContext,
                                  Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
@@ -45,9 +44,9 @@ public abstract class BaseSparkUpdateStrategy<T extends HoodieRecordPayload<T>>
    * @param inputRecords the records to write, tagged with target file id
   * @return the file group ids matched by the records
    */
-  protected List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
+  protected List<HoodieFileGroupId> getGroupIdsWithUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
     return inputRecords
             .filter(record -> record.getCurrentLocation() != null)
-            .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
+            .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collectAsList();
   }
 }
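
After this hunk nothing Spark-specific remains in the group-id extraction. The same logic as a free-standing sketch, runnable against any HoodieData implementation:

    static <T extends HoodieRecordPayload<T>> List<HoodieFileGroupId> groupIdsWithUpdates(
        HoodieData<HoodieRecord<T>> taggedRecords) {
      return taggedRecords
          .filter(r -> r.getCurrentLocation() != null)   // keep only records routed to existing file groups
          .map(r -> new HoodieFileGroupId(r.getPartitionPath(), r.getCurrentLocation().getFileId()))
          .distinct()
          .collectAsList();                              // engine-neutral replacement for RDD collect()
    }
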
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
index 92a5fb6..5904062 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
@@ -19,13 +19,12 @@
 package org.apache.hudi.client.clustering.update.strategy;
 
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.Pair;
 
-import org.apache.spark.api.java.JavaRDD;
-
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -42,7 +41,7 @@ public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends
   }
 
   @Override
-  public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+  public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
     Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
         .filter(f -> fileGroupsInPendingClustering.contains(f))
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
index ac058a4..d09422e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.clustering.update.strategy;
 
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -27,7 +28,6 @@ import org.apache.hudi.exception.HoodieClusteringUpdateException;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -47,7 +47,7 @@ public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends
   }
 
   @Override
-  public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+  public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
     fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
       if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
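
The reject strategy's contract is fail-fast: if any incoming update targets a file group that is already scheduled for clustering, the write is aborted. Sketch of that path (exception message illustrative):

    for (HoodieFileGroupId fileGroupId : fileGroupIdsWithRecordUpdate) {
      if (fileGroupsInPendingClustering.contains(fileGroupId)) {
        // Updating a file group under pending clustering would race with its rewrite.
        throw new HoodieClusteringUpdateException(
            "Not allowed to update file group in pending clustering: " + fileGroupId);
      }
    }
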
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index 9e72390..fd083f2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -21,7 +21,9 @@ package org.apache.hudi.client.utils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.validator.SparkPreCommitValidator;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -34,13 +36,11 @@ import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -61,7 +61,7 @@ public class SparkValidatorUtils {
    * Throw error if there are validation failures.
    */
   public static void runValidators(HoodieWriteConfig config,
-                                   HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata,
+                                   HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
                                    HoodieEngineContext context,
                                    HoodieTable table,
                                    String instantTime) {
@@ -69,10 +69,10 @@ public class SparkValidatorUtils {
       LOG.info("no validators configured.");
     } else {
       if (!writeMetadata.getWriteStats().isPresent()) {
-        writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collect());
+        writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
       }
-      Set<String> partitionsModified = new HashSet<>(writeMetadata.getWriteStats().get().stream().map(writeStats ->
-          writeStats.getPartitionPath()).collect(Collectors.toList()));
+      Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(writeStats ->
+          writeStats.getPartitionPath()).collect(Collectors.toSet());
       SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context));
      // Refresh timeline to ensure the validator sees any other operations done on the timeline (async operations such as other clustering/compaction/rollback)
       table.getMetaClient().reloadActiveTimeline();
@@ -101,8 +101,8 @@ public class SparkValidatorUtils {
   /**
   * Run validators in a separate thread pool for parallelism. Each validator can submit a distributed Spark job if needed.
    */
-  private static CompletableFuture<Boolean> runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata writeMetadata,
-                                                       Dataset<Row> beforeState, Dataset<Row> afterState, String instantTime) {
+  private static CompletableFuture<Boolean> runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
+                                                              Dataset<Row> beforeState, Dataset<Row> afterState, String instantTime) {
     return CompletableFuture.supplyAsync(() -> {
       try {
         validator.validate(instantTime, writeMetadata, beforeState, afterState);
@@ -120,10 +120,10 @@ public class SparkValidatorUtils {
    * Note that this only works for COW tables.
    */
   public static Dataset<Row> getRecordsFromCommittedFiles(SQLContext sqlContext,
-                                                      Set<String> partitionsAffected, HoodieTable table) {
+                                                          Set<String> partitionsAffected, HoodieTable table) {
 
     List<String> committedFiles = partitionsAffected.stream()
-        .flatMap(partition -> table.getBaseFileOnlyView().getLatestBaseFiles(partition).map(bf -> bf.getPath()))
+        .flatMap(partition -> table.getBaseFileOnlyView().getLatestBaseFiles(partition).map(BaseFile::getPath))
         .collect(Collectors.toList());
 
     if (committedFiles.isEmpty()) {
@@ -145,7 +145,7 @@ public class SparkValidatorUtils {
    */
   public static Dataset<Row> getRecordsFromPendingCommits(SQLContext sqlContext, 
                                                           Set<String> partitionsAffected, 
-                                                          HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata,
+                                                          HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
                                                           HoodieTable table,
                                                           String instantTime) {
 
@@ -157,7 +157,7 @@ public class SparkValidatorUtils {
         instantTime);
 
     List<String> newFiles = partitionsAffected.stream()
-        .flatMap(partition ->  fsView.getLatestBaseFiles(partition).map(bf -> bf.getPath()))
+        .flatMap(partition ->  fsView.getLatestBaseFiles(partition).map(BaseFile::getPath))
         .collect(Collectors.toList());
 
     if (newFiles.isEmpty()) {
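
The validator plumbing now derives its inputs through HoodieData as well; the partition-set computation from runValidators above, condensed:

    // Materialize write stats once from the engine-neutral statuses, then derive the set
    // of partitions this commit touched (used to scope the before/after snapshot queries).
    if (!writeMetadata.getWriteStats().isPresent()) {
      writeMetadata.setWriteStats(
          writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
    }
    Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream()
        .map(HoodieWriteStat::getPartitionPath)
        .collect(Collectors.toSet());
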
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
index f12d337..f08d11b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.validator;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -28,9 +29,9 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -41,7 +42,7 @@ import java.util.stream.Collectors;
 /**
  * Validator that can be configured to run pre-commit.
  */
-public abstract class SparkPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> {
+public abstract class SparkPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> {
   private static final Logger LOG = LogManager.getLogger(SparkPreCommitValidator.class);
 
   private HoodieSparkTable<T> table;
@@ -59,7 +60,7 @@ public abstract class SparkPreCommitValidator<T extends HoodieRecordPayload, I,
     if (writeResult.getWriteStats().isPresent()) {
       partitionsModified = writeResult.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet());
     } else {
-      partitionsModified = new HashSet<>(writeResult.getWriteStatuses().map(WriteStatus::getPartitionPath).collect());
+      partitionsModified = new HashSet<>(writeResult.getWriteStatuses().map(WriteStatus::getPartitionPath).collectAsList());
     }
     return partitionsModified;
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java
index b27f84e..2506d52 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.validator;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
@@ -28,7 +29,6 @@ import org.apache.hudi.table.HoodieSparkTable;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SQLContext;
  * 
  * Expects both queries to return the same result.
  */
-public class SqlQueryEqualityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
+public class SqlQueryEqualityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
 
   private static final Logger LOG = LogManager.getLogger(SqlQueryEqualityPreCommitValidator.class);
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java
index 026334f..8a25150 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.validator;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
@@ -28,7 +29,6 @@ import org.apache.hudi.table.HoodieSparkTable;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SQLContext;
  * <p>
  * Expects the query results not to match.
  */
-public class SqlQueryInequalityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
+public class SqlQueryInequalityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
   private static final Logger LOG = LogManager.getLogger(SqlQueryInequalityPreCommitValidator.class);
 
   public SqlQueryInequalityPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java
index 122cf2b..3a88d54 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java
@@ -20,15 +20,16 @@ package org.apache.hudi.client.validator;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.table.HoodieSparkTable;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -41,7 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * Validator framework to run sql queries and compare table state at different locations.
  */
-public abstract class SqlQueryPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
+public abstract class SqlQueryPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
   private static final Logger LOG = LogManager.getLogger(SqlQueryPreCommitValidator.class);
   private static final AtomicInteger TABLE_COUNTER = new AtomicInteger(0);
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQuerySingleResultPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQuerySingleResultPreCommitValidator.java
index 66e956d..b194224 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQuerySingleResultPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQuerySingleResultPreCommitValidator.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.validator;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
@@ -28,7 +29,6 @@ import org.apache.hudi.table.HoodieSparkTable;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 
@@ -40,7 +40,7 @@ import java.util.List;
  * <p>
  * Example configuration: "query1#expectedResult1;query2#expectedResult2;"
  */
-public class SqlQuerySingleResultPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
+public class SqlQuerySingleResultPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
   private static final Logger LOG = LogManager.getLogger(SqlQuerySingleResultPreCommitValidator.class);
 
   public SqlQuerySingleResultPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
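
All three query validators now bound their output type by HoodieData<WriteStatus>, so custom validators no longer name Spark collection types in their signatures. A minimal sketch of such a subclass follows; the three-argument constructor mirrors the ones above, while the validateUsingQuery hook and the <TABLE_NAME> placeholder are assumptions carried over from the pre-existing validators, not something this patch introduces:

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.exception.HoodieValidationException;
    import org.apache.hudi.table.HoodieSparkTable;

    import org.apache.spark.sql.SQLContext;

    // Hypothetical validator: fail the commit if a query returns fewer rows
    // against the post-write state than against the pre-write state.
    public class RowCountDropPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>>
        extends SqlQueryPreCommitValidator<T, I, K, O> {

      public RowCountDropPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
        super(table, engineContext, config);
      }

      @Override
      protected void validateUsingQuery(String query, String leftTable, String rightTable, SQLContext sqlContext) {
        long before = sqlContext.sql(query.replace("<TABLE_NAME>", leftTable)).count();
        long after = sqlContext.sql(query.replace("<TABLE_NAME>", rightTable)).count();
        if (after < before) {
          throw new HoodieValidationException("Row count dropped from " + before + " to " + after);
        }
      }
    }
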
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
index ffa1a35..ddcaaec 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
@@ -21,6 +21,7 @@ package org.apache.hudi.data;
 
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
@@ -104,6 +105,11 @@ public class HoodieJavaPairRDD<K, V> extends HoodiePairData<K, V> {
   }
 
   @Override
+  public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> func, int parallelism) {
+    return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(func::apply, parallelism));
+  }
+
+  @Override
   public <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func) {
     return HoodieJavaRDD.of(pairRDDData.map(
         tuple -> func.apply(new ImmutablePair<>(tuple._1, tuple._2))));
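
The new reduceByKey rounds out the pair-data API: the engine-agnostic SerializableBiFunction is adapted to Spark's Function2 via func::apply, and the requested parallelism is passed straight through. A small caller-side sketch, assuming records is a HoodieData<HoodieRecord<T>> and parallelism is in scope:

    // Count records per partition path; on the Spark engine this lowers to the
    // JavaPairRDD.reduceByKey call shown above.
    HoodiePairData<String, Long> countsPerPartition = records
        .mapToPair(record -> Pair.of(record.getPartitionPath(), 1L))
        .reduceByKey(Long::sum, parallelism);
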
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 1381ea8..66edf60 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -131,6 +131,23 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
   }
 
   @Override
+  public HoodieData<T> distinct(int parallelism) {
+    return HoodieJavaRDD.of(rddData.distinct(parallelism));
+  }
+
+  @Override
+  public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism) {
+    return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
+        .reduceByKey((value1, value2) -> value1, parallelism)
+        .values();
+  }
+
+  @Override
+  public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
+    return HoodieJavaRDD.of(rddData.filter(filterFunc::apply));
+  }
+
+  @Override
   public HoodieData<T> union(HoodieData<T> other) {
     return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
   }
@@ -139,4 +156,9 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
   public List<T> collectAsList() {
     return rddData.collect();
   }
+
+  @Override
+  public HoodieData<T> repartition(int parallelism) {
+    return HoodieJavaRDD.of(rddData.repartition(parallelism));
+  }
 }
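
distinct, filter and repartition are thin wrappers over the matching RDD operators, while distinctWithKey is composed purely from mapToPair/reduceByKey/values, so any HoodiePairData implementation inherits key-based de-duplication (the reducer arbitrarily keeps one of the duplicate values). A usage sketch, again assuming records is a HoodieData<HoodieRecord<T>>:

    // Keep one record per HoodieKey, drop records without a partition path,
    // then rebalance into `parallelism` partitions.
    HoodieData<HoodieRecord<T>> cleaned = records
        .distinctWithKey(HoodieRecord::getKey, parallelism)
        .filter(record -> record.getPartitionPath() != null)
        .repartition(parallelism);
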
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 31bd436..8f52112 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
@@ -61,6 +62,7 @@ import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
@@ -69,7 +71,6 @@ import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecutor;
-import org.apache.hudi.table.action.commit.SparkMergeHelper;
 import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
@@ -82,7 +83,6 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 
 import javax.annotation.Nonnull;
 
@@ -117,58 +117,58 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> insert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> insert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkInsertCommitActionExecutor<>((HoodieSparkEngineContext)context, config, this, instantTime, records).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records,
-      Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
-    return new SparkBulkInsertCommitActionExecutor((HoodieSparkEngineContext) context, config,
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records,
+      Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
+    return new SparkBulkInsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config,
         this, instantTime, records, userDefinedBulkInsertPartitioner).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> delete(HoodieEngineContext context, String instantTime, JavaRDD<HoodieKey> keys) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> delete(HoodieEngineContext context, String instantTime, HoodieData<HoodieKey> keys) {
     return new SparkDeleteCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
   }
 
   @Override
-  public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
-    return new SparkDeletePartitionCommitActionExecutor(context, config, this, instantTime, partitions).execute();
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    return new SparkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
-      JavaRDD<HoodieRecord<T>> preppedRecords) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
+      HoodieData<HoodieRecord<T>> preppedRecords) {
     return new SparkUpsertPreppedCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
-      JavaRDD<HoodieRecord<T>> preppedRecords) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
+      HoodieData<HoodieRecord<T>> preppedRecords) {
     return new SparkInsertPreppedCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
-      JavaRDD<HoodieRecord<T>> preppedRecords,  Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
+      HoodieData<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
     return new SparkBulkInsertPreppedCommitActionExecutor((HoodieSparkEngineContext) context, config,
         this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
   }
 
   @Override
-  public HoodieWriteMetadata insertOverwrite(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+  public HoodieWriteMetadata insertOverwrite(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkInsertOverwriteCommitActionExecutor(context, config, this, instantTime, records).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
   }
 
@@ -235,7 +235,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> compact(
       HoodieEngineContext context, String compactionInstantTime) {
     throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
   }
@@ -248,20 +248,20 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(HoodieEngineContext context,
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> cluster(HoodieEngineContext context,
                                                            String clusteringInstantTime) {
     return new SparkExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();
   }
 
   @Override
-  public HoodieBootstrapWriteMetadata<JavaRDD<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
+  public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
     return new SparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
   }
 
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
     new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
-    new CopyOnWriteRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+    new CopyOnWriteRestoreActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }
 
   @Override
@@ -292,7 +292,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      SparkMergeHelper.newInstance().runMerge(this, upsertHandle);
+      HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
     }
 
     // TODO(vc): This needs to be revisited
@@ -336,28 +336,28 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
 
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) {
-    return new CleanActionExecutor(context, config, this, cleanInstantTime, skipLocking).execute();
+    return new CleanActionExecutor<>(context, config, this, cleanInstantTime, skipLocking).execute();
   }
 
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
                                          boolean deleteInstants, boolean skipLocking) {
-    return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant,
+    return new CopyOnWriteRollbackActionExecutor<>(context, config, this, rollbackInstantTime, commitInstant,
         deleteInstants, skipLocking).execute();
   }
 
   @Override
   public HoodieSavepointMetadata savepoint(HoodieEngineContext context, String instantToSavepoint, String user, String comment) {
-    return new SavepointActionExecutor(context, config, this, instantToSavepoint, user, comment).execute();
+    return new SavepointActionExecutor<>(context, config, this, instantToSavepoint, user, comment).execute();
   }
 
   @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new CopyOnWriteRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+    return new CopyOnWriteRestoreActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
   }
 
   @Override
   public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+    return new RestorePlanActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
   }
 }
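
Every write path on the copy-on-write table now accepts and returns HoodieData. Callers holding a Spark RDD wrap it once at the boundary; a minimal sketch, assuming recordsRdd is an existing JavaRDD<HoodieRecord<T>> and context/instantTime are in scope:

    // Wrap the Spark RDD into the engine-agnostic abstraction, run the upsert,
    // and materialize a List only where one is actually needed.
    HoodieData<HoodieRecord<T>> records = HoodieJavaRDD.of(recordsRdd);
    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, records);
    List<WriteStatus> statuses = result.getWriteStatuses().collectAsList();
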
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 334efa7..efc667a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -31,14 +32,13 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
+import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor;
 import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
@@ -54,8 +54,6 @@ import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
 
-import org.apache.spark.api.java.JavaRDD;
-
 import java.util.List;
 import java.util.Map;
 
@@ -87,72 +85,72 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> insert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> insert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkInsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records,
-      Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
-    return new SparkBulkInsertDeltaCommitActionExecutor((HoodieSparkEngineContext) context, config,
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records,
+      Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
+    return new SparkBulkInsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config,
         this, instantTime, records, userDefinedBulkInsertPartitioner).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> delete(HoodieEngineContext context, String instantTime, JavaRDD<HoodieKey> keys) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> delete(HoodieEngineContext context, String instantTime, HoodieData<HoodieKey> keys) {
     return new SparkDeleteDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
-      JavaRDD<HoodieRecord<T>> preppedRecords) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
+      HoodieData<HoodieRecord<T>> preppedRecords) {
     return new SparkUpsertPreppedDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
-      JavaRDD<HoodieRecord<T>> preppedRecords) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
+      HoodieData<HoodieRecord<T>> preppedRecords) {
     return new SparkInsertPreppedDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
-      JavaRDD<HoodieRecord<T>> preppedRecords,  Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
+      HoodieData<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
     return new SparkBulkInsertPreppedDeltaCommitActionExecutor((HoodieSparkEngineContext) context, config,
         this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
   }
 
   @Override
   public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
+    ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor<>(
         context, config, this, instantTime, extraMetadata,
-        new HoodieSparkMergeOnReadTableCompactor());
+        new HoodieSparkMergeOnReadTableCompactor<>());
     return scheduleCompactionExecutor.execute();
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> compact(
       HoodieEngineContext context, String compactionInstantTime) {
-    RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
-        context, config, this, compactionInstantTime, new HoodieSparkMergeOnReadTableCompactor(),
-        new HoodieSparkCopyOnWriteTable(config, context, getMetaClient()));
-    return convertMetadata(compactionExecutor.execute());
+    RunCompactionActionExecutor<T> compactionExecutor = new RunCompactionActionExecutor<>(
+        context, config, this, compactionInstantTime, new HoodieSparkMergeOnReadTableCompactor<>(),
+        new HoodieSparkCopyOnWriteTable<>(config, context, getMetaClient()));
+    return compactionExecutor.execute();
   }
 
   @Override
-  public HoodieBootstrapWriteMetadata<JavaRDD<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
-    return new SparkBootstrapDeltaCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
+  public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
+    return new SparkBootstrapDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
   }
 
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
     new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
-    new MergeOnReadRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+    new MergeOnReadRestoreActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }
 
   @Override
@@ -169,12 +167,12 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
                                          HoodieInstant commitInstant,
                                          boolean deleteInstants,
                                          boolean skipLocking) {
-    return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
+    return new MergeOnReadRollbackActionExecutor<>(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
   }
 
   @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new MergeOnReadRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+    return new MergeOnReadRestoreActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
   }
 
   @Override
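
RunCompactionActionExecutor is now generic in T and already yields HoodieData-typed metadata, so compact() forwards its result without the convertMetadata detour. A caller-side sketch of scheduling and running a compaction through the new signatures:

    // Schedule, then execute, a compaction entirely via the HoodieData API.
    Option<HoodieCompactionPlan> plan =
        table.scheduleCompaction(context, compactionInstantTime, Option.empty());
    if (plan.isPresent()) {
      HoodieWriteMetadata<HoodieData<WriteStatus>> metadata =
          table.compact(context, compactionInstantTime);
      List<WriteStatus> statuses = metadata.getWriteStatuses().collectAsList();
    }
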
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index bb8c95d..ce14d43 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -36,20 +36,16 @@ import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
-import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
 
-import static org.apache.hudi.data.HoodieJavaRDD.getJavaRDD;
-
 public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
-    extends HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+    extends HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
 
   private volatile boolean isMetadataTableExists = false;
 
@@ -81,7 +77,7 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
                                                                            HoodieSparkEngineContext context,
                                                                            HoodieTableMetaClient metaClient,
                                                                            boolean refreshTimeline) {
-    HoodieSparkTable hoodieSparkTable;
+    HoodieSparkTable<T> hoodieSparkTable;
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
         hoodieSparkTable = new HoodieSparkCopyOnWriteTable<>(config, context, metaClient);
@@ -98,11 +94,6 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
     return hoodieSparkTable;
   }
 
-  public static HoodieWriteMetadata<JavaRDD<WriteStatus>> convertMetadata(
-      HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
-    return metadata.clone(getJavaRDD(metadata.getWriteStatuses()));
-  }
-
   @Override
   protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
     return SparkHoodieIndexFactory.createIndex(config);
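
With HoodieSparkTable extending the HoodieData-typed HoodieTable end to end, convertMetadata loses its last callers and is deleted. A consumer that still needs a JavaRDD-typed view can inline the former helper's body at the Spark boundary; a sketch, assuming metadata is a HoodieWriteMetadata<HoodieData<WriteStatus>>:

    // Inline equivalent of the removed convertMetadata(...) helper.
    HoodieWriteMetadata<JavaRDD<WriteStatus>> sparkView =
        metadata.clone(HoodieJavaRDD.getJavaRDD(metadata.getWriteStatuses()));
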
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index a970e8f..504da8a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -28,10 +28,10 @@ import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
 import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BootstrapFileMapping;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -68,7 +68,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -80,10 +79,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
 import static org.apache.hudi.table.action.bootstrap.MetadataBootstrapHandlerFactory.getMetadataHandler;
 
 public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
-    extends BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieBootstrapWriteMetadata> {
+    extends BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>>> {
 
   private static final Logger LOG = LogManager.getLogger(SparkBootstrapCommitActionExecutor.class);
   protected String bootstrapSchema = null;
@@ -91,7 +91,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
 
   public SparkBootstrapCommitActionExecutor(HoodieSparkEngineContext context,
                                             HoodieWriteConfig config,
-                                            HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
+                                            HoodieTable table,
                                             Option<Map<String, String>> extraMetadata) {
     super(context, new HoodieWriteConfig.Builder().withProps(config.getProps())
         .withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
@@ -109,7 +109,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
   }
 
   @Override
-  public HoodieBootstrapWriteMetadata execute() {
+  public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
     validate();
     try {
       HoodieTableMetaClient metaClient = table.getMetaClient();
@@ -121,9 +121,9 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
       Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();
 
       // First run metadata bootstrap which will auto commit
-      Option<HoodieWriteMetadata> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
+      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
       // if there are full bootstrap to be performed, perform that too
-      Option<HoodieWriteMetadata> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
+      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
       // Delete the marker directory for the instant
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -142,7 +142,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
    * Perform Metadata Bootstrap.
   * @param partitionFilesList List of partitions and the files within those partitions
    */
-  protected Option<HoodieWriteMetadata> metadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
+  protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
     if (null == partitionFilesList || partitionFilesList.isEmpty()) {
       return Option.empty();
     }
@@ -155,43 +155,42 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
     table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
         metaClient.getCommitActionType(), HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), Option.empty());
 
-    JavaRDD<BootstrapWriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);
+    HoodieData<BootstrapWriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);
 
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
     updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result);
     return Option.of(result);
   }
 
-  private void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+  private void updateIndexAndCommitIfNeeded(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
     // RDD actions that are performed after updating the index.
-    writeStatusRDD = writeStatusRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+    writeStatuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
     Instant indexStartTime = Instant.now();
     // Update the index back
-    JavaRDD<WriteStatus> statuses = HoodieJavaRDD.getJavaRDD(
-        table.getIndex().updateLocation(HoodieJavaRDD.of(writeStatusRDD), context, table));
+    HoodieData<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
     result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
     result.setWriteStatuses(statuses);
     commitOnAutoCommit(result);
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecords) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
     // NO_OP
     return null;
   }
 
   @Override
-  protected void setCommitMetadata(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+  protected void setCommitMetadata(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     result.setCommitMetadata(Option.of(new HoodieCommitMetadata()));
   }
 
   @Override
-  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     // Perform the bootstrap index write and then commit. Make sure both the record-key and the
     // bootstrap-index writes are done in a single job DAG.
     Map<String, List<Pair<BootstrapFileMapping, HoodieWriteStat>>> bootstrapSourceAndStats =
-        result.getWriteStatuses().collect().stream()
+        result.getWriteStatuses().collectAsList().stream()
             .map(w -> {
               BootstrapWriteStatus ws = (BootstrapWriteStatus) w;
               return Pair.of(ws.getBootstrapSourceFileMapping(), ws.getStat());
@@ -214,7 +213,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
     LOG.info("Committing metadata bootstrap !!");
   }
 
-  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result, List<HoodieWriteStat> stats) {
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> stats) {
     String actionType = table.getMetaClient().getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulated the commits and files visible
@@ -253,7 +252,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
    * Perform Full Bootstrap.
   * @param partitionFilesList List of partitions and the files within those partitions
    */
-  protected Option<HoodieWriteMetadata> fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
+  protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
     if (null == partitionFilesList || partitionFilesList.isEmpty()) {
       return Option.empty();
     }
@@ -271,10 +270,10 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
     table.getActiveTimeline().createNewInstant(requested);
 
     // Setup correct schema and run bulk insert.
-    return Option.of(getBulkInsertActionExecutor(inputRecordsRDD).execute());
+    return Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
   }
 
-  protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(JavaRDD<HoodieRecord> inputRecordsRDD) {
+  protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
     return new SparkBulkInsertCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
         .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
         inputRecordsRDD, Option.empty(), extraMetadata);
@@ -310,10 +309,9 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
         .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
   }
 
-  private JavaRDD<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+  private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
     if (null == partitions || partitions.isEmpty()) {
-      return jsc.emptyRDD();
+      return context.emptyHoodieData();
     }
 
     TypedProperties properties = new TypedProperties();
@@ -336,7 +334,8 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
         })
         .collect(Collectors.toList());
 
-    return jsc.parallelize(bootstrapPaths, config.getBootstrapParallelism())
+    context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table.");
+    return context.parallelize(bootstrapPaths, config.getBootstrapParallelism())
         .map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(),
                 partitionFsPair.getRight().getLeft(), keyGenerator));
   }
@@ -352,7 +351,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
   }
 
   @Override
-  protected void runPrecommitValidators(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
+  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
     SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
   }
 }
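
Note the persist change above: rather than resolving a StorageLevel through SparkMemoryUtils, the executor hands the raw config string to HoodieData.persist and leaves interpretation to the engine-specific implementation. A sketch of how the Spark-backed HoodieJavaRDD can honor that contract (assumed; the implementation is not part of this hunk):

    // Inside HoodieJavaRDD<T>: translate the configured level, e.g.
    // "MEMORY_AND_DISK_SER", into a Spark StorageLevel.
    @Override
    public void persist(String level) {
      rddData.persist(StorageLevel.fromString(level));
    }
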
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
index 59f8666..d712ca4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
@@ -18,9 +18,8 @@
 
 package org.apache.hudi.table.action.bootstrap;
 
-import java.util.Map;
-
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -29,7 +28,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.SparkBulkInsertDeltaCommitActionExecutor;
-import org.apache.spark.api.java.JavaRDD;
+
+import java.util.Map;
 
 public class SparkBootstrapDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends SparkBootstrapCommitActionExecutor<T> {
@@ -41,7 +41,7 @@ public class SparkBootstrapDeltaCommitActionExecutor<T extends HoodieRecordPaylo
   }
 
   @Override
-  protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(JavaRDD<HoodieRecord> inputRecordsRDD) {
+  protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
     return new SparkBulkInsertDeltaCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
         .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
         inputRecordsRDD, extraMetadata);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
index 594a910..7d2a4c0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
@@ -28,14 +28,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 
-import org.apache.spark.api.java.JavaRDD;
-
 public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
@@ -52,10 +49,8 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = executeClustering(clusteringPlan);
-    JavaRDD<WriteStatus> transformedWriteStatuses = HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses());
-    return writeMetadata.clone(transformedWriteStatuses);
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    return executeClustering(clusteringPlan);
   }
 
   @Override
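
execute() now surfaces the clustering result as-is instead of cloning it onto a JavaRDD. The table-level entry point composes the same way; a caller-side sketch:

    // Run clustering and derive the write stats without leaving the
    // engine-agnostic API.
    HoodieWriteMetadata<HoodieData<WriteStatus>> clusteringMetadata =
        table.cluster(context, clusteringInstantTime);
    List<HoodieWriteStat> stats =
        clusteringMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList();
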
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ba3b0be..ade5508 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -19,8 +19,8 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -54,13 +55,13 @@ import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -76,10 +77,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
 import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
 
 public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
+    BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
 
   private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class);
   protected final Option<BaseKeyGenerator> keyGeneratorOpt;
@@ -97,7 +101,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
                                        HoodieTable table,
                                        String instantTime,
                                        WriteOperationType operationType,
-                                       Option extraMetadata) {
+                                       Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, operationType, extraMetadata);
     try {
       keyGeneratorOpt = config.populateMetaFields()
@@ -108,14 +112,14 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     }
   }
 
-  private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+  private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
     context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");
     Set<HoodieFileGroupId> fileGroupsInPendingClustering =
-        table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
-    UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
+        table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
+    UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
         .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
-    Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
-        (Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>) updateStrategy.handleUpdate(inputRecordsRDD);
+    Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
+        updateStrategy.handleUpdate(inputRecords);
     Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
     if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
       return recordsAndPendingClusteringFileGroups.getLeft();
@@ -138,20 +142,20 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
     // Cache the tagged records, so we don't end up computing them more than once
     // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
-    if (inputRecordsRDD.getStorageLevel() == StorageLevel.NONE()) {
-      inputRecordsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+    JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
+    if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
+      inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
     } else {
-      LOG.info("RDD PreppedRecords was persisted at: " + inputRecordsRDD.getStorageLevel());
+      LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
     }
 
     WorkloadProfile workloadProfile = null;
     if (isWorkloadProfileNeeded()) {
       context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
-      workloadProfile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType, table.getIndex().canIndexLogFiles());
+      workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
       LOG.info("Input workload profile :" + workloadProfile);
     }
 
@@ -162,30 +166,23 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     }
 
     // handle records update with clustering
-    JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate = clusteringHandleUpdate(inputRecordsRDD);
+    HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
 
     context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
-    JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDDWithClusteringUpdate, partitioner);
-    JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
-      } else {
-        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
-      }
-    }, true).flatMap(List::iterator);
-
-    updateIndexAndCommitIfNeeded(writeStatusRDD, result);
+    HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
+    updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;
   }
 
-  private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+  private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(HoodieData<HoodieRecord<T>> inputRecords) {
     HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
     WorkloadStat globalStat = new WorkloadStat();
 
     // group the records by partitionPath + currentLocation combination, count the number of
     // records in each partition
-    Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecordsRDD
-        .mapToPair(record -> new Tuple2<>(
+    Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecords
+        .mapToPair(record -> Pair.of(
             new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
         .countByKey();
 
@@ -223,9 +220,9 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     }
   }
 
-  private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
-    JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> mappedRDD = dedupedRecords.mapToPair(
-        record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record));
+  private HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> mappedRDD = HoodieJavaPairRDD.getJavaPairRDD(
+        dedupedRecords.mapToPair(record -> Pair.of(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)));
 
     JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> partitionedRDD;
     if (table.requireSortedRecords()) {
@@ -242,24 +239,28 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
       // Partition only
       partitionedRDD = mappedRDD.partitionBy(partitioner);
     }
-
-    return partitionedRDD.map(Tuple2::_2);
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));
   }
 
-  protected JavaRDD<WriteStatus> updateIndex(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
+  protected HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
     // RDD actions that are performed after updating the index.
-    writeStatusRDD = writeStatusRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+    writeStatuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
     Instant indexStartTime = Instant.now();
     // Update the index back
-    JavaRDD<WriteStatus> statuses = HoodieJavaRDD.getJavaRDD(
-        table.getIndex().updateLocation(HoodieJavaRDD.of(writeStatusRDD), context, table));
+    HoodieData<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
     result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
     result.setWriteStatuses(statuses);
     return statuses;
   }
 
-  protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
+  protected void updateIndexAndCommitIfNeeded(HoodieData<WriteStatus> writeStatusRDD, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     updateIndex(writeStatusRDD, result);
     result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
     commitOnAutoCommit(result);
@@ -271,19 +272,19 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
   }
 
   @Override
-  protected void setCommitMetadata(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
-    result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().map(WriteStatus::getStat).collect(),
+  protected void setCommitMetadata(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
+    result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().map(WriteStatus::getStat).collectAsList(),
         result.getPartitionToReplaceFileIds(),
         extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType())));
   }
 
   @Override
-  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
-    commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
   }
 
-  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
     String actionType = getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType + ", operation Type " + operationType);
     result.setCommitted(true);
@@ -304,7 +305,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     }
   }
 
-  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeStatuses) {
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeStatuses) {
     return Collections.emptyMap();
   }
 
@@ -341,20 +342,20 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
     if (!recordItr.hasNext()) {
       LOG.info("Empty partition with fileId => " + fileId);
-      return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
+      return Collections.emptyIterator();
     }
     // these are updates
     HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
     return handleUpdateInternal(upsertHandle, fileId);
   }
 
-  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String fileId)
+  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
       throws IOException {
     if (upsertHandle.getOldFilePath() == null) {
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      SparkMergeHelper.newInstance().runMerge(table, upsertHandle);
+      HoodieMergeHelper.newInstance().runMerge(table, upsertHandle);
     }
 
     // TODO(vc): This needs to be revisited
@@ -383,9 +384,9 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
     if (!recordItr.hasNext()) {
       LOG.info("Empty partition");
-      return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
+      return Collections.emptyIterator();
     }
-    return new SparkLazyInsertIterable(recordItr, true, config, instantTime, table, idPfx,
+    return new SparkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
         taskContextSupplier, new CreateHandleFactory<>());
   }
 
@@ -393,7 +394,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    return new UpsertPartitioner(profile, context, table, config);
+    return new UpsertPartitioner<>(profile, context, table, config);
   }
 
   public Partitioner getInsertPartitioner(WorkloadProfile profile) {
@@ -407,7 +408,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
   }
 
   @Override
-  protected void runPrecommitValidators(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
+  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
     SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
   }
 }
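
Note on the pattern above: BaseSparkCommitActionExecutor now moves data through the engine-agnostic HoodieData and drops down to JavaRDD only where a Spark-specific API is still needed. A minimal sketch of that wrap/unwrap round trip, assuming a local JavaSparkContext (the class and variable names here are illustrative, not part of the commit):

    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.data.HoodieJavaRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;

    public class HoodieDataRoundTrip {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "hoodie-data-sketch");
        // Wrap a Spark RDD as engine-agnostic HoodieData.
        HoodieData<String> records = HoodieJavaRDD.of(jsc.parallelize(Arrays.asList("a", "bb")));
        // Operate through the engine-agnostic API, as the executor now does.
        HoodieData<Integer> lengths = records.map(String::length);
        // Unwrap only where a Spark-specific API is required.
        JavaRDD<Integer> sparkLengths = HoodieJavaRDD.getJavaRDD(lengths);
        System.out.println(sparkLengths.collect());
        jsc.stop();
      }
    }
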
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
index f4f1d3a..f4b01c8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -30,24 +31,22 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.spark.api.java.JavaRDD;
-
 import java.util.Map;
 
 public class SparkBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
-  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
+  private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
+  private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
 
   public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
-                                             String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                             Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
+                                             String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
+                                             Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
   }
 
   public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
-                                        String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                        Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
+                                        String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
+                                        Option<BulkInsertPartitioner> bulkInsertPartitioner,
                                         Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
     this.inputRecordsRDD = inputRecordsRDD;
@@ -55,7 +54,7 @@ public class SparkBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     try {
       return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,
           this, true, bulkInsertPartitioner);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index d0c5dde..38e3810 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
 import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction;
 import org.apache.hudi.io.CreateHandleFactory;
@@ -46,28 +48,28 @@ import java.util.stream.IntStream;
  * @param <T>
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends BaseBulkInsertHelper<T, JavaRDD<HoodieRecord<T>>,
-    JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
+public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends BaseBulkInsertHelper<T, HoodieData<HoodieRecord<T>>,
+    HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
 
   private SparkBulkInsertHelper() {
   }
 
   private static class BulkInsertHelperHolder {
-    private static final SparkBulkInsertHelper SPARK_BULK_INSERT_HELPER = new SparkBulkInsertHelper();
+    private static final SparkBulkInsertHelper HOODIE_BULK_INSERT_HELPER = new SparkBulkInsertHelper<>();
   }
 
   public static SparkBulkInsertHelper newInstance() {
-    return BulkInsertHelperHolder.SPARK_BULK_INSERT_HELPER;
+    return BulkInsertHelperHolder.HOODIE_BULK_INSERT_HELPER;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(final JavaRDD<HoodieRecord<T>> inputRecords,
-                                                              final String instantTime,
-                                                              final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                                              final HoodieWriteConfig config,
-                                                              final BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor,
-                                                              final boolean performDedupe,
-                                                              final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(final HoodieData<HoodieRecord<T>> inputRecords,
+                                                                 final String instantTime,
+                                                                 final HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+                                                                 final HoodieWriteConfig config,
+                                                                 final BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> executor,
+                                                                 final boolean performDedupe,
+                                                                 final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
     HoodieWriteMetadata result = new HoodieWriteMetadata();
 
     //transition bulk_insert state to inflight
@@ -75,7 +77,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
             executor.getCommitActionType(), instantTime), Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());
     // write new files
-    JavaRDD<WriteStatus> writeStatuses =
+    HoodieData<WriteStatus> writeStatuses =
         bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
     //update index
     ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
@@ -83,39 +85,40 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
   }
 
   @Override
-  public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> inputRecords,
-                                         String instantTime,
-                                         HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                         HoodieWriteConfig config,
-                                         boolean performDedupe,
-                                         Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
-                                         boolean useWriterSchema,
-                                         int parallelism,
-                                         WriteHandleFactory writeHandleFactory) {
+  public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecords,
+                                            String instantTime,
+                                            HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+                                            HoodieWriteConfig config,
+                                            boolean performDedupe,
+                                            Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+                                            boolean useWriterSchema,
+                                            int parallelism,
+                                            WriteHandleFactory writeHandleFactory) {
 
     // De-dupe/merge if needed
-    JavaRDD<HoodieRecord<T>> dedupedRecords = inputRecords;
+    HoodieData<HoodieRecord<T>> dedupedRecords = inputRecords;
 
     if (performDedupe) {
-      dedupedRecords = (JavaRDD<HoodieRecord<T>>) SparkWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
+      dedupedRecords = (HoodieData<HoodieRecord<T>>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
           parallelism, table);
     }
 
-    final JavaRDD<HoodieRecord<T>> repartitionedRecords;
+    final HoodieData<HoodieRecord<T>> repartitionedRecords;
     BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
         ? userDefinedBulkInsertPartitioner.get()
         : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
-    repartitionedRecords = (JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
+    // only JavaRDD is supported for the Spark partitioner, but this is not enforced by the BulkInsertPartitioner API. TODO(HUDI-3463): improve this
+    repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
 
     // generate new file ID prefixes for each output partition
     final List<String> fileIDPrefixes =
         IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
 
-    JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
-        .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime,
+    JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords)
+        .mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime,
             partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true)
         .flatMap(List::iterator);
 
-    return writeStatusRDD;
+    return HoodieJavaRDD.of(writeStatusRDD);
   }
 }
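
The unchecked cast in the hunk above exists because Spark BulkInsertPartitioner implementations still repartition a raw JavaRDD, which the raw-typed API cannot enforce (hence the TODO for HUDI-3463). A hedged sketch of that unwrap/repartition/rewrap step in isolation (method name and scope are illustrative):

    private HoodieData<HoodieRecord<T>> repartition(HoodieData<HoodieRecord<T>> dedupedRecords,
                                                    BulkInsertPartitioner partitioner,
                                                    int parallelism) {
      // Unwrap to the Spark representation today's partitioners expect.
      JavaRDD<HoodieRecord<T>> sparkRecords = HoodieJavaRDD.getJavaRDD(dedupedRecords);
      // The raw-typed partitioner API forces the same unchecked cast seen above.
      @SuppressWarnings("unchecked")
      JavaRDD<HoodieRecord<T>> repartitioned =
          (JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(sparkRecords, parallelism);
      // Rewrap so the rest of the write path stays engine-agnostic.
      return HoodieJavaRDD.of(repartitioned);
    }
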
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
index 28d8cb0..8862981 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -30,25 +31,23 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.spark.api.java.JavaRDD;
-
 public class SparkBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
-  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
+  private final HoodieData<HoodieRecord<T>> preppedInputRecordRdd;
+  private final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner;
 
   public SparkBulkInsertPreppedCommitActionExecutor(HoodieSparkEngineContext context,
                                                     HoodieWriteConfig config, HoodieTable table,
-                                                    String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
-                                                    Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+                                                    String instantTime, HoodieData<HoodieRecord<T>> preppedInputRecordRdd,
+                                                    Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
     this.preppedInputRecordRdd = preppedInputRecordRdd;
     this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     try {
       return SparkBulkInsertHelper.newInstance().bulkInsert(preppedInputRecordRdd, instantTime, table, config,
           this, false, userDefinedBulkInsertPartitioner);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteCommitActionExecutor.java
index 997c7bf..a6fc996 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteCommitActionExecutor.java
@@ -20,29 +20,28 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
 
 public class SparkDeleteCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieKey> keys;
+  private final HoodieData<HoodieKey> keys;
 
   public SparkDeleteCommitActionExecutor(HoodieSparkEngineContext context,
                                          HoodieWriteConfig config, HoodieTable table,
-                                         String instantTime, JavaRDD<HoodieKey> keys) {
+                                         String instantTime, HoodieData<HoodieKey> keys) {
     super(context, config, table, instantTime, WriteOperationType.DELETE);
     this.keys = keys;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    return SparkDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    return HoodieDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
   }
 }
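
With this change the delete path is engine-agnostic end to end: keys arrive as HoodieData<HoodieKey> and the executor delegates to HoodieDeleteHelper. A usage sketch under stated assumptions (context, config, table, executor, instantTime, and a List<HoodieKey> named deleteKeys are presumed in scope; only calls shown in this diff are used):

    // Parallelize keys through the engine context instead of a JavaSparkContext.
    HoodieData<HoodieKey> keys = context.parallelize(deleteKeys);
    // Same call the executor's execute() makes above.
    HoodieWriteMetadata<HoodieData<WriteStatus>> result =
        HoodieDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, executor);
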
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
index 90bcdc9..b31eb7b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -19,20 +19,18 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
 
 import java.time.Duration;
 import java.util.HashMap;
@@ -51,16 +49,15 @@ public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayl
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
-        .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
-    HoodieWriteMetadata result = new HoodieWriteMetadata();
+    context.setJobStatus(this.getClass().getSimpleName(), "Gathering all file ids from all partitions being deleted.");
+    Map<String, List<String>> partitionToReplaceFileIds = HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitions).distinct()
+        .mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
     result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
     result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
-
-    result.setWriteStatuses(jsc.emptyRDD());
+    result.setWriteStatuses(context.emptyHoodieData());
     this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
     this.commitOnAutoCommit(result);
     return result;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java
index ba91fe1..479b513 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java
@@ -20,30 +20,29 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
 
 public class SparkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+  private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
 
   public SparkInsertCommitActionExecutor(HoodieSparkEngineContext context,
                                          HoodieWriteConfig config, HoodieTable table,
-                                         String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+                                         String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
     super(context, config, table, instantTime, WriteOperationType.INSERT);
     this.inputRecordsRDD = inputRecordsRDD;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
         config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 7a3549c..518063e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -19,18 +19,21 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+
 import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaRDD;
-import scala.Tuple2;
 
 import java.util.List;
 import java.util.Map;
@@ -39,25 +42,25 @@ import java.util.stream.Collectors;
 public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+  private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
 
   public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
-                                                  String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+                                                  String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
     this(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE);
   }
 
   public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
-                                                  String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
+                                                  String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
                                                   WriteOperationType writeOperationType) {
     super(context, config, table, instantTime, writeOperationType);
     this.inputRecordsRDD = inputRecordsRDD;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
         config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType);
   }
 
@@ -74,13 +77,13 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
   }
 
   @Override
-  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
-    return writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
-        new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
+    return HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+        Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
   }
 
   protected List<String> getAllExistingFileIds(String partitionPath) {
      // because the new commit is not complete, it is safe to mark all existing file Ids as old files
-    return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
+    return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
   }
 }
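
getPartitionToReplacedFileIds above also illustrates the recurring mechanical change in this commit: the engine-agnostic mapToPair takes org.apache.hudi.common.util.collection.Pair where the Spark API took scala.Tuple2, and HoodieJavaPairRDD.getJavaPairRDD bridges back to Spark only for the terminal collectAsMap. Side by side (a sketch; rdd and hoodieData are illustrative locals):

    // Before (Spark-specific):
    rdd.mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath)));
    // After (engine-agnostic):
    hoodieData.mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)));
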
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
index f7c98d5..93d0a81 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -19,20 +19,19 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
 
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -41,21 +40,18 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor
 
   public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
                                                        HoodieWriteConfig config, HoodieTable table,
-                                                       String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+                                                       String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
     super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
   }
 
   @Override
-  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
-    Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
     List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    if (partitionPaths != null && partitionPaths.size() > 0) {
-      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
-      JavaRDD<String> partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
-      partitionToExistingFileIds = partitionPathRdd.mapToPair(
-          partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+    if (partitionPaths == null || partitionPaths.isEmpty()) {
+      return Collections.emptyMap();
     }
-    return partitionToExistingFileIds;
+    context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
+    return HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, partitionPaths.size()).mapToPair(
+        partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertPreppedCommitActionExecutor.java
index 400147b..ff1a7e2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertPreppedCommitActionExecutor.java
@@ -20,29 +20,28 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
 
 public class SparkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> preppedRecords;
+  private final HoodieData<HoodieRecord<T>> preppedRecords;
 
   public SparkInsertPreppedCommitActionExecutor(HoodieSparkEngineContext context,
                                                 HoodieWriteConfig config, HoodieTable table,
-                                                String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+                                                String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
     super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
     this.preppedRecords = preppedRecords;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     return super.execute(preppedRecords);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java
index c914384..ccee9cf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java
@@ -20,30 +20,29 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
 
 public class SparkUpsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+  private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
 
   public SparkUpsertCommitActionExecutor(HoodieSparkEngineContext context,
                                          HoodieWriteConfig config, HoodieTable table,
-                                         String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+                                         String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
     super(context, config, table, instantTime, WriteOperationType.UPSERT);
     this.inputRecordsRDD = inputRecordsRDD;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
         config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, operationType);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertPreppedCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertPreppedCommitActionExecutor.java
index e36073f..73d4085 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertPreppedCommitActionExecutor.java
@@ -20,29 +20,28 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
 
 public class SparkUpsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> preppedRecords;
+  private final HoodieData<HoodieRecord<T>> preppedRecords;
 
   public SparkUpsertPreppedCommitActionExecutor(HoodieSparkEngineContext context,
                                                 HoodieWriteConfig config, HoodieTable table,
-                                                String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+                                                String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
     super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
     this.preppedRecords = preppedRecords;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     return super.execute(preppedRecords);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 6ca4408..61cb1ff 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -27,10 +26,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.spark.api.java.JavaRDD;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
 
 /**
  * Compacts a hoodie table with merge on read storage. Computes all possible compactions,
@@ -39,7 +37,7 @@ import org.apache.spark.api.java.JavaRDD;
  */
 @SuppressWarnings("checkstyle:LineLength")
 public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
-    extends HoodieCompactor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+    extends HoodieCompactor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
 
   @Override
   public void preCompact(
@@ -53,6 +51,6 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
 
   @Override
   public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
-    HoodieJavaRDD.getJavaRDD(writeStatus).persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+    writeStatus.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
   }
 }
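
maybePersist above swaps SparkMemoryUtils for a config-driven storage level: HoodieData.persist now takes the level by name, so this layer needs no Spark import. A sketch of the call, assuming the configured value is a Spark storage-level name that the engine-specific HoodieData implementation resolves internally:

    // Storage-level name from the write config, e.g. "MEMORY_AND_DISK_SER".
    String storageLevel = config.getString(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE);
    writeStatus.persist(storageLevel);
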
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
index 222506e..61e6f25 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.action.deltacommit;
 
-import java.util.Map;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -42,13 +41,14 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 public abstract class BaseSparkDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
   private static final Logger LOG = LogManager.getLogger(BaseSparkDeltaCommitActionExecutor.class);
 
   // UpsertPartitioner for MergeOnRead table type
-  private SparkUpsertDeltaCommitPartitioner mergeOnReadUpsertPartitioner;
+  private SparkUpsertDeltaCommitPartitioner<T> mergeOnReadUpsertPartitioner;
 
   public BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
                                                 String instantTime, WriteOperationType operationType) {
@@ -66,7 +66,7 @@ public abstract class BaseSparkDeltaCommitActionExecutor<T extends HoodieRecordP
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    mergeOnReadUpsertPartitioner = new SparkUpsertDeltaCommitPartitioner(profile, (HoodieSparkEngineContext) context, table, config);
+    mergeOnReadUpsertPartitioner = new SparkUpsertDeltaCommitPartitioner<>(profile, (HoodieSparkEngineContext) context, table, config);
     return mergeOnReadUpsertPartitioner;
   }
 
@@ -79,7 +79,7 @@ public abstract class BaseSparkDeltaCommitActionExecutor<T extends HoodieRecordP
       LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
       return super.handleUpdate(partitionPath, fileId, recordItr);
     } else {
-      HoodieAppendHandle<?,?,?,?> appendHandle = new HoodieAppendHandle<>(config, instantTime, table,
+      HoodieAppendHandle<?, ?, ?, ?> appendHandle = new HoodieAppendHandle<>(config, instantTime, table,
           partitionPath, fileId, recordItr, taskContextSupplier);
       appendHandle.doAppend();
       return Collections.singletonList(appendHandle.close()).iterator();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
index 6f23e41..190a714 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.deltacommit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -31,25 +32,23 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 
-import org.apache.spark.api.java.JavaRDD;
-
 import java.util.Map;
 
 public class SparkBulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
-  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
+  private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
+  private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
 
   public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
-                                                  String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                                  Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner)  {
+                                                  String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
+                                                  Option<BulkInsertPartitioner> bulkInsertPartitioner)  {
     this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
   }
 
   public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
-                                                  String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                                  Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
+                                                  String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
+                                                  Option<BulkInsertPartitioner> bulkInsertPartitioner,
                                                   Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
     this.inputRecordsRDD = inputRecordsRDD;
@@ -57,7 +56,7 @@ public class SparkBulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayl
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     try {
       return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,
           this, true, bulkInsertPartitioner);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
index be5b903..c01bce2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.deltacommit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -31,25 +32,23 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 
-import org.apache.spark.api.java.JavaRDD;
-
 public class SparkBulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
-  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
+  private final HoodieData<HoodieRecord<T>> preppedInputRecordRdd;
+  private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
 
   public SparkBulkInsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                                          HoodieWriteConfig config, HoodieTable table,
-                                                         String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
-                                                         Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
+                                                         String instantTime, HoodieData<HoodieRecord<T>> preppedInputRecordRdd,
+                                                         Option<BulkInsertPartitioner> bulkInsertPartitioner) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
     this.preppedInputRecordRdd = preppedInputRecordRdd;
     this.bulkInsertPartitioner = bulkInsertPartitioner;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     try {
       return SparkBulkInsertHelper.newInstance().bulkInsert(preppedInputRecordRdd, instantTime, table, config,
           this, false, bulkInsertPartitioner);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeleteDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeleteDeltaCommitActionExecutor.java
index 7cff563..9a5b08d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeleteDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeleteDeltaCommitActionExecutor.java
@@ -20,30 +20,29 @@ package org.apache.hudi.table.action.deltacommit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.SparkDeleteHelper;
-
-import org.apache.spark.api.java.JavaRDD;
+import org.apache.hudi.table.action.commit.HoodieDeleteHelper;
 
 public class SparkDeleteDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieKey> keys;
+  private final HoodieData<HoodieKey> keys;
 
   public SparkDeleteDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                               HoodieWriteConfig config, HoodieTable table,
-                                              String instantTime, JavaRDD<HoodieKey> keys) {
+                                              String instantTime, HoodieData<HoodieKey> keys) {
     super(context, config, table, instantTime, WriteOperationType.DELETE);
     this.keys = keys;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    return SparkDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    return HoodieDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
   }
 }
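
(A usage sketch for the delete path, mirroring the updated TestHoodieMergeOnReadTable hunk further down; keysJavaRdd is an illustrative JavaRDD<HoodieKey>.)

    // Sketch: delete keys now travel as HoodieData; collectAsList replaces
    // JavaRDD#collect on the resulting write statuses.
    HoodieData<HoodieKey> keys = HoodieJavaRDD.of(keysJavaRdd);
    HoodieWriteMetadata<HoodieData<WriteStatus>> deleted =
        new SparkDeleteDeltaCommitActionExecutor<>(context, config, table, instantTime, keys).execute();
    List<WriteStatus> statuses = deleted.getWriteStatuses().collectAsList();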
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java
index 7e38823..4889460 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java
@@ -20,31 +20,30 @@ package org.apache.hudi.table.action.deltacommit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.SparkWriteHelper;
-
-import org.apache.spark.api.java.JavaRDD;
+import org.apache.hudi.table.action.commit.HoodieWriteHelper;
 
 public class SparkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+  private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
 
   public SparkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                               HoodieWriteConfig config, HoodieTable table,
-                                              String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+                                              String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
     super(context, config, table, instantTime, WriteOperationType.INSERT);
     this.inputRecordsRDD = inputRecordsRDD;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType);
   }
 }
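
(The engine-neutral HoodieWriteHelper also exposes deduplication, as the updated TestHoodieClientOnCopyOnWriteStorage below exercises; a minimal sketch, with index and parallelism illustrative.)

    // Sketch: with a global index, records are deduplicated on record key
    // alone; with a non-global index, on (record key, partition path).
    HoodieData<HoodieRecord<T>> deduped =
        HoodieWriteHelper.newInstance().deduplicateRecords(records, index, parallelism);
    List<HoodieRecord<T>> asList = deduped.collectAsList();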
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertPreppedDeltaCommitActionExecutor.java
index e401d95..dbf0cbc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertPreppedDeltaCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.deltacommit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -27,22 +28,20 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.spark.api.java.JavaRDD;
-
 public class SparkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> preppedRecords;
+  private final HoodieData<HoodieRecord<T>> preppedRecords;
 
   public SparkInsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                                      HoodieWriteConfig config, HoodieTable table,
-                                                     String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+                                                     String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
     super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
     this.preppedRecords = preppedRecords;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     return super.execute(preppedRecords);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
index c63be62..67ecb9a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
@@ -18,32 +18,32 @@
 
 package org.apache.hudi.table.action.deltacommit;
 
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.SparkWriteHelper;
-
-import org.apache.spark.api.java.JavaRDD;
+import org.apache.hudi.table.action.commit.HoodieWriteHelper;
 
 public class SparkUpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
-  private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+  private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
 
   public SparkUpsertDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                               HoodieWriteConfig config, HoodieTable table,
-                                              String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+                                              String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
     super(context, config, table, instantTime, WriteOperationType.UPSERT);
     this.inputRecordsRDD = inputRecordsRDD;
   }
 
   @Override
-  public HoodieWriteMetadata execute() {
-    return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, operationType);
   }
 }
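
(Note that this override previously returned a raw HoodieWriteMetadata; it is now fully parameterized, so callers consume statuses without casts. A minimal sketch, assuming WriteStatus#hasErrors is available as in Hudi's WriteStatus.)

    // Sketch: no raw types or casts needed on the caller side any more.
    HoodieWriteMetadata<HoodieData<WriteStatus>> metadata = executor.execute();
    long failed = metadata.getWriteStatuses().collectAsList().stream()
        .filter(WriteStatus::hasErrors).count();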
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertPreppedDeltaCommitActionExecutor.java
index f593fea..9540030 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertPreppedDeltaCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.deltacommit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -27,22 +28,20 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.spark.api.java.JavaRDD;
-
 public class SparkUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
-  private final JavaRDD<HoodieRecord<T>> preppedRecords;
+  private final HoodieData<HoodieRecord<T>> preppedRecords;
 
   public SparkUpsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                                      HoodieWriteConfig config, HoodieTable table,
-                                                     String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+                                                     String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
     super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
     this.preppedRecords = preppedRecords;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     return super.execute(preppedRecords);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 6e67bd6..ce0cc37 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -32,6 +32,7 @@ import org.apache.hudi.client.validator.SparkPreCommitValidator;
 import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
 import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -76,6 +77,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -83,17 +85,19 @@ import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.SparkWriteHelper;
+import org.apache.hudi.table.action.commit.HoodieWriteHelper;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -451,13 +455,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieRecord recordThree =
         new HoodieAvroRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime));
 
-    JavaRDD<HoodieRecord<RawTripTestPayload>> records =
-        jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
+    HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
+        jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
 
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
+    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
     assertEquals(1, dedupedRecs.size());
     assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
     assertNodupesWithinPartition(dedupedRecs);
@@ -465,7 +469,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // non-Global dedup should be done based on both recordKey and partitionPath
     index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(false);
-    dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
+    dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
 
@@ -779,6 +783,20 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         firstInsertRecords + secondInsertRecords, 2, false, config.populateMetaFields());
   }
 
+  @Test
+  public void testBulkInsertWithCustomPartitioner() {
+    HoodieWriteConfig config = getConfigBuilder().withRollbackUsingMarkers(true).build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+      final String commitTime1 = "001";
+      client.startCommitWithTime(commitTime1);
+      List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 100);
+      JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 10);
+      BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[]{"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, false);
+      List<WriteStatus> statuses = client.bulkInsert(insertRecordsRDD1, commitTime1, Option.of(partitioner)).collect();
+      assertNoWriteErrors(statuses);
+    }
+  }
+
   /**
    * Tests deletion of records.
    */
@@ -2594,7 +2612,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()).build();
   }
 
-  public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
+  public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
 
     public FailingPreCommitValidator(HoodieSparkTable table, HoodieEngineContext context, HoodieWriteConfig config) {
       super(table, context, config);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index dcc41ad..b9f0252 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.testutils.Transformations;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -556,7 +557,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
       // initialize partitioner
       hoodieTable.getHoodieView().sync();
       BaseSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context(), cfg, hoodieTable,
-          newDeleteTime, deleteRDD);
+          newDeleteTime, HoodieJavaRDD.of(deleteRDD));
       actionExecutor.getUpsertPartitioner(new WorkloadProfile(buildProfile(deleteRDD)));
       final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Arrays.asList(1)).map(x -> {
         return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 53cd6e5..0b29cf2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -38,6 +39,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieLayoutConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.index.HoodieIndex;
@@ -325,7 +327,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
 
     // Insert new records
     BaseSparkCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(context, config, table,
-        firstCommitTime, jsc.parallelize(records));
+        firstCommitTime, context.parallelize(records));
     List<WriteStatus> writeStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
       return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator());
     }).flatMap(Transformations::flattenAsIterator).collect();
@@ -368,7 +370,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
     // Insert new records
     final List<HoodieRecord> recs2 = records;
     BaseSparkCommitActionExecutor actionExecutor = new SparkInsertPreppedCommitActionExecutor(context, config, table,
-        instantTime, jsc.parallelize(recs2));
+        instantTime, context.parallelize(recs2));
     List<WriteStatus> returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
       return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs2.iterator());
     }).flatMap(Transformations::flattenAsIterator).collect();
@@ -389,7 +391,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
     // Insert new records
     final List<HoodieRecord> recs3 = records;
     BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertPreppedCommitActionExecutor(context, config, table,
-        instantTime, jsc.parallelize(recs3));
+        instantTime, context.parallelize(recs3));
     returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
       return newActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs3.iterator());
     }).flatMap(Transformations::flattenAsIterator).collect();
@@ -422,7 +424,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
 
     // Insert new records
     BaseSparkCommitActionExecutor actionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
-        instantTime, jsc.parallelize(records));
+        instantTime, context.parallelize(records));
     jsc.parallelize(Arrays.asList(1))
         .map(i -> actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator()))
         .map(Transformations::flatten).collect();
@@ -452,7 +454,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
     // Perform inserts of 100 records to test CreateHandle and BufferedExecutor
     final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
     BaseSparkCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(context, config, table,
-        instantTime, jsc.parallelize(inserts));
+        instantTime, context.parallelize(inserts));
     final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
       return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator());
     }).map(Transformations::flatten).collect();
@@ -466,7 +468,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
     long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
     table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, HoodieTableMetaClient.reload(metaClient));
     BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
-        instantTime, jsc.parallelize(updates));
+        instantTime, context.parallelize(updates));
     final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
       return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
     }).map(Transformations::flatten).collect();
@@ -486,8 +488,8 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
     // Insert new records
     final JavaRDD<HoodieRecord> inputRecords = generateTestRecordsForBulkInsert(jsc);
     SparkBulkInsertCommitActionExecutor bulkInsertExecutor = new SparkBulkInsertCommitActionExecutor(
-        context, config, table, instantTime, inputRecords, Option.empty());
-    List<WriteStatus> returnedStatuses = ((JavaRDD<WriteStatus>)bulkInsertExecutor.execute().getWriteStatuses()).collect();
+        context, config, table, instantTime, HoodieJavaRDD.of(inputRecords), Option.empty());
+    List<WriteStatus> returnedStatuses = ((HoodieData<WriteStatus>) bulkInsertExecutor.execute().getWriteStatuses()).collectAsList();
     verifyStatusResult(returnedStatuses, generateExpectedPartitionNumRecords(inputRecords));
   }
 
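(These tests now build executor inputs through HoodieSparkEngineContext#parallelize, which yields HoodieData directly; both routes below are equivalent, with records an illustrative List<HoodieRecord>.)

    // Sketch: two ways to obtain a HoodieData-backed input for the executors.
    HoodieData<HoodieRecord> viaContext = context.parallelize(records);
    HoodieData<HoodieRecord> viaWrapper = HoodieJavaRDD.of(jsc.parallelize(records));
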
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java
deleted file mode 100644
index 2d852f8..0000000
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.commit;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
-
-import org.apache.spark.Partition;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class TestDeleteHelper {
-
-  private enum CombineTestMode {
-    None, GlobalIndex, NoneGlobalIndex;
-  }
-
-  private static final String BASE_PATH = "/tmp/";
-  private static final boolean WITH_COMBINE = true;
-  private static final boolean WITHOUT_COMBINE = false;
-  private static final int DELETE_PARALLELISM = 200;
-
-  @Mock
-  private HoodieIndex index;
-  @Mock
-  private HoodieTable<EmptyHoodieRecordPayload, JavaRDD<HoodieRecord>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table;
-  @Mock
-  private BaseSparkCommitActionExecutor<EmptyHoodieRecordPayload> executor;
-  @Mock
-  private HoodieWriteMetadata metadata;
-  @Mock
-  private JavaPairRDD keyPairs;
-  @Mock
-  private JavaSparkContext jsc;
-  @Mock
-  private HoodieSparkEngineContext context;
-
-  private JavaRDD<HoodieKey> rddToDelete;
-  private HoodieWriteConfig config;
-
-  @BeforeEach
-  public void setUp() {
-    when(table.getIndex()).thenReturn(index);
-    when(context.getJavaSparkContext()).thenReturn(jsc);
-  }
-
-  @Test
-  public void deleteWithEmptyRDDShouldNotExecute() {
-    rddToDelete = mockEmptyHoodieKeyRdd();
-    config = newWriteConfig(WITHOUT_COMBINE);
-
-    SparkDeleteHelper.newInstance().execute("test-time", rddToDelete, context, config, table, executor);
-
-    verify(rddToDelete, never()).repartition(DELETE_PARALLELISM);
-    verifyNoDeleteExecution();
-  }
-
-  @Test
-  public void deleteWithoutCombineShouldRepartitionForNonEmptyRdd() {
-    rddToDelete = newHoodieKeysRddMock(2, CombineTestMode.None);
-    config = newWriteConfig(WITHOUT_COMBINE);
-
-    SparkDeleteHelper.newInstance().execute("test-time", rddToDelete, context, config, table, executor);
-
-    verify(rddToDelete, times(1)).repartition(DELETE_PARALLELISM);
-    verifyDeleteExecution();
-  }
-
-  @Test
-  public void deleteWithCombineShouldRepartitionForNonEmptyRddAndNonGlobalIndex() {
-    rddToDelete = newHoodieKeysRddMock(2, CombineTestMode.NoneGlobalIndex);
-    config = newWriteConfig(WITH_COMBINE);
-
-    SparkDeleteHelper.newInstance().execute("test-time", rddToDelete, context, config, table, executor);
-
-    verify(rddToDelete, times(1)).distinct(DELETE_PARALLELISM);
-    verifyDeleteExecution();
-  }
-
-  @Test
-  public void deleteWithCombineShouldRepartitionForNonEmptyRddAndGlobalIndex() {
-    rddToDelete = newHoodieKeysRddMock(2, CombineTestMode.GlobalIndex);
-    config = newWriteConfig(WITH_COMBINE);
-    when(index.isGlobal()).thenReturn(true);
-
-    SparkDeleteHelper.newInstance().execute("test-time", rddToDelete, context, config, table, executor);
-
-    verify(keyPairs, times(1)).reduceByKey(any(), eq(DELETE_PARALLELISM));
-    verifyDeleteExecution();
-  }
-
-  private void verifyDeleteExecution() {
-    verify(executor, times(1)).execute(any());
-    verify(metadata, times(1)).setIndexLookupDuration(any());
-  }
-
-  private void verifyNoDeleteExecution() {
-    verify(executor, never()).execute(any());
-  }
-
-  private HoodieWriteConfig newWriteConfig(boolean combine) {
-    return HoodieWriteConfig.newBuilder()
-            .combineDeleteInput(combine)
-            .withPath(BASE_PATH)
-            .withDeleteParallelism(DELETE_PARALLELISM)
-            .build();
-  }
-
-  private JavaRDD<HoodieKey> newHoodieKeysRddMock(int howMany, CombineTestMode combineMode) {
-    JavaRDD<HoodieKey> keysToDelete = mock(JavaRDD.class);
-
-    JavaRDD recordsRdd = mock(JavaRDD.class);
-    when(recordsRdd.filter(any())).thenReturn(recordsRdd);
-    when(recordsRdd.isEmpty()).thenReturn(howMany <= 0);
-    when(index.tagLocation(any(), any(), any())).thenReturn(HoodieJavaRDD.of(recordsRdd));
-
-    if (combineMode == CombineTestMode.GlobalIndex) {
-      when(keyPairs.reduceByKey(any(), anyInt())).thenReturn(keyPairs);
-      when(keyPairs.values()).thenReturn(keysToDelete);
-      when(keysToDelete.keyBy(any())).thenReturn(keyPairs);
-    } else if (combineMode == CombineTestMode.NoneGlobalIndex) {
-      when(keysToDelete.distinct(anyInt())).thenReturn(keysToDelete);
-    } else if (combineMode == CombineTestMode.None) {
-      List<Partition> parts = mock(List.class);
-      when(parts.isEmpty()).thenReturn(howMany <= 0);
-      when(keysToDelete.repartition(anyInt())).thenReturn(keysToDelete);
-      when(keysToDelete.partitions()).thenReturn(parts);
-    }
-
-    when(keysToDelete.map(any())).thenReturn(recordsRdd);
-    when(executor.execute(any())).thenReturn(metadata);
-    return keysToDelete;
-  }
-
-  private JavaRDD<HoodieKey> mockEmptyHoodieKeyRdd() {
-    JavaRDD<HoodieKey> emptyRdd = mock(JavaRDD.class);
-    doReturn(true).when(emptyRdd).isEmpty();
-    doReturn(Collections.emptyList()).when(emptyRdd).partitions();
-    doReturn(emptyRdd).when(emptyRdd).map(any());
-
-    doReturn(HoodieJavaRDD.of(emptyRdd)).when(index).tagLocation(any(), any(), any());
-    doReturn(emptyRdd).when(emptyRdd).filter(any());
-
-    doNothing().when(executor).saveWorkloadProfileMetadataToInflight(any(), anyString());
-    doReturn(emptyRdd).when(jsc).emptyRDD();
-    return emptyRdd;
-  }
-
-}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 9afe5f3..59174a9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.compact;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -195,12 +196,12 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
       String compactionInstantTime = "102";
       table.scheduleCompaction(context, compactionInstantTime, Option.empty());
       table.getMetaClient().reloadActiveTimeline();
-      JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) table.compact(
+      HoodieData<WriteStatus> result = (HoodieData<WriteStatus>) table.compact(
           context, compactionInstantTime).getWriteStatuses();
 
       // Verify that all partition paths are present in the WriteStatus result
       for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<WriteStatus> writeStatuses = result.collect();
+        List<WriteStatus> writeStatuses = result.collectAsList();
         assertTrue(writeStatuses.stream()
             .filter(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)).count() > 0);
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 093fd43..4e8d2b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -97,6 +97,12 @@ public abstract class HoodieData<T> implements Serializable {
    */
   public abstract HoodieData<T> distinct();
 
+  public abstract HoodieData<T> distinct(int parallelism);
+
+  public abstract <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism);
+
+  public abstract HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc);
+
   /**
    * Unions this {@link HoodieData} with other {@link HoodieData}.
    * @param other {@link HoodieData} of interest.
@@ -108,4 +114,6 @@ public abstract class HoodieData<T> implements Serializable {
    * @return collected results in {@link List<T>}.
    */
   public abstract List<T> collectAsList();
+
+  public abstract HoodieData<T> repartition(int parallelism);
 }
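
(A minimal illustration of the new HoodieData primitives over the in-memory implementation; values are illustrative, and for HoodieList the parallelism arguments are hints only, as the following hunks show.)

    // Sketch: the added primitives compose like their JavaRDD counterparts.
    HoodieData<Integer> data = HoodieList.of(Arrays.asList(1, 2, 2, 3, 3, 3));
    List<Integer> result = data
        .filter(i -> i > 1)   // drops the 1
        .distinct(4)          // parallelism ignored in memory
        .repartition(4)       // likewise a no-op for HoodieList
        .collectAsList();     // => [2, 3] (up to ordering)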
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
index 9441619..c23e712 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
@@ -133,6 +133,26 @@ public class HoodieList<T> extends HoodieData<T> {
   }
 
   @Override
+  public HoodieData<T> distinct(int parallelism) {
+    return distinct();
+  }
+
+  @Override
+  public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism) {
+    return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
+        .reduceByKey((value1, value2) -> value1, parallelism)
+        .values();
+  }
+
+  @Override
+  public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
+    return HoodieList.of(listData
+        .stream()
+        .filter(i -> throwingMapWrapper(filterFunc).apply(i))
+        .collect(Collectors.toList()));
+  }
+
+  @Override
   public HoodieData<T> union(HoodieData<T> other) {
     List<T> unionResult = new ArrayList<>();
     unionResult.addAll(listData);
@@ -144,4 +164,10 @@ public class HoodieList<T> extends HoodieData<T> {
   public List<T> collectAsList() {
     return listData;
   }
+
+  @Override
+  public HoodieData<T> repartition(int parallelism) {
+    // no op
+    return this;
+  }
 }
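
(distinctWithKey keeps exactly one value per key; with the (value1, value2) -> value1 reduce above, the first value encountered survives, which the new TestHoodieList below relies on. Sketch with illustrative pairs.)

    // Sketch: dedupe pairs on their left element; the first value per key wins.
    List<Pair<String, Integer>> deduped = HoodieList.of(
            Arrays.asList(Pair.of("k1", 1), Pair.of("k1", 10), Pair.of("k2", 2)))
        .distinctWithKey(Pair::getLeft, 1)
        .collectAsList();   // => [(k1, 1), (k2, 2)]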
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
index c941231..1e125c9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.common.data;
 
 import org.apache.hudi.common.function.FunctionWrapper;
+import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
@@ -27,6 +28,7 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -111,6 +113,15 @@ public class HoodieMapPair<K, V> extends HoodiePairData<K, V> {
   }
 
   @Override
+  public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> func, int parallelism) {
+    return HoodieMapPair.of(mapPairData.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> {
+          Option<V> reducedValue = Option.fromJavaOptional(e.getValue().stream().reduce(func::apply));
+          return reducedValue.isPresent() ? Collections.singletonList(reducedValue.get()) : Collections.emptyList();
+        })));
+  }
+
+  @Override
   public <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func) {
     Function<Pair<K, V>, O> throwableFunc = throwingMapWrapper(func);
     return HoodieList.of(
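
(The in-memory reduceByKey folds each key's value list with the supplied function; parallelism is unused here, and empty value lists stay empty thanks to the Option-based reduce above. Sketch with illustrative data.)

    // Sketch: sum values per key; "b" keeps its empty list.
    Map<String, List<Integer>> input = new HashMap<>();
    input.put("a", Arrays.asList(1, 2, 3));
    input.put("b", Collections.emptyList());
    HoodiePairData<String, Integer> reduced =
        HoodieMapPair.of(input).reduceByKey(Integer::sum, 1);
    // HoodieMapPair.getMapPair(reduced) => {a=[6], b=[]}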
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
index b9bdcb3..9ff5279 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.data;
 
+import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
@@ -72,6 +73,8 @@ public abstract class HoodiePairData<K, V> implements Serializable {
    */
   public abstract Map<K, Long> countByKey();
 
+  public abstract HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> func, int parallelism);
+
   /**
    * @param func serializable map function.
    * @param <O>  output object type.
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java
new file mode 100644
index 0000000..6130d4a
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestHoodieList {
+
+  private static Stream<Arguments> distinctWithKey() {
+    return Stream.of(
+        Arguments.of(
+            Arrays.asList(Pair.of("k1", 1), Pair.of("k2", 2)),
+            Arrays.asList(Pair.of("k1", 1), Pair.of("k1", 10), Pair.of("k1", 100), Pair.of("k2", 2)))
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource
+  void distinctWithKey(List<Pair<String, Integer>> expected, List<Pair<String, Integer>> originalList) {
+    List<Pair<String, Integer>> distinctList = HoodieList.of(originalList).distinctWithKey(Pair::getLeft, 1).collectAsList();
+    assertEquals(expected, distinctList);
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
index 86b1a21..20e9a8f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
@@ -25,6 +25,9 @@ import org.apache.hudi.common.util.collection.Pair;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,7 +36,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
+import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestHoodieMapPair {
@@ -117,6 +123,29 @@ public class TestHoodieMapPair {
             })));
   }
 
+  private static Stream<Arguments> testReduceByKey() {
+    return Stream.of(
+        Arguments.of(
+            createImmutableMap(
+                Pair.of(1, createImmutableList(1001)),
+                Pair.of(2, createImmutableList(2001)),
+                Pair.of(3, createImmutableList(3001)),
+                Pair.of(4, createImmutableList())),
+            createImmutableMap(
+                Pair.of(1, createImmutableList(1001, 1002, 1003)),
+                Pair.of(2, createImmutableList(2001, 2002)),
+                Pair.of(3, createImmutableList(3001)),
+                Pair.of(4, createImmutableList())))
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource
+  public void testReduceByKey(Map<Integer, List<Integer>> expected, Map<Integer, List<Integer>> original) {
+    HoodiePairData<Integer, Integer> reduced = HoodieMapPair.of(original).reduceByKey((a, b) -> a, 1);
+    assertEquals(expected, HoodieMapPair.getMapPair(reduced));
+  }
+
   @Test
   public void testLeftOuterJoinSingleValuePerKey() {
     HoodiePairData<String, String> pairData1 = constructTestMapPairData(Arrays.asList(