Posted to commits@hudi.apache.org by si...@apache.org on 2022/10/22 23:47:01 UTC

[hudi] branch master updated: [HUDI-5070] Move flaky cleaner tests to separate class (#7034)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b78c3441c4 [HUDI-5070] Move flaky cleaner tests to separate class (#7034)
b78c3441c4 is described below

commit b78c3441c4e28200abec340eaff852375764cbdb
Author: Shiyan Xu <27...@users.noreply.github.com>
AuthorDate: Sun Oct 23 07:46:53 2022 +0800

    [HUDI-5070] Move flaky cleaner tests to separate class (#7034)
---
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   2 +-
 .../java/org/apache/hudi/table/TestCleaner.java    | 186 +++-----------------
 .../clean/TestCleanerInsertAndCleanByCommits.java  | 194 +++++++++++++++++++++
 .../hudi/testutils/HoodieClientTestBase.java       |  85 ++++++---
 .../hudi/testutils/HoodieClientTestHarness.java    |  27 ++-
 5 files changed, 300 insertions(+), 194 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 9787387a1e..5f4cf358d5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1327,7 +1327,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
     List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
     assertNoWriteErrors(statuses);
-    assertPartitionMetadata(new String[] {testPartitionPath}, fs);
+    assertPartitionMetadata(basePath, new String[] {testPartitionPath}, fs);
     assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
     String file1 = statuses.get(0).getFileId();
     assertEquals(100,
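
The extra basePath argument at this call site matches the change further down in HoodieClientTestBase, where assertPartitionMetadata becomes a static helper that receives the base path explicitly instead of reading a harness instance field. A rough sketch of that pattern, with illustrative names rather than the actual Hudi test API:

    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Illustrative only: a harness-bound assertion turned into a static helper whose
    // dependencies (here, basePath) are passed in, so tests on any base class can reuse it.
    public final class PartitionMetadataAssertions {

      private PartitionMetadataAssertions() {
      }

      public static void assertPartitionMetadata(String basePath, String[] partitionPaths) {
        for (String partitionPath : partitionPaths) {
          // Stand-in for the real check against HoodiePartitionMetadata on the FileSystem.
          if (!Files.isDirectory(Paths.get(basePath, partitionPath))) {
            throw new AssertionError("No partition metadata found under " + basePath + "/" + partitionPath);
          }
        }
      }
    }
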
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 4f69518ceb..c1dae9afa4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -31,7 +31,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.bootstrap.TestBootstrapIndex;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -73,7 +73,6 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
@@ -86,8 +85,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -101,10 +98,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -132,21 +127,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestCleaner extends HoodieClientTestBase {
 
   private static final int BIG_BATCH_INSERT_SIZE = 500;
-  private static final Logger LOG = LogManager.getLogger(TestCleaner.class);
+  private static final int PARALLELISM = 10;
 
   /**
    * Helper method to do first batch of insert for clean by versions/commits tests.
    *
-   * @param cfg Hoodie Write Config
+   * @param context Spark engine context
+   * @param metaClient Hoodie table meta client
    * @param client Hoodie Client
    * @param recordGenFunction Function to generate records for insertion
    * @param insertFn Insertion API for testing
    * @throws Exception in case of error
    */
-  private Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, SparkRDDWriteClient client,
+  public static Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCleanerTest(
+      HoodieSparkEngineContext context,
+      HoodieTableMetaClient metaClient,
+      SparkRDDWriteClient client,
       Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
-      HoodieCleaningPolicy cleaningPolicy) throws Exception {
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
 
     /*
      * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
@@ -155,7 +153,7 @@ public class TestCleaner extends HoodieClientTestBase {
     String newCommitTime = client.startCommit();
 
     List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 5);
+    JavaRDD<HoodieRecord> writeRecords = context.getJavaSparkContext().parallelize(records, PARALLELISM);
 
     JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime);
     // Verify there are no errors
@@ -174,8 +172,8 @@ public class TestCleaner extends HoodieClientTestBase {
     assertTrue(table.getCompletedCleanTimeline().empty());
 
     if (client.getConfig().shouldAutoCommit()) {
-      HoodieIndex index = SparkHoodieIndexFactory.createIndex(cfg);
-      List<HoodieRecord> taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table).collect();
+      HoodieIndex index = SparkHoodieIndexFactory.createIndex(client.getConfig());
+      List<HoodieRecord> taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, PARALLELISM), table).collect();
       checkTaggedRecords(taggedRecords, newCommitTime);
     }
     return Pair.of(newCommitTime, statuses);
@@ -184,16 +182,17 @@ public class TestCleaner extends HoodieClientTestBase {
   /**
    * Helper method to do first batch of insert for clean by versions/commits tests.
    *
-   * @param cfg Hoodie Write Config
+   * @param context Spark engine context
    * @param client Hoodie Client
    * @param recordGenFunction Function to generate records for insertion
    * @param insertFn Insertion API for testing
    * @throws Exception in case of error
    */
-  private Pair<String, JavaRDD<WriteStatus>> insertFirstFailedBigBatchForClientCleanerTest(HoodieWriteConfig cfg, SparkRDDWriteClient client,
-                                                       Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-                                                       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
-                                                       HoodieCleaningPolicy cleaningPolicy) throws Exception {
+  public static Pair<String, JavaRDD<WriteStatus>> insertFirstFailedBigBatchForClientCleanerTest(
+      HoodieSparkEngineContext context,
+      SparkRDDWriteClient client,
+      Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
 
     /*
      * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
@@ -202,7 +201,7 @@ public class TestCleaner extends HoodieClientTestBase {
     String newCommitTime = client.startCommit();
 
     List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 5);
+    JavaRDD<HoodieRecord> writeRecords = context.getJavaSparkContext().parallelize(records, 5);
 
     JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime);
     // Verify there are no errors
@@ -359,8 +358,7 @@ public class TestCleaner extends HoodieClientTestBase {
       final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
           generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
 
-      insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+      insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
 
       Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
       metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -458,15 +456,6 @@ public class TestCleaner extends HoodieClientTestBase {
     }
   }
 
-  /**
-   * Test Clean-By-Commits using insert/upsert API.
-   */
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
-    testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync);
-  }
-
   /**
    * Test Clean-By-Commits using insert/upsert API.
    */
@@ -475,117 +464,6 @@ public class TestCleaner extends HoodieClientTestBase {
     testFailedInsertAndCleanByCommits(SparkRDDWriteClient::insert, false);
   }
 
-  /**
-   * Test Clean-By-Commits using prepped version of insert/upsert API.
-   */
-  @Test
-  public void testInsertPreppedAndCleanByCommits() throws Exception {
-    testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
-        true, false);
-  }
-
-  /**
-   * Test Clean-By-Commits using prepped versions of bulk-insert/upsert API.
-   */
-  @Test
-  public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
-    testInsertAndCleanByCommits(
-        (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
-        SparkRDDWriteClient::upsertPreppedRecords, true, false);
-  }
-
-  /**
-   * Test Clean-By-Commits using bulk-insert/upsert API.
-   */
-  @Test
-  public void testBulkInsertAndCleanByCommits() throws Exception {
-    testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, false);
-  }
-
-  /**
-   * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
-   *
-   * @param insertFn Insert API to be tested
-   * @param upsertFn Upsert API to be tested
-   * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *        record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
-   * @throws Exception in case of errors
-   */
-  private void testInsertAndCleanByCommits(
-      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
-      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI, boolean isAsync)
-      throws Exception {
-    int maxCommits = 3; // keep upto 3 commits from the past
-    HoodieWriteConfig cfg = getConfigBuilder()
-        .withCleanConfig(HoodieCleanConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(maxCommits).build())
-        .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
-        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
-        .build();
-    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
-
-    final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
-        generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
-
-    final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
-        generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
-
-    insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-        HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
-
-    // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
-    for (int i = 0; i < 8; i++) {
-      String newCommitTime = makeNewCommitTime();
-      try {
-        client.startCommitWithTime(newCommitTime);
-        List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100);
-
-        List<WriteStatus> statuses = upsertFn.apply(client, jsc.parallelize(records, 1), newCommitTime).collect();
-        // Verify there are no errors
-        assertNoWriteErrors(statuses);
-
-        metaClient = HoodieTableMetaClient.reload(metaClient);
-        HoodieTable table1 = HoodieSparkTable.create(cfg, context, metaClient);
-        HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
-        HoodieInstant lastInstant = activeTimeline.lastInstant().get();
-        if (cfg.isAsyncClean()) {
-          activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
-        }
-        // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
-        // commit
-        Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
-        Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
-        if (earliestRetainedCommit.isPresent()) {
-          acceptableCommits
-              .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp())
-                  .getInstants().collect(Collectors.toSet()));
-          acceptableCommits.add(earliestRetainedCommit.get());
-        }
-
-        TableFileSystemView fsView = table1.getFileSystemView();
-        // Need to ensure the following
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-          for (HoodieFileGroup fileGroup : fileGroups) {
-            Set<String> commitTimes = new HashSet<>();
-            fileGroup.getAllBaseFiles().forEach(value -> {
-              LOG.debug("Data File - " + value);
-              commitTimes.add(value.getCommitTime());
-            });
-            if (cfg.isAsyncClean()) {
-              commitTimes.remove(lastInstant.getTimestamp());
-            }
-            assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
-                "Only contain acceptable versions of file should be present");
-          }
-        }
-      } catch (IOException ioe) {
-        throw new RuntimeException(ioe);
-      }
-    }
-  }
-
   /**
    * Test Helper for Cleaning failed commits by commits logic from HoodieWriteClient API perspective.
    *
@@ -612,22 +490,18 @@ public class TestCleaner extends HoodieClientTestBase {
     final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
         generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
 
-    Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-        HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+    Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
     client.commit(result.getLeft(), result.getRight());
 
     HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);
     assertTrue(table.getCompletedCleanTimeline().empty());
 
-    insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-        HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+    insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
 
-    insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-        HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+    insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
 
     Pair<String, JavaRDD<WriteStatus>> ret =
-        insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-        HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+        insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
     // Await till enough time passes such that the last failed commits heartbeats are expired
     await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
         .isHeartbeatExpired(ret.getLeft()));
@@ -1352,8 +1226,7 @@ public class TestCleaner extends HoodieClientTestBase {
       final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
           generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
 
-      Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+      Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
 
       client.commit(result.getLeft(), result.getRight());
 
@@ -1361,15 +1234,12 @@ public class TestCleaner extends HoodieClientTestBase {
 
       assertTrue(table.getCompletedCleanTimeline().empty());
 
-      insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+      insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
 
-      insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+      insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
 
       Pair<String, JavaRDD<WriteStatus>> ret =
-          insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
-          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+          insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
 
       // Await till enough time passes such that the last failed commits heartbeats are expired
       await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
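
The await() calls retained in the hunks above come from Awaitility, which repeatedly evaluates a condition until it becomes true or the timeout elapses; here it waits for the failed commits' heartbeats to expire so the cleaner can treat them as abandoned. A minimal standalone sketch of that polling pattern (the AtomicBoolean is a stand-in for the heartbeat check, not Hudi code):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    import static org.awaitility.Awaitility.await;

    public class AwaitSketch {

      public static void main(String[] args) {
        AtomicBoolean heartbeatExpired = new AtomicBoolean(false);

        // Simulate a heartbeat that expires after two seconds on another thread.
        new Thread(() -> {
          try {
            Thread.sleep(2000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          heartbeatExpired.set(true);
        }).start();

        // Poll until the condition holds, giving up (and failing) after 10 seconds.
        await().atMost(10, TimeUnit.SECONDS).until(heartbeatExpired::get);

        System.out.println("heartbeat expired; cleaner may reclaim the failed commit");
      }
    }
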
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
new file mode 100644
index 0000000000..7f5cd5cd99
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
+import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieClientTestBase.Function2;
+import static org.apache.hudi.testutils.HoodieClientTestBase.Function3;
+import static org.apache.hudi.testutils.HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTestHarness {
+
+  private static final Logger LOG = LogManager.getLogger(TestCleanerInsertAndCleanByCommits.class);
+  private static final int BATCH_SIZE = 100;
+  private static final int PARALLELISM = 2;
+
+  /**
+   * Test Clean-By-Commits using insert/upsert API.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
+    testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync);
+  }
+
+  /**
+   * Test Clean-By-Commits using prepped version of insert/upsert API.
+   */
+  @Test
+  public void testInsertPreppedAndCleanByCommits() throws Exception {
+    testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
+        true, false);
+  }
+
+  /**
+   * Test Clean-By-Commits using prepped versions of bulk-insert/upsert API.
+   */
+  @Test
+  public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
+    testInsertAndCleanByCommits(
+        (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
+        SparkRDDWriteClient::upsertPreppedRecords, true, false);
+  }
+
+  /**
+   * Test Clean-By-Commits using bulk-insert/upsert API.
+   */
+  @Test
+  public void testBulkInsertAndCleanByCommits() throws Exception {
+    testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, false);
+  }
+
+  /**
+   * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
+   *
+   * @param insertFn Insert API to be tested
+   * @param upsertFn Upsert API to be tested
+   * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
+   *        record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
+   * @throws Exception in case of errors
+   */
+  private void testInsertAndCleanByCommits(
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI, boolean isAsync)
+      throws Exception {
+    int maxCommits = 3; // keep upto 3 commits from the past
+    HoodieWriteConfig cfg = getConfigBuilder(true)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .withAsyncClean(isAsync).retainCommits(maxCommits).build())
+        .withParallelism(PARALLELISM, PARALLELISM)
+        .withBulkInsertParallelism(PARALLELISM)
+        .withFinalizeWriteParallelism(PARALLELISM)
+        .withDeleteParallelism(PARALLELISM)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .build();
+    final SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
+    final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI
+        ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts)
+        : dataGen::generateInserts;
+    final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI
+        ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates)
+        : dataGen::generateUniqueUpdates;
+
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+    insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
+
+    // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
+    for (int i = 0; i < 8; i++) {
+      String newCommitTime = makeNewCommitTime();
+      try {
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, BATCH_SIZE);
+
+        List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newCommitTime).collect();
+        // Verify there are no errors
+        assertNoWriteErrors(statuses);
+
+        metaClient = HoodieTableMetaClient.reload(metaClient);
+        HoodieTable table1 = HoodieSparkTable.create(cfg, context(), metaClient);
+        HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
+        HoodieInstant lastInstant = activeTimeline.lastInstant().get();
+        if (cfg.isAsyncClean()) {
+          activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
+        }
+        // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
+        // commit
+        Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
+        Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
+        if (earliestRetainedCommit.isPresent()) {
+          acceptableCommits
+              .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp())
+                  .getInstants().collect(Collectors.toSet()));
+          acceptableCommits.add(earliestRetainedCommit.get());
+        }
+
+        TableFileSystemView fsView = table1.getFileSystemView();
+        // Need to ensure the following
+        for (String partitionPath : dataGen.getPartitionPaths()) {
+          List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+          for (HoodieFileGroup fileGroup : fileGroups) {
+            Set<String> commitTimes = new HashSet<>();
+            fileGroup.getAllBaseFiles().forEach(value -> {
+              LOG.debug("Data File - " + value);
+              commitTimes.add(value.getCommitTime());
+            });
+            if (cfg.isAsyncClean()) {
+              commitTimes.remove(lastInstant.getTimestamp());
+            }
+            assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
+                "Only contain acceptable versions of file should be present");
+          }
+        }
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+    }
+  }
+}
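
The new class carries over the @ParameterizedTest/@ValueSource combination, which JUnit 5 uses to run the same test body once per supplied value (here, async vs. synchronous clean). A self-contained toy illustration of that mechanism, unrelated to Hudi:

    import java.util.concurrent.CompletableFuture;

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    class ParameterizedSketchTest {

      // JUnit 5 invokes this test twice: once with isAsync = true, once with false.
      @ParameterizedTest
      @ValueSource(booleans = {true, false})
      void runsForBothModes(boolean isAsync) throws Exception {
        int result = isAsync
            ? CompletableFuture.supplyAsync(() -> 21 + 21).get()  // "async" path
            : 21 + 21;                                            // inline path
        assertEquals(42, result, "both modes should produce the same outcome");
      }
    }
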
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 424bb6c53e..d2286decfe 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -20,6 +20,7 @@ package org.apache.hudi.testutils;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
@@ -39,15 +40,16 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCleanConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.table.HoodieSparkTable;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -165,18 +167,18 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     return table;
   }
 
-  public void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
+  public void assertPartitionMetadataForRecords(String basePath, List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
     Set<String> partitionPathSet = inputRecords.stream()
         .map(HoodieRecord::getPartitionPath)
         .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+    assertPartitionMetadata(basePath, partitionPathSet.stream().toArray(String[]::new), fs);
   }
 
-  public void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
+  public void assertPartitionMetadataForKeys(String basePath, List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
     Set<String> partitionPathSet = inputKeys.stream()
         .map(HoodieKey::getPartitionPath)
         .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+    assertPartitionMetadata(basePath, partitionPathSet.stream().toArray(String[]::new), fs);
   }
 
   /**
@@ -186,7 +188,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
    * @param fs File System
    * @throws IOException in case of error
    */
-  public void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
+  public static void assertPartitionMetadata(String basePath, String[] partitionPaths, FileSystem fs) throws IOException {
     for (String partitionPath : partitionPaths) {
       assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
       HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
@@ -201,7 +203,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
    * @param taggedRecords Tagged Records
    * @param instantTime Commit Timestamp
    */
-  public void checkTaggedRecords(List<HoodieRecord> taggedRecords, String instantTime) {
+  public static void checkTaggedRecords(List<HoodieRecord> taggedRecords, String instantTime) {
     for (HoodieRecord rec : taggedRecords) {
       assertTrue(rec.isCurrentLocationKnown(), "Record " + rec + " found with no location.");
       assertEquals(rec.getCurrentLocation().getInstantTime(), instantTime,
@@ -214,7 +216,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
    *
    * @param records List of Hoodie records
    */
-  public void assertNodupesWithinPartition(List<HoodieRecord<RawTripTestPayload>> records) {
+  public static void assertNodupesWithinPartition(List<HoodieRecord<RawTripTestPayload>> records) {
     Map<String, Set<String>> partitionToKeys = new HashMap<>();
     for (HoodieRecord r : records) {
       String key = r.getRecordKey();
@@ -233,29 +235,46 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
    * guaranteed by record-generation function itself.
    *
    * @param writeConfig       Hoodie Write Config
-   * @param recordGenFunction Records Generation function
+   * @param recordsGenFunction Records Generation function
    * @return Wrapped function
    */
-  private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+  public static Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
+      final String basePath,
+      final Configuration hadoopConf,
+      final HoodieSparkEngineContext context,
+      final HoodieWriteConfig writeConfig,
+      final Function2<List<HoodieRecord>, String, Integer> recordsGenFunction) {
     return (commit, numRecords) -> {
       final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
-      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
+      List<HoodieRecord> records = recordsGenFunction.apply(commit, numRecords);
       final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-      JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table);
+      JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, 1), table);
       return taggedRecords.collect();
     };
   }
 
-  private Function3<List<HoodieRecord>, String, Integer, String> wrapRecordsGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction) {
+  /**
+   * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records
+   * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
+   * guaranteed by record-generation function itself.
+   *
+   * @param writeConfig       Hoodie Write Config
+   * @param recordsGenFunction Records Generation function (for partition)
+   * @return Wrapped function
+   */
+  public static Function3<List<HoodieRecord>, String, Integer, String> wrapPartitionRecordsGenFunctionForPreppedCalls(
+      final String basePath,
+      final Configuration hadoopConf,
+      final HoodieSparkEngineContext context,
+      final HoodieWriteConfig writeConfig,
+      final Function3<List<HoodieRecord>, String, Integer, String> recordsGenFunction) {
     return (commit, numRecords, partition) -> {
       final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
-      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords, partition);
+      List<HoodieRecord> records = recordsGenFunction.apply(commit, numRecords, partition);
       final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-      JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table);
+      JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, 1), table);
       return taggedRecords.collect();
     };
   }
@@ -269,16 +288,20 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
    * @param keyGenFunction Keys Generation function
    * @return Wrapped function
    */
-  private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
+  public static Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
+      final String basePath,
+      final Configuration hadoopConf,
+      final HoodieSparkEngineContext context,
+      final HoodieWriteConfig writeConfig,
+      final Function<Integer, List<HoodieKey>> keyGenFunction) {
     return (numRecords) -> {
       final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
       List<HoodieKey> records = keyGenFunction.apply(numRecords);
       final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-      JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+      JavaRDD<HoodieRecord> recordsToDelete = context.getJavaSparkContext().parallelize(records, 1)
           .map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
-      JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, recordsToDelete, table);
+      JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, context, recordsToDelete, table);
       return taggedRecords.map(record -> record.getKey()).collect();
     };
   }
@@ -295,16 +318,24 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
       HoodieWriteConfig writeConfig,
       Function2<List<HoodieRecord>, String, Integer> wrapped) {
     if (isPreppedAPI) {
-      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+      return wrapRecordsGenFunctionForPreppedCalls(basePath, hadoopConf, context, writeConfig, wrapped);
     } else {
       return wrapped;
     }
   }
 
+  /**
+   * Generate wrapper for record generation function for testing Prepped APIs.
+   *
+   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+   * @param writeConfig Hoodie Write Config
+   * @param wrapped Actual Records Generation function (for partition)
+   * @return Wrapped Function
+   */
   public Function3<List<HoodieRecord>, String, Integer, String> generateWrapRecordsForPartitionFn(boolean isPreppedAPI,
       HoodieWriteConfig writeConfig, Function3<List<HoodieRecord>, String, Integer, String> wrapped) {
     if (isPreppedAPI) {
-      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+      return wrapPartitionRecordsGenFunctionForPreppedCalls(basePath, hadoopConf, context, writeConfig, wrapped);
     } else {
       return wrapped;
     }
@@ -321,7 +352,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
   public Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
       HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
     if (isPreppedAPI) {
-      return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+      return wrapDeleteKeysGenFunctionForPreppedCalls(basePath, hadoopConf, context, writeConfig, wrapped);
     } else {
       return wrapped;
     }
@@ -562,7 +593,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
       client.commit(newCommitTime, result);
     }
     // check the partition metadata is written out
-    assertPartitionMetadataForRecords(records, fs);
+    assertPartitionMetadataForRecords(basePath, records, fs);
 
     // verify that there is a commit
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
@@ -634,7 +665,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     assertNoWriteErrors(statuses);
 
     // check the partition metadata is written out
-    assertPartitionMetadataForKeys(keysToDelete, fs);
+    assertPartitionMetadataForKeys(basePath, keysToDelete, fs);
 
     // verify that there is a commit
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
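
The wrapRecordsGenFunctionForPreppedCalls helpers changed in the hunks above are higher-order functions: they take a record-generation function and return a new function that also tags each record's location, which is what the prepped APIs expect. A generic sketch of that wrapping idea using java.util.function instead of the Hudi Function2 type:

    import java.util.List;
    import java.util.function.BiFunction;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class PreppedWrapperSketch {

      // Wraps a (commitTime, count) -> records generator so the produced records are
      // "tagged" before use, mirroring the intent of wrapRecordsGenFunctionForPreppedCalls.
      public static BiFunction<String, Integer, List<String>> wrapForPrepped(
          BiFunction<String, Integer, List<String>> generator) {
        return (commitTime, count) -> generator.apply(commitTime, count).stream()
            .map(record -> record + " [tagged@" + commitTime + "]")   // stand-in for tagLocation(...)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        BiFunction<String, Integer, List<String>> rawGen =
            (commit, n) -> IntStream.range(0, n).mapToObj(i -> "record-" + i).collect(Collectors.toList());

        List<String> prepped = wrapForPrepped(rawGen).apply("20221023074653", 3);
        prepped.forEach(System.out::println);
      }
    }
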
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index e6a4d63e8c..a0c093be16 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -17,13 +17,6 @@
 
 package org.apache.hudi.testutils;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hudi.HoodieConversionUtils;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -75,6 +68,14 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.timeline.service.TimelineService;
 import org.apache.hudi.util.JFunction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -86,7 +87,6 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestInfo;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -105,6 +105,8 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
 import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -510,12 +512,21 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     return tableView;
   }
 
+  /**
+   * @deprecated Use {@link #tagLocation(HoodieIndex, HoodieEngineContext, JavaRDD, HoodieTable)} instead.
+   */
+  @Deprecated
   public JavaRDD<HoodieRecord> tagLocation(
       HoodieIndex index, JavaRDD<HoodieRecord> records, HoodieTable table) {
     return HoodieJavaRDD.getJavaRDD(
         index.tagLocation(HoodieJavaRDD.of(records), context, table));
   }
 
+  public static JavaRDD<HoodieRecord> tagLocation(
+      HoodieIndex index, HoodieEngineContext context, JavaRDD<HoodieRecord> records, HoodieTable table) {
+    return HoodieJavaRDD.getJavaRDD(index.tagLocation(HoodieJavaRDD.of(records), context, table));
+  }
+
   public static Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord> inputRecordsRDD) {
     HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
     WorkloadStat globalStat = new WorkloadStat();
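
The final hunk keeps the instance tagLocation for existing subclasses but deprecates it in favor of a static overload that receives the engine context as a parameter. A plain-Java sketch of that migration pattern (toy types, not the Hudi signatures):

    import java.util.List;
    import java.util.stream.Collectors;

    public class TagLocationSketch {

      // Stand-in for the engine context the static overload receives explicitly.
      static class Context {
        String label() {
          return "ctx";
        }
      }

      private final Context instanceContext = new Context();

      /**
       * @deprecated kept for existing callers; prefer the static overload below,
       * which does not depend on instance state.
       */
      @Deprecated
      public List<String> tagLocation(List<String> records) {
        return tagLocation(instanceContext, records);
      }

      public static List<String> tagLocation(Context context, List<String> records) {
        // The static version takes every dependency as a parameter.
        return records.stream()
            .map(r -> r + "@" + context.label())
            .collect(Collectors.toList());
      }
    }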