You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2020/06/08 12:52:54 UTC

[hudi] 01/04: [HUDI-988] Fix Unit Test Flakiness : Ensure all instantiations of HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make CLI unit tests for Hudi CLI check skip rendering strings

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6dcd0a3524fe7be0bbbd3e673ed7e1d4b035e0cb
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Tue Jun 2 01:49:37 2020 -0700

    [HUDI-988] Fix Unit Test Flakiness : Ensure all instantiations of HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make CLI unit tests for Hudi CLI check skip rendering strings
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  16 +
 .../org/apache/hudi/cli/commands/StatsCommand.java |   4 +-
 .../cli/commands/AbstractShellIntegrationTest.java |   2 +-
 .../hudi/cli/commands/TestRepairsCommand.java      | 206 -----
 .../org/apache/hudi/client/HoodieWriteClient.java  |   2 +-
 .../apache/hudi/client/TestHoodieClientBase.java   | 938 ++++++++++-----------
 .../java/org/apache/hudi/client/TestMultiFS.java   |   4 -
 .../hudi/client/TestUpdateSchemaEvolution.java     |   4 +-
 .../hudi/common/HoodieClientTestHarness.java       | 426 +++++-----
 .../hudi/index/TestHBaseQPSResourceAllocator.java  |   2 +-
 .../java/org/apache/hudi/index/TestHbaseIndex.java |  17 +-
 .../org/apache/hudi/index/TestHoodieIndex.java     |   2 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   2 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   2 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |  12 +-
 .../apache/hudi/table/TestCopyOnWriteTable.java    |   5 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    |  38 +-
 .../hudi/table/compact/TestHoodieCompactor.java    |  12 +-
 pom.xml                                            |   1 +
 19 files changed, 745 insertions(+), 950 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 2e3bc01..708ae29 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -33,4 +33,20 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_HOODIE_PROPERTY = "Property";
   public static final String HEADER_OLD_VALUE = "Old Value";
   public static final String HEADER_NEW_VALUE = "New Value";
+
+  /**
+   * Fields of Stats.
+   */
+  public static final String HEADER_COMMIT_TIME = "CommitTime";
+  public static final String HEADER_TOTAL_UPSERTED = "Total Upserted";
+  public static final String HEADER_TOTAL_WRITTEN = "Total Written";
+  public static final String HEADER_WRITE_AMPLIFICATION_FACTOR = "Write Amplification Factor";
+  public static final String HEADER_HISTOGRAM_MIN = "Min";
+  public static final String HEADER_HISTOGRAM_10TH = "10th";
+  public static final String HEADER_HISTOGRAM_50TH = "50th";
+  public static final String HEADER_HISTOGRAM_AVG = "avg";
+  public static final String HEADER_HISTOGRAM_95TH = "95th";
+  public static final String HEADER_HISTOGRAM_MAX = "Max";
+  public static final String HEADER_HISTOGRAM_NUM_FILES = "NumFiles";
+  public static final String HEADER_HISTOGRAM_STD_DEV = "StdDev";
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index b05aee2..4874777 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
 @Component
 public class StatsCommand implements CommandMarker {
 
-  private static final int MAX_FILES = 1000000;
+  public static final int MAX_FILES = 1000000;
 
   @CliCommand(value = "stats wa", help = "Write Amplification. Ratio of how many records were upserted to how many "
       + "records were actually written")
@@ -97,7 +97,7 @@ public class StatsCommand implements CommandMarker {
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
 
-  private Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
+  public Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
     return new Comparable[] {commitTime, s.getMin(), s.getValue(0.1), s.getMedian(), s.getMean(), s.get95thPercentile(),
         s.getMax(), s.size(), s.getStdDev()};
   }
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
index ad81af5..d9f1688 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
@@ -58,4 +58,4 @@ public abstract class AbstractShellIntegrationTest extends HoodieClientTestHarne
   protected static JLineShellComponent getShell() {
     return shell;
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
deleted file mode 100644
index 9e78ac7..0000000
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.commands;
-
-import org.apache.hudi.cli.HoodieCLI;
-import org.apache.hudi.cli.HoodiePrintHelper;
-import org.apache.hudi.cli.HoodieTableHeaderFields;
-import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.TimelineLayoutVersion;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.FSUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.shell.core.CommandResult;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Test class for {@link RepairsCommand}.
- */
-public class TestRepairsCommand extends AbstractShellIntegrationTest {
-
-  private String tablePath;
-
-  @Before
-  public void init() throws IOException {
-    String tableName = "test_table";
-    tablePath = basePath + File.separator + tableName;
-
-    // Create table and connect
-    new TableCommand().createTable(
-        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
-        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
-  }
-
-  /**
-   * Test case for dry run 'repair addpartitionmeta'.
-   */
-  @Test
-  public void testAddPartitionMetaWithDryRun() throws IOException {
-    // create commit instant
-    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
-
-    // create partition path
-    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
-    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
-    assertTrue(fs.mkdirs(new Path(partition1)));
-    assertTrue(fs.mkdirs(new Path(partition2)));
-    assertTrue(fs.mkdirs(new Path(partition3)));
-
-    // default is dry run.
-    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
-    assertTrue(cr.isSuccess());
-
-    // expected all 'No'.
-    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
-        .stream()
-        .map(partition -> new String[] {partition, "No", "None"})
-        .toArray(String[][]::new);
-    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-
-    assertEquals(expected, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for real run 'repair addpartitionmeta'.
-   */
-  @Test
-  public void testAddPartitionMetaWithRealRun() throws IOException {
-    // create commit instant
-    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
-
-    // create partition path
-    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
-    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
-    assertTrue(fs.mkdirs(new Path(partition1)));
-    assertTrue(fs.mkdirs(new Path(partition2)));
-    assertTrue(fs.mkdirs(new Path(partition3)));
-
-    CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false");
-    assertTrue(cr.isSuccess());
-
-    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
-    // after dry run, the action will be 'Repaired'
-    String[][] rows = paths.stream()
-        .map(partition -> new String[] {partition, "No", "Repaired"})
-        .toArray(String[][]::new);
-    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-
-    assertEquals(expected, cr.getResult().toString());
-
-    cr = getShell().executeCommand("repair addpartitionmeta");
-
-    // after real run, Metadata is present now.
-    rows = paths.stream()
-        .map(partition -> new String[] {partition, "Yes", "None"})
-        .toArray(String[][]::new);
-    expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-    assertEquals(expected, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for 'repair overwrite-hoodie-props'.
-   */
-  @Test
-  public void testOverwriteHoodieProperties() throws IOException {
-    URL newProps = this.getClass().getClassLoader().getResource("table-config.properties");
-    assertNotNull("New property file must exist", newProps);
-
-    CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
-    assertTrue(cr.isSuccess());
-
-    Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
-
-    // after overwrite, the stored value in .hoodie is equals to which read from properties.
-    Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
-    Properties expectProps = new Properties();
-    expectProps.load(new FileInputStream(new File(newProps.getPath())));
-
-    Map<String, String> expected = expectProps.entrySet().stream()
-        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
-    assertEquals(expected, result);
-
-    // check result
-    List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
-        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
-    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key,
-        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
-        .toArray(String[][]::new);
-    String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
-        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
-
-    assertEquals(expect, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for 'repair corrupted clean files'.
-   */
-  @Test
-  public void testRemoveCorruptedPendingCleanAction() throws IOException {
-    HoodieCLI.conf = jsc.hadoopConfiguration();
-
-    Configuration conf = HoodieCLI.conf;
-
-    metaClient = HoodieCLI.getTableMetaClient();
-
-    // Create four requested files
-    for (int i = 100; i < 104; i++) {
-      String timestamp = String.valueOf(i);
-      // Write corrupted requested Compaction
-      HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, timestamp, conf);
-    }
-
-    // reload meta client
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    // first, there are four instants
-    assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
-
-    CommandResult cr = getShell().executeCommand("repair corrupted clean files");
-    assertTrue(cr.isSuccess());
-
-    // reload meta client
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
-  }
-}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 37dfe3d..f5f6233 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -120,7 +120,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
   }
 
-  HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
+  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
     this(jsc, clientConfig, rollbackPending, index, Option.empty());
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 5f47bf5..6e6458b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
@@ -50,7 +50,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.SQLContext;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -73,496 +72,477 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHoodieClientBase extends HoodieClientTestHarness {
 
-  private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
-
-  @Before
-  public void setUp() throws Exception {
-    initResources();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    cleanupResources();
-  }
-
-  protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
-    return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
-    return getHoodieWriteClient(cfg, false);
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
-    return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, jsc));
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
-                                                   HoodieIndex index) {
-    return new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-  }
-
-  protected HoodieReadClient getHoodieReadClient(String basePath) {
-    return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
-  }
-
-  /**
-   * Get Default HoodieWriteConfig for tests.
-   *
-   * @return Default Hoodie Write Config for tests
-   */
-  protected HoodieWriteConfig getConfig() {
-    return getConfigBuilder().build();
-  }
-
-  protected HoodieWriteConfig getConfig(IndexType indexType) {
-    return getConfigBuilder(indexType).build();
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  protected HoodieWriteConfig.Builder getConfigBuilder() {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
-  }
-
-  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-    return getConfigBuilder(schemaStr, IndexType.BLOOM);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
-    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
-        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
-        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
-        .withWriteStatusClass(MetadataMergeWriteStatus.class)
-        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
-        .forTable("test-trip-table")
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
-        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-  }
-
-  protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    ((SyncableFileSystemView) (table.getSliceView())).reset();
-    return table;
-  }
-
-  /**
-   * Assert no failures in writing hoodie files.
-   *
-   * @param statuses List of Write Status
-   */
-  public static void assertNoWriteErrors(List<WriteStatus> statuses) {
-    // Verify there are no errors
-    for (WriteStatus status : statuses) {
-      assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
+    private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
+
+    @Before
+    public void setUp() throws Exception {
+        initResources();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cleanupResources();
+    }
+
+    protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
+        return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+    }
+
+    /**
+     * Get Default HoodieWriteConfig for tests.
+     *
+     * @return Default Hoodie Write Config for tests
+     */
+    protected HoodieWriteConfig getConfig() {
+        return getConfigBuilder().build();
     }
-  }
-
-  void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
-    Set<String> partitionPathSet = inputRecords.stream()
-        .map(HoodieRecord::getPartitionPath)
-        .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-  }
-
-  void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
-    Set<String> partitionPathSet = inputKeys.stream()
-        .map(HoodieKey::getPartitionPath)
-        .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-  }
-
-  /**
-   * Ensure presence of partition meta-data at known depth.
-   *
-   * @param partitionPaths Partition paths to check
-   * @param fs             File System
-   * @throws IOException in case of error
-   */
-  void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
-    for (String partitionPath : partitionPaths) {
-      assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
-      HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
-      pmeta.readFromFS();
-      Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
+
+    protected HoodieWriteConfig getConfig(IndexType indexType) {
+        return getConfigBuilder(indexType).build();
+    }
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    protected HoodieWriteConfig.Builder getConfigBuilder() {
+        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
     }
-  }
-
-  /**
-   * Ensure records have location field set.
-   *
-   * @param taggedRecords Tagged Records
-   * @param commitTime    Commit Timestamp
-   */
-  protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
-    for (HoodieRecord rec : taggedRecords) {
-      assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
-      assertEquals("All records should have commit time " + commitTime + ", since updates were made",
-          rec.getCurrentLocation().getInstantTime(), commitTime);
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
+        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
     }
-  }
-
-  /**
-   * Assert that there is no duplicate key at the partition level.
-   *
-   * @param records List of Hoodie records
-   */
-  void assertNodupesWithinPartition(List<HoodieRecord> records) {
-    Map<String, Set<String>> partitionToKeys = new HashMap<>();
-    for (HoodieRecord r : records) {
-      String key = r.getRecordKey();
-      String partitionPath = r.getPartitionPath();
-      if (!partitionToKeys.containsKey(partitionPath)) {
-        partitionToKeys.put(partitionPath, new HashSet<>());
-      }
-      assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
-      partitionToKeys.get(partitionPath).add(key);
+
+    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+        return getConfigBuilder(schemaStr, IndexType.BLOOM);
     }
-  }
-
-  /**
-   * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records
-   * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
-   * guaranteed by record-generation function itself.
-   *
-   * @param writeConfig       Hoodie Write Config
-   * @param recordGenFunction Records Generation function
-   * @return Wrapped function
-   */
-  private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
-    return (commit, numRecords) -> {
-      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
-      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
-      return taggedRecords.collect();
-    };
-  }
-
-  /**
-   * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys
-   * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
-   * guaranteed by key-generation function itself.
-   *
-   * @param writeConfig    Hoodie Write Config
-   * @param keyGenFunction Keys Generation function
-   * @return Wrapped function
-   */
-  private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
-    return (numRecords) -> {
-      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-      List<HoodieKey> records = keyGenFunction.apply(numRecords);
-      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-      JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
-          .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
-      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
-      return taggedRecords.map(record -> record.getKey()).collect();
-    };
-  }
-
-  /**
-   * Generate wrapper for record generation function for testing Prepped APIs.
-   *
-   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-   * @param writeConfig  Hoodie Write Config
-   * @param wrapped      Actual Records Generation function
-   * @return Wrapped Function
-   */
-  protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
-                                                                                 HoodieWriteConfig writeConfig,
-                                                                                 Function2<List<HoodieRecord>, String, Integer> wrapped) {
-    if (isPreppedAPI) {
-      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
-    } else {
-      return wrapped;
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
+        return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+            .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+            .withWriteStatusClass(MetadataMergeWriteStatus.class)
+            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+            .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+            .forTable("test-trip-table")
+            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+            .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+                .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
     }
-  }
-
-  /**
-   * Generate wrapper for delete key generation function for testing Prepped APIs.
-   *
-   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-   * @param writeConfig  Hoodie Write Config
-   * @param wrapped      Actual Records Generation function
-   * @return Wrapped Function
-   */
-  Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
-                                                                       HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
-    if (isPreppedAPI) {
-      return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
-    } else {
-      return wrapped;
+
+    protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+        ((SyncableFileSystemView) (table.getSliceView())).reset();
+        return table;
     }
-  }
-
-  /**
-   * Helper to insert first batch of records and do regular assertions on the state after successful completion.
-   *
-   * @param writeConfig            Hoodie Write Config
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit Number of records to be added in the new commit
-   * @param writeFn                Write Function to be used for insertion
-   * @param isPreppedAPI           Boolean flag to indicate writeFn expects prepped records
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-                                        String initCommitTime, int numRecordsInThisCommit,
-                                        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-                                        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
-    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
-
-    return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
-        recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
-  }
-
-  /**
-   * Helper to upsert batch of records and do regular assertions on the state after successful completion.
-   *
-   * @param writeConfig                  Hoodie Write Config
-   * @param client                       Hoodie Write Client
-   * @param newCommitTime                New Commit Timestamp to be used
-   * @param prevCommitTime               Commit Timestamp used in previous commit
-   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-   * @param initCommitTime               Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit       Number of records to be added in the new commit
-   * @param writeFn                      Write Function to be used for upsert
-   * @param isPreppedAPI                 Boolean flag to indicate writeFn expects prepped records
-   * @param assertForCommit              Enable Assertion of Writes
-   * @param expRecordsInThisCommit       Expected number of records in this commit
-   * @param expTotalRecords              Expected number of records when scanned
-   * @param expTotalCommits              Expected number of commits (including this commit)
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-                                   String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
-                                   int numRecordsInThisCommit,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-                                   boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
-
-    return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
-        numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-        expTotalCommits);
-  }
-
-  /**
-   * Helper to delete batch of keys and do regular assertions on the state after successful completion.
-   *
-   * @param writeConfig            Hoodie Write Config
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param prevCommitTime         Commit Timestamp used in previous commit
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit Number of records to be added in the new commit
-   * @param deleteFn               Delete Function to be used for deletes
-   * @param isPreppedAPI           Boolean flag to indicate writeFn expects prepped records
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @param expTotalRecords        Expected number of records when scanned
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-                                   String prevCommitTime, String initCommitTime,
-                                   int numRecordsInThisCommit,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
-                                   boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-    final Function<Integer, List<HoodieKey>> keyGenFunction =
-        generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
-
-    return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
-        keyGenFunction,
-        deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
-  }
-
-  /**
-   * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
-   *
-   * @param client                       Hoodie Write Client
-   * @param newCommitTime                New Commit Timestamp to be used
-   * @param prevCommitTime               Commit Timestamp used in previous commit
-   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-   * @param initCommitTime               Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit       Number of records to be added in the new commit
-   * @param recordGenFunction            Records Generation Function
-   * @param writeFn                      Write Function to be used for upsert
-   * @param assertForCommit              Enable Assertion of Writes
-   * @param expRecordsInThisCommit       Expected number of records in this commit
-   * @param expTotalRecords              Expected number of records when scanned
-   * @param expTotalCommits              Expected number of commits (including this commit)
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-                                  Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
-                                  Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-                                  Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-                                  boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-
-    // Write 1 (only inserts)
-    client.startCommitWithTime(newCommitTime);
-
-    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
-    List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
-
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(records, fs);
-
-    // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-    if (assertForCommit) {
-      assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
-          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-          timeline.lastInstant().get().getTimestamp());
-      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-      // Check the entire dataset has all records still
-      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-      for (int i = 0; i < fullPartitionPaths.length; i++) {
-        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-      }
-      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-      // Check that the incremental consumption from prevCommitTime
-      assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
-      if (commitTimesBetweenPrevAndNew.isPresent()) {
-        commitTimesBetweenPrevAndNew.get().forEach(ct -> {
-          assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
-              HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-              HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
-        });
-      }
+
+    /**
+     * Assert no failures in writing hoodie files.
+     *
+     * @param statuses List of Write Status
+     */
+    public static void assertNoWriteErrors(List<WriteStatus> statuses) {
+        // Verify there are no errors
+        for (WriteStatus status : statuses) {
+            assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
+        }
     }
-    return result;
-  }
-
-  /**
-   * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
-   *
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param prevCommitTime         Commit Timestamp used in previous commit
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param keyGenFunction         Key Generation function
-   * @param deleteFn               Write Function to be used for delete
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @param expTotalRecords        Expected number of records when scanned
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-                                   String initCommitTime, int numRecordsInThisCommit,
-                                   Function<Integer, List<HoodieKey>> keyGenFunction,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
-                                   boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-
-    // Delete 1 (only deletes)
-    client.startCommitWithTime(newCommitTime);
-
-    List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
-    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
-
-    JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
-    List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
-
-    // check the partition metadata is written out
-    assertPartitionMetadataForKeys(keysToDelete, fs);
-
-    // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-    if (assertForCommit) {
-      assertEquals("Expecting 3 commits.", 3,
-          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-          timeline.lastInstant().get().getTimestamp());
-      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-      // Check the entire dataset has all records still
-      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-      for (int i = 0; i < fullPartitionPaths.length; i++) {
-        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-      }
-      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-      // Check that the incremental consumption from prevCommitTime
-      assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
-              + " since it is a delete operation",
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+
+    void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
+        Set<String> partitionPathSet = inputRecords.stream()
+            .map(HoodieRecord::getPartitionPath)
+            .collect(Collectors.toSet());
+        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
     }
-    return result;
-  }
 
-  /**
-   * Get Cleaner state corresponding to a partition path.
-   *
-   * @param hoodieCleanStatsTwo List of Clean Stats
-   * @param partitionPath       Partition path for filtering
-   * @return Cleaner state corresponding to partition path
-   */
-  protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
-    return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
-  }
+    void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
+        Set<String> partitionPathSet = inputKeys.stream()
+            .map(HoodieKey::getPartitionPath)
+            .collect(Collectors.toSet());
+        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+    }
 
-  // Functional Interfaces for passing lambda and Hoodie Write API contexts
+    /**
+     * Ensure presence of partition meta-data at known depth.
+     *
+     * @param partitionPaths Partition paths to check
+     * @param fs File System
+     * @throws IOException in case of error
+     */
+    void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
+        for (String partitionPath : partitionPaths) {
+            assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
+            HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
+            pmeta.readFromFS();
+            Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
+        }
+    }
 
-  @FunctionalInterface
-  public interface Function2<R, T1, T2> {
+    /**
+     * Ensure every record has a known current location that carries the given commit time.
+     *
+     * @param taggedRecords Tagged Records
+     * @param commitTime Commit Timestamp each record's current location is expected to carry
+     */
+    protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
+        for (HoodieRecord rec : taggedRecords) {
+            assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
+            assertEquals("All records should have commit time " + commitTime + ", since updates were made",
+                commitTime, rec.getCurrentLocation().getInstantTime());
+        }
+    }
 
-    R apply(T1 v1, T2 v2) throws IOException;
-  }
+    /**
+     * Assert that no record key occurs more than once within the same partition path.
+     *
+     * @param records List of Hoodie records to check
+     */
+    void assertNodupesWithinPartition(List<HoodieRecord> records) {
+        Map<String, Set<String>> partitionToKeys = new HashMap<>();
+        for (HoodieRecord r : records) {
+            String key = r.getRecordKey();
+            String partitionPath = r.getPartitionPath();
+            // Set#add returns false when the key was already recorded for this partition,
+            // so one map lookup both registers the key and detects a duplicate.
+            boolean firstOccurrence =
+                partitionToKeys.computeIfAbsent(partitionPath, p -> new HashSet<>()).add(key);
+            assertTrue("key " + key + " is duplicate within partition " + partitionPath, firstOccurrence);
+        }
+    }
 
-  @FunctionalInterface
-  public interface Function3<R, T1, T2, T3> {
+    /**
+     * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of
+     * record-location setting. Uniqueness is guaranteed by record-generation function itself.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param recordGenFunction Records Generation function
+     * @return Wrapped function
+     */
+    private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
+        final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+        return (commit, numRecords) -> {
+            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+            List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
+            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
+            return taggedRecords.collect();
+        };
+    }
 
-    R apply(T1 v1, T2 v2, T3 v3) throws IOException;
-  }
+    /**
+     * Wraps a delete-key generation function for testing the Prepped version of the API. Each generated key is wrapped in a record with an empty payload and passed through the index's
+     * tagLocation; the keys of the tagged records are then returned. No de-duplication happens here, so uniqueness is left to the key-generation function itself.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param keyGenFunction Keys Generation function
+     * @return Function that generates keys via keyGenFunction and returns the keys of the index-tagged records
+     */
+    private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
+        final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
+        return (numRecords) -> {
+            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+            List<HoodieKey> records = keyGenFunction.apply(numRecords);
+            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+            JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+                .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
+            return taggedRecords.map(record -> record.getKey()).collect();
+        };
+    }
+
+    /**
+     * Generate wrapper for record generation function for testing Prepped APIs.
+     *
+     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+     * @param writeConfig Hoodie Write Config
+     * @param wrapped Actual Records Generation function
+     * @return Wrapped Function
+     */
+    protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
+        HoodieWriteConfig writeConfig,
+        Function2<List<HoodieRecord>, String, Integer> wrapped) {
+        if (isPreppedAPI) {
+            return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+        } else {
+            return wrapped;
+        }
+    }
+
+    /**
+     * Generate wrapper for delete key generation function for testing Prepped APIs.
+     *
+     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+     * @param writeConfig Hoodie Write Config
+     * @param wrapped Actual delete keys generation function
+     * @return {@code wrapped} itself when {@code isPreppedAPI} is false, otherwise the prepped-call wrapper around it
+     */
+    Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
+        HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
+        if (isPreppedAPI) {
+            return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+        } else {
+            return wrapped;
+        }
+    }
+
+    /**
+     * Helper to insert first batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param writeFn Write Function to be used for insertion
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String initCommitTime, int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
+        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
+
+        return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
+            recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
+    }
+
+    /**
+     * Helper to upsert batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param writeFn Write Function to be used for upsert
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @param expTotalCommits Expected number of commits (including this commit)
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+        int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
+
+        return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+            numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
+            expTotalCommits);
+    }
+
+    /**
+     * Helper to delete a batch of keys produced by the data generator's unique-deletes function and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of delete keys to generate for the new commit
+     * @param deleteFn Delete Function to be used for deletes
+     * @param isPreppedAPI Boolean flag to indicate deleteFn expects prepped keys
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String prevCommitTime, String initCommitTime,
+        int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+        final Function<Integer, List<HoodieKey>> keyGenFunction =
+            generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
+
+        return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
+            keyGenFunction,
+            deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
+    }
+
+    /**
+     * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param recordGenFunction Records Generation Function
+     * @param writeFn Write Function to be used for upsert
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @param expTotalCommits Expected number of commits (including this commit)
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+        Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+        Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+
+        // Start the new commit, then generate and write the batch (inserts or upserts, depending on recordGenFunction)
+        client.startCommitWithTime(newCommitTime);
+
+        List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+        JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
+        List<WriteStatus> statuses = result.collect();
+        assertNoWriteErrors(statuses);
+
+        // check the partition metadata is written out
+        assertPartitionMetadataForRecords(records, fs);
+
+        // verify that there is a commit
+        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+        if (assertForCommit) {
+            assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
+                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+                timeline.lastInstant().get().getTimestamp());
+            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+            // Check that the full dataset, read across all of the data generator's partition paths, has the expected total
+            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+            for (int i = 0; i < fullPartitionPaths.length; i++) {
+                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+            }
+            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+            // Check that incremental consumption from prevCommitTime yields as many records as reading the latest commit
+            assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+            if (commitTimesBetweenPrevAndNew.isPresent()) {
+                commitTimesBetweenPrevAndNew.get().forEach(ct -> {
+                    assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
+                        HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
+                });
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
+     *
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param keyGenFunction Key Generation function; it is invoked with numRecordsInThisCommit
+     * @param deleteFn Delete Function to be used for delete
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+        String initCommitTime, int numRecordsInThisCommit,
+        Function<Integer, List<HoodieKey>> keyGenFunction,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+
+        // Start the new commit, then generate the keys and issue the deletes
+        client.startCommitWithTime(newCommitTime);
+
+        List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
+        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
+
+        JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
+        List<WriteStatus> statuses = result.collect();
+        assertNoWriteErrors(statuses);
+
+        // check the partition metadata is written out
+        assertPartitionMetadataForKeys(keysToDelete, fs);
+
+        // verify that there is a commit
+        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+        if (assertForCommit) {
+            assertEquals("Expecting 3 commits.", 3, // NOTE(review): count is hard-coded; writeBatch takes expTotalCommits instead
+                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+                timeline.lastInstant().get().getTimestamp());
+            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+            // Check that the full dataset, read across all of the data generator's partition paths, has the expected total
+            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+            for (int i = 0; i < fullPartitionPaths.length; i++) {
+                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+            }
+            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+            // Check that incremental consumption from prevCommitTime yields as many records as reading the latest commit
+            assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+                    + " since it is a delete operation",
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+        }
+        return result;
+    }
+
+    /**
+     * Get Cleaner state corresponding to a partition path.
+     *
+     * @param hoodieCleanStatsTwo List of Clean Stats
+     * @param partitionPath Partition path for filtering
+     * @return Cleaner state corresponding to partition path
+     */
+    protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
+        return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
+    }
+
+    // Functional Interfaces for passing lambda and Hoodie Write API contexts
+
+    @FunctionalInterface
+    public interface Function2<R, T1, T2> {
+
+        R apply(T1 v1, T2 v2) throws IOException;
+    }
+
+    @FunctionalInterface
+    public interface Function3<R, T1, T2, T3> {
+
+        R apply(T1 v1, T2 v2, T3 v3) throws IOException;
+    }
 
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 9b70c10..8d3fa13 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -68,10 +68,6 @@ public class TestMultiFS extends HoodieClientTestHarness {
     cleanupTestDataGenerator();
   }
 
-  private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
-  }
-
   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index aad8edf..ab6e940 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.TestRawTripPayload;
+import java.io.IOException;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -58,8 +59,9 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
   }
 
   @After
-  public void tearDown() {
+  public void tearDown() throws IOException {
     cleanupSparkContexts();
+    cleanupFileSystem();
   }
 
   //@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 4e5721f..e4202f0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -17,11 +17,15 @@
 
 package org.apache.hudi.common;
 
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.TestHoodieClientBase;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,225 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
-  protected transient JavaSparkContext jsc = null;
-  protected transient SQLContext sqlContext;
-  protected transient FileSystem fs;
-  protected transient HoodieTestDataGenerator dataGen = null;
-  protected transient ExecutorService executorService;
-  protected transient HoodieTableMetaClient metaClient;
-  private static AtomicInteger instantGen = new AtomicInteger(1);
-
-  public String getNextInstant() {
-    return String.format("%09d", instantGen.getAndIncrement());
-  }
-
-  // dfs
-  protected String dfsBasePath;
-  protected transient HdfsTestService hdfsTestService;
-  protected transient MiniDFSCluster dfsCluster;
-  protected transient DistributedFileSystem dfs;
-
-  /**
-   * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
-   *
-   * @throws IOException
-   */
-  public void initResources() throws IOException {
-    initPath();
-    initSparkContexts();
-    initTestDataGenerator();
-    initFileSystem();
-    initMetaClient();
-  }
-
-  /**
-   * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
-   * 
-   * @throws IOException
-   */
-  public void cleanupResources() throws IOException {
-    cleanupMetaClient();
-    cleanupSparkContexts();
-    cleanupTestDataGenerator();
-    cleanupFileSystem();
-  }
-
-  /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
-   *
-   * @param appName The specified application name.
-   */
-  protected void initSparkContexts(String appName) {
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
-    jsc.setLogLevel("ERROR");
-
-    // SQLContext stuff
-    sqlContext = new SQLContext(jsc);
-  }
-
-  /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
-   * <b>TestHoodieClient</b>.
-   */
-  protected void initSparkContexts() {
-    initSparkContexts("TestHoodieClient");
-  }
-
-  /**
-   * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
-   */
-  protected void cleanupSparkContexts() {
-    if (sqlContext != null) {
-      LOG.info("Clearing sql context cache of spark-session used in previous test-case");
-      sqlContext.clearCache();
-      sqlContext = null;
+    private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
+
+    protected transient JavaSparkContext jsc = null;
+    protected transient SQLContext sqlContext;
+    protected transient FileSystem fs;
+    protected transient HoodieTestDataGenerator dataGen = null;
+    protected transient ExecutorService executorService;
+    protected transient HoodieTableMetaClient metaClient;
+    private static AtomicInteger instantGen = new AtomicInteger(1);
+    protected transient HoodieWriteClient client;
+
+    public String getNextInstant() {
+        return String.format("%09d", instantGen.getAndIncrement());
+    }
+
+    // dfs
+    protected String dfsBasePath;
+    protected transient HdfsTestService hdfsTestService;
+    protected transient MiniDFSCluster dfsCluster;
+    protected transient DistributedFileSystem dfs;
+
+    /**
+     * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
+     */
+    public void initResources() throws IOException {
+        initPath();
+        initSparkContexts();
+        initTestDataGenerator();
+        initFileSystem();
+        initMetaClient();
+    }
+
+    /**
+     * Cleans up the resource group for the subclasses of {@link TestHoodieClientBase}.
+     */
+    public void cleanupResources() throws IOException {
+        cleanupClients();
+        cleanupSparkContexts();
+        cleanupTestDataGenerator();
+        cleanupFileSystem();
+    }
+
+    /**
+     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
+     *
+     * @param appName The specified application name.
+     */
+    protected void initSparkContexts(String appName) {
+        // Initialize a local spark env
+        jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+        jsc.setLogLevel("ERROR");
+
+        // SQLContext stuff
+        sqlContext = new SQLContext(jsc);
     }
 
-    if (jsc != null) {
-      LOG.info("Closing spark context used in previous test-case");
-      jsc.close();
-      jsc.stop();
-      jsc = null;
+    /**
+     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
+     * <b>TestHoodieClient</b>.
+     */
+    protected void initSparkContexts() {
+        initSparkContexts("TestHoodieClient");
     }
-  }
-
-  /**
-   * Initializes a file system with the hadoop configuration of Spark context.
-   */
-  protected void initFileSystem() {
-    if (jsc == null) {
-      throw new IllegalStateException("The Spark context has not been initialized.");
+
+    /**
+     * Cleans up the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
+     */
+    protected void cleanupSparkContexts() {
+        if (sqlContext != null) {
+            LOG.info("Clearing sql context cache of spark-session used in previous test-case");
+            sqlContext.clearCache();
+            sqlContext = null;
+        }
+
+        if (jsc != null) {
+            LOG.info("Closing spark context used in previous test-case");
+            jsc.close();
+            jsc.stop();
+            jsc = null;
+        }
+    }
+
+    /**
+     * Initializes a file system with the hadoop configuration of Spark context.
+     */
+    protected void initFileSystem() {
+        if (jsc == null) {
+            throw new IllegalStateException("The Spark context has not been initialized.");
+        }
+
+        initFileSystemWithConfiguration(jsc.hadoopConfiguration());
     }
 
-    initFileSystemWithConfiguration(jsc.hadoopConfiguration());
-  }
-
-  /**
-   * Initializes file system with a default empty configuration.
-   */
-  protected void initFileSystemWithDefaultConfiguration() {
-    initFileSystemWithConfiguration(new Configuration());
-  }
-
-  /**
-   * Cleanups file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupFileSystem() throws IOException {
-    if (fs != null) {
-      LOG.warn("Closing file-system instance used in previous test-run");
-      fs.close();
+    /**
+     * Initializes file system with a default empty configuration.
+     */
+    protected void initFileSystemWithDefaultConfiguration() {
+        initFileSystemWithConfiguration(new Configuration());
     }
-  }
-
-  /**
-   * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by
-   * {@code getTableType()}.
-   *
-   * @throws IOException
-   */
-  protected void initMetaClient() throws IOException {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been initialized.");
+
+    /**
+     * Cleans up the file system.
+     */
+    protected void cleanupFileSystem() throws IOException {
+        if (fs != null) {
+            LOG.warn("Closing file-system instance used in previous test-run");
+            fs.close();
+        }
     }
 
-    if (jsc == null) {
-      throw new IllegalStateException("The Spark context has not been initialized.");
+    /**
+     * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
+     */
+    protected void initMetaClient() throws IOException {
+        if (basePath == null) {
+            throw new IllegalStateException("The base path has not been initialized.");
+        }
+
+        if (jsc == null) {
+            throw new IllegalStateException("The Spark context has not been initialized.");
+        }
+
+        metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
     }
 
-    metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
-  }
-
-  /**
-   * Cleanups table type.
-   */
-  protected void cleanupMetaClient() {
-    metaClient = null;
-  }
-
-  /**
-   * Initializes a test data generator which used to generate test datas.
-   *
-   */
-  protected void initTestDataGenerator() {
-    dataGen = new HoodieTestDataGenerator();
-  }
-
-  /**
-   * Cleanups test data generator.
-   *
-   */
-  protected void cleanupTestDataGenerator() {
-    dataGen = null;
-  }
-
-  /**
-   * Initializes a distributed file system and base directory.
-   *
-   * @throws IOException
-   */
-  protected void initDFS() throws IOException {
-    FileSystem.closeAll();
-    hdfsTestService = new HdfsTestService();
-    dfsCluster = hdfsTestService.start(true);
-
-    // Create a temp folder as the base path
-    dfs = dfsCluster.getFileSystem();
-    dfsBasePath = dfs.getWorkingDirectory().toString();
-    dfs.mkdirs(new Path(dfsBasePath));
-  }
-
-  /**
-   * Cleanups the distributed file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupDFS() throws IOException {
-    if (hdfsTestService != null) {
-      hdfsTestService.stop();
-      dfsCluster.shutdown();
+    /**
+     * Cleans up the meta client reference and closes the write client, if one was created.
+     */
+    protected void cleanupClients() {
+        metaClient = null;
+        if (null != client) {
+            client.close();
+            client = null;
+        }
     }
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-  }
-
-  /**
-   * Initializes executor service with a fixed thread pool.
-   *
-   * @param threadNum specify the capacity of the fixed thread pool
-   */
-  protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
-    executorService = Executors.newFixedThreadPool(threadNum);
-  }
-
-  /**
-   * Cleanups the executor service.
-   */
-  protected void cleanupExecutorService() {
-    if (this.executorService != null) {
-      this.executorService.shutdownNow();
-      this.executorService = null;
+
+    /**
+     * Initializes the test data generator used to generate test data.
+     */
+    protected void initTestDataGenerator() {
+        dataGen = new HoodieTestDataGenerator();
     }
-  }
 
-  private void initFileSystemWithConfiguration(Configuration configuration) {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been initialized.");
+    /**
+     * Cleans up the test data generator.
+     */
+    protected void cleanupTestDataGenerator() {
+        dataGen = null;
     }
 
-    fs = FSUtils.getFs(basePath, configuration);
-    if (fs instanceof LocalFileSystem) {
-      LocalFileSystem lfs = (LocalFileSystem) fs;
-      // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
-      // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
-      // So, for the tests, we enforce checksum verification to circumvent the problem
-      lfs.setVerifyChecksum(true);
+    /**
+     * Initializes a distributed file system and base directory.
+     */
+    protected void initDFS() throws IOException {
+        FileSystem.closeAll();
+        hdfsTestService = new HdfsTestService();
+        dfsCluster = hdfsTestService.start(true);
+
+        // Create a temp folder as the base path
+        dfs = dfsCluster.getFileSystem();
+        dfsBasePath = dfs.getWorkingDirectory().toString();
+        dfs.mkdirs(new Path(dfsBasePath));
     }
-  }
 
+    /**
+     * Cleans up the distributed file system.
+     */
+    protected void cleanupDFS() throws IOException {
+        if (hdfsTestService != null) {
+            hdfsTestService.stop();
+            dfsCluster.shutdown();
+            hdfsTestService = null;
+            dfsCluster = null;
+            dfs = null;
+        }
+        // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+        // same JVM
+        FileSystem.closeAll();
+    }
+
+    /**
+     * Initializes executor service with a fixed thread pool.
+     *
+     * @param threadNum specify the capacity of the fixed thread pool
+     */
+    protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
+        executorService = Executors.newFixedThreadPool(threadNum);
+    }
+
+    /**
+     * Cleans up the executor service.
+     */
+    protected void cleanupExecutorService() {
+        if (this.executorService != null) {
+            this.executorService.shutdownNow();
+            this.executorService = null;
+        }
+    }
+
+    private void initFileSystemWithConfiguration(Configuration configuration) {
+        if (basePath == null) {
+            throw new IllegalStateException("The base path has not been initialized.");
+        }
+
+        fs = FSUtils.getFs(basePath, configuration);
+        if (fs instanceof LocalFileSystem) {
+            LocalFileSystem lfs = (LocalFileSystem) fs;
+            // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+            // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+            // So, for the tests, we enforce checksum verification to circumvent the problem
+            lfs.setVerifyChecksum(true);
+        }
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+        return getHoodieWriteClient(cfg, false);
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
+        return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
+    }
+
+    public HoodieReadClient getHoodieReadClient(String basePath) {
+        return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
+        HoodieIndex index) {
+        if (null != client) {
+            client.close();
+            client = null;
+        }
+        client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+        return client;
+    }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
index 05638e2..6ddb578 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
@@ -60,7 +60,7 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
     if (utility != null) {
       utility.shutdownMiniCluster();
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 2893947..43f2fd1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -86,6 +86,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   @AfterClass
   public static void clean() throws Exception {
     if (utility != null) {
+      utility.deleteTable(tableName);
       utility.shutdownMiniCluster();
     }
   }
@@ -115,11 +116,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupTestDataGenerator();
-    cleanupMetaClient();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   @Test
@@ -132,7 +129,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
@@ -172,7 +169,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(newCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -206,7 +203,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
@@ -256,7 +253,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // only for test, set the hbaseConnection to mocked object
     index.setHbaseConnection(hbaseConnection);
 
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
@@ -281,7 +278,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   public void testTotalPutsBatching() throws Exception {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 91435f8..b97fefc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -45,7 +45,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
   @After
   public void tearDown() {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index d29cfa4..105b0e8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -107,7 +107,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupFileSystem();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   private HoodieWriteConfig makeConfig() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index ddf2775..55d4526 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -80,7 +80,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
   @After
   public void tearDown() {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 664f4b5..7fd02bc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -68,11 +68,8 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
     cleanupFileSystem();
     cleanupTestDataGenerator();
     cleanupSparkContexts();
-    cleanupMetaClient();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
+    cleanupFileSystem();
   }
 
   @Test
@@ -83,9 +80,8 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
 
     // Build a write config with bulkinsertparallelism set
     HoodieWriteConfig cfg = getConfigBuilder().build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
-
       /**
        * Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times
        * each. id1 (21 records), id2 (21 records), id3, id4
@@ -224,7 +220,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
   public void testHoodieMergeHandleWriteStatMetrics() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfigBuilder().build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index ec64080..6887531 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestUtils;
@@ -85,7 +86,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
     cleanupFileSystem();
     cleanupTestDataGenerator();
   }
@@ -129,6 +130,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     // Prepare the AvroParquetIO
     HoodieWriteConfig config = makeHoodieClientConfig();
     String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String partitionPath = "/2016/01/31";
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 740caf2..fdc968d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -96,16 +96,13 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     cleanupDFS();
     cleanupSparkContexts();
     cleanupTestDataGenerator();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   @Test
   public void testSimpleInsertAndUpdate() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts)
@@ -190,7 +187,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   @Test
   public void testMetadataAggregateFromWriteStatus() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       String newCommitTime = "001";
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
@@ -213,7 +210,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   @Test
   public void testSimpleInsertUpdateAndDelete() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts, written as parquet file)
@@ -298,7 +295,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE);
 
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts)
@@ -351,7 +348,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfig(false);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       // Test delta commit rollback
       /**
@@ -394,7 +391,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
        */
       final String commitTime1 = "002";
       // WriteClient with custom config (disable small file handling)
-      try (HoodieWriteClient secondClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
+      try (HoodieWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
         secondClient.startCommitWithTime(commitTime1);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -424,7 +421,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
        * Write 3 (inserts + updates - testing successful delta commit)
        */
       final String commitTime2 = "002";
-      try (HoodieWriteClient thirdClient = getWriteClient(cfg);) {
+      try (HoodieWriteClient thirdClient = getHoodieWriteClient(cfg);) {
         thirdClient.startCommitWithTime(commitTime2);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -500,7 +497,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfig(false);
-    try (final HoodieWriteClient client = getWriteClient(cfg);) {
+    try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       /**
        * Write 1 (only inserts)
        */
@@ -541,7 +538,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
        */
       newCommitTime = "002";
       // WriteClient with custom config (disable small file handling)
-      HoodieWriteClient nClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
+      HoodieWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
       nClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -664,7 +661,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   @Test
   public void testUpsertPartitioner() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts, written as parquet file)
@@ -743,7 +740,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testLogFileCountsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig(true);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -816,7 +813,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -853,7 +850,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -927,7 +924,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -979,10 +976,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollingStatsInMetadata() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
-
       // Create a commit without rolling stats in metadata to test backwards compatibility
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
       String commitActionType = table.getMetaClient().getCommitActionType();
@@ -1080,7 +1076,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollingStatsWithSmallFileHandling() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       Map<String, Long> fileIdToInsertsMap = new HashMap<>();
       Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 8fa55ec..09d62a7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -78,10 +78,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     cleanupFileSystem();
     cleanupTestDataGenerator();
     cleanupSparkContexts();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   private HoodieWriteConfig getConfig() {
@@ -114,8 +111,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     HoodieWriteConfig config = getConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
-
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -132,8 +128,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
-      String newCommitTime = "100";
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+     String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
diff --git a/pom.xml b/pom.xml
index f1990e1..6370c37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -242,6 +242,7 @@
         <version>${maven-surefire-plugin.version}</version>
         <configuration>
           <skip>${skipUTs}</skip>
+          <argLine>-Xmx4g</argLine>
           <systemPropertyVariables>
             <log4j.configuration>
               ${surefire-log4j.file}