Posted to commits@hudi.apache.org by si...@apache.org on 2020/06/08 12:52:53 UTC

[hudi] branch release-0.5.3 updated (5fcc461 -> 41fb6c2)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git.


    omit 5fcc461  Bumping release candidate number 1
     new 6dcd0a3  [HUDI-988] Fix Unit Test Flakiness : Ensure all instantiations of HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make CLI unit tests for Hudi CLI check skip rendering strings
     new ae48ecb  [HUDI-990] Timeline API : filterCompletedAndCompactionInstants needs to handle requested state correctly. Also ensure timeline gets reloaded after we revert committed transactions
     new d3afcba  Making few fixes after cherry picking
     new 41fb6c2  Bumping release candidate number 2

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (5fcc461)
            \
             N -- N -- N   refs/heads/release-0.5.3 (41fb6c2)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docker/hoodie/hadoop/base/pom.xml                  |   2 +-
 docker/hoodie/hadoop/datanode/pom.xml              |   2 +-
 docker/hoodie/hadoop/historyserver/pom.xml         |   2 +-
 docker/hoodie/hadoop/hive_base/pom.xml             |   2 +-
 docker/hoodie/hadoop/namenode/pom.xml              |   2 +-
 docker/hoodie/hadoop/pom.xml                       |   2 +-
 docker/hoodie/hadoop/prestobase/pom.xml            |   2 +-
 docker/hoodie/hadoop/spark_base/pom.xml            |   2 +-
 docker/hoodie/hadoop/sparkadhoc/pom.xml            |   2 +-
 docker/hoodie/hadoop/sparkmaster/pom.xml           |   2 +-
 docker/hoodie/hadoop/sparkworker/pom.xml           |   2 +-
 hudi-cli/pom.xml                                   |   2 +-
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  16 ++
 .../org/apache/hudi/cli/commands/StatsCommand.java |   4 +-
 .../cli/commands/AbstractShellIntegrationTest.java |   2 +-
 .../hudi/cli/commands/TestRepairsCommand.java      | 206 ---------------------
 hudi-client/pom.xml                                |   2 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |   2 +-
 .../client/embedded/EmbeddedTimelineService.java   |   4 +-
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |   2 +
 .../apache/hudi/table/HoodieMergeOnReadTable.java  |   2 +
 .../apache/hudi/client/TestHoodieClientBase.java   | 187 +++++++++----------
 .../java/org/apache/hudi/client/TestMultiFS.java   |   4 -
 .../hudi/client/TestUpdateSchemaEvolution.java     |   4 +-
 .../hudi/common/HoodieClientTestHarness.java       |  54 ++++--
 .../hudi/index/TestHBaseQPSResourceAllocator.java  |   2 +-
 .../java/org/apache/hudi/index/TestHbaseIndex.java |  17 +-
 .../org/apache/hudi/index/TestHoodieIndex.java     |   2 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   2 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   2 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |  12 +-
 .../apache/hudi/table/TestCopyOnWriteTable.java    |   5 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    |  43 ++---
 .../hudi/table/compact/TestHoodieCompactor.java    |  14 +-
 hudi-common/pom.xml                                |   2 +-
 .../table/timeline/HoodieDefaultTimeline.java      |   2 +-
 .../table/view/FileSystemViewStorageConfig.java    |  21 +++
 .../table/string/TestHoodieActiveTimeline.java     |   2 +-
 hudi-hadoop-mr/pom.xml                             |   2 +-
 hudi-hive/pom.xml                                  |   2 +-
 hudi-integ-test/pom.xml                            |   2 +-
 hudi-spark/pom.xml                                 |   2 +-
 hudi-timeline-service/pom.xml                      |   2 +-
 hudi-utilities/pom.xml                             |   2 +-
 packaging/hudi-hadoop-mr-bundle/pom.xml            |   2 +-
 packaging/hudi-hive-bundle/pom.xml                 |   2 +-
 packaging/hudi-presto-bundle/pom.xml               |   2 +-
 packaging/hudi-spark-bundle/pom.xml                |   2 +-
 packaging/hudi-timeline-server-bundle/pom.xml      |   2 +-
 packaging/hudi-utilities-bundle/pom.xml            |   2 +-
 pom.xml                                            |   3 +-
 51 files changed, 247 insertions(+), 419 deletions(-)
 delete mode 100644 hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java


[hudi] 03/04: Making few fixes after cherry picking

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d3afcbac3a6e5362d57570a2a5807abbf65c69d8
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Sun Jun 7 16:23:40 2020 -0400

    Making few fixes after cherry picking
---
 .../apache/hudi/client/TestHoodieClientBase.java   | 917 +++++++++++----------
 .../hudi/common/HoodieClientTestHarness.java       | 426 +++++-----
 .../apache/hudi/table/TestMergeOnReadTable.java    |   2 +
 .../hudi/table/compact/TestHoodieCompactor.java    |   6 +-
 .../table/string/TestHoodieActiveTimeline.java     |   2 +-
 5 files changed, 678 insertions(+), 675 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 6e6458b..6856489 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -72,477 +72,478 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHoodieClientBase extends HoodieClientTestHarness {
 
-    private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
-
-    @Before
-    public void setUp() throws Exception {
-        initResources();
+  private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
+
+  @Before
+  public void setUp() throws Exception {
+    initResources();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
+    return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+  }
+
+  /**
+   * Get Default HoodieWriteConfig for tests.
+   *
+   * @return Default Hoodie Write Config for tests
+   */
+  protected HoodieWriteConfig getConfig() {
+    return getConfigBuilder().build();
+  }
+
+  protected HoodieWriteConfig getConfig(IndexType indexType) {
+    return getConfigBuilder(indexType).build();
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  protected HoodieWriteConfig.Builder getConfigBuilder() {
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
+  }
+
+  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    return getConfigBuilder(schemaStr, IndexType.BLOOM);
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .withWriteStatusClass(MetadataMergeWriteStatus.class)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .forTable("test-trip-table")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+  }
+
+  protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    ((SyncableFileSystemView) (table.getSliceView())).reset();
+    return table;
+  }
+
+  /**
+   * Assert no failures in writing hoodie files.
+   *
+   * @param statuses List of Write Status
+   */
+  public static void assertNoWriteErrors(List<WriteStatus> statuses) {
+    // Verify there are no errors
+    for (WriteStatus status : statuses) {
+      assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
     }
-
-    @After
-    public void tearDown() throws Exception {
-        cleanupResources();
+  }
+
+  void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
+    Set<String> partitionPathSet = inputRecords.stream()
+        .map(HoodieRecord::getPartitionPath)
+        .collect(Collectors.toSet());
+    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+  }
+
+  void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
+    Set<String> partitionPathSet = inputKeys.stream()
+        .map(HoodieKey::getPartitionPath)
+        .collect(Collectors.toSet());
+    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+  }
+
+  /**
+   * Ensure presence of partition meta-data at known depth.
+   *
+   * @param partitionPaths Partition paths to check
+   * @param fs File System
+   * @throws IOException in case of error
+   */
+  void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
+    for (String partitionPath : partitionPaths) {
+      assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
+      HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
+      pmeta.readFromFS();
+      Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
     }
-
-    protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
-        return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+  }
+
+  /**
+   * Ensure records have location field set.
+   *
+   * @param taggedRecords Tagged Records
+   * @param commitTime Commit Timestamp
+   */
+  protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
+    for (HoodieRecord rec : taggedRecords) {
+      assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
+      assertEquals("All records should have commit time " + commitTime + ", since updates were made",
+          rec.getCurrentLocation().getInstantTime(), commitTime);
     }
-
-    /**
-     * Get Default HoodieWriteConfig for tests.
-     *
-     * @return Default Hoodie Write Config for tests
-     */
-    protected HoodieWriteConfig getConfig() {
-        return getConfigBuilder().build();
+  }
+
+  /**
+   * Assert that there is no duplicate key at the partition level.
+   *
+   * @param records List of Hoodie records
+   */
+  void assertNodupesWithinPartition(List<HoodieRecord> records) {
+    Map<String, Set<String>> partitionToKeys = new HashMap<>();
+    for (HoodieRecord r : records) {
+      String key = r.getRecordKey();
+      String partitionPath = r.getPartitionPath();
+      if (!partitionToKeys.containsKey(partitionPath)) {
+        partitionToKeys.put(partitionPath, new HashSet<>());
+      }
+      assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
+      partitionToKeys.get(partitionPath).add(key);
     }
-
-    protected HoodieWriteConfig getConfig(IndexType indexType) {
-        return getConfigBuilder(indexType).build();
+  }
+
+  /**
+   * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of
+   * record-location setting. Uniqueness is guaranteed by record-generation function itself.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param recordGenFunction Records Generation function
+   * @return Wrapped function
+   */
+  private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
+      final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+    return (commit, numRecords) -> {
+      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
+      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
+      return taggedRecords.collect();
+    };
+  }
+
+  /**
+   * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of
+   * record-location setting. Uniqueness is guaranteed by key-generation function itself.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param keyGenFunction Keys Generation function
+   * @return Wrapped function
+   */
+  private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
+      final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
+    return (numRecords) -> {
+      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+      List<HoodieKey> records = keyGenFunction.apply(numRecords);
+      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+      JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+          .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
+      return taggedRecords.map(record -> record.getKey()).collect();
+    };
+  }
+
+  /**
+   * Generate wrapper for record generation function for testing Prepped APIs.
+   *
+   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+   * @param writeConfig Hoodie Write Config
+   * @param wrapped Actual Records Generation function
+   * @return Wrapped Function
+   */
+  protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
+      HoodieWriteConfig writeConfig,
+      Function2<List<HoodieRecord>, String, Integer> wrapped) {
+    if (isPreppedAPI) {
+      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+    } else {
+      return wrapped;
     }
-
-    /**
-     * Get Config builder with default configs set.
-     *
-     * @return Config Builder
-     */
-    protected HoodieWriteConfig.Builder getConfigBuilder() {
-        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+  }
+
+  /**
+   * Generate wrapper for delete key generation function for testing Prepped APIs.
+   *
+   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+   * @param writeConfig Hoodie Write Config
+   * @param wrapped Actual Records Generation function
+   * @return Wrapped Function
+   */
+  Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
+      HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
+    if (isPreppedAPI) {
+      return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+    } else {
+      return wrapped;
     }
-
-    /**
-     * Get Config builder with default configs set.
-     *
-     * @return Config Builder
-     */
-    HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
-        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
+  }
+
+  /**
+   * Helper to insert first batch of records and do regular assertions on the state after successful completion.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param writeFn Write Function to be used for insertion
+   * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String initCommitTime, int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
+
+    return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
+        recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
+  }
+
+  /**
+   * Helper to upsert batch of records and do regular assertions on the state after successful completion.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param writeFn Write Function to be used for upsert
+   * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @param expTotalCommits Expected number of commits (including this commit)
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+      int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
+
+    return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+        numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
+        expTotalCommits);
+  }
+
+  /**
+   * Helper to delete batch of keys and do regular assertions on the state after successful completion.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param deleteFn Delete Function to be used for deletes
+   * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String prevCommitTime, String initCommitTime,
+      int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+    final Function<Integer, List<HoodieKey>> keyGenFunction =
+        generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
+
+    return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
+        keyGenFunction,
+        deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
+  }
+
+  /**
+   * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
+   *
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param recordGenFunction Records Generation Function
+   * @param writeFn Write Function to be used for upsert
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @param expTotalCommits Expected number of commits (including this commit)
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+      Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+      Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+
+    // Write 1 (only inserts)
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // check the partition metadata is written out
+    assertPartitionMetadataForRecords(records, fs);
+
+    // verify that there is a commit
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+    if (assertForCommit) {
+      assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
+          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+          timeline.lastInstant().get().getTimestamp());
+      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+      // Check the entire dataset has all records still
+      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+      for (int i = 0; i < fullPartitionPaths.length; i++) {
+        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+      }
+      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+      // Check that the incremental consumption from prevCommitTime
+      assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+      if (commitTimesBetweenPrevAndNew.isPresent()) {
+        commitTimesBetweenPrevAndNew.get().forEach(ct -> {
+          assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
+              HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+              HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
+        });
+      }
     }
-
-    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-        return getConfigBuilder(schemaStr, IndexType.BLOOM);
+    return result;
+  }
+
+  /**
+   * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
+   *
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param keyGenFunction Key Generation function
+   * @param deleteFn Write Function to be used for delete
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+      String initCommitTime, int numRecordsInThisCommit,
+      Function<Integer, List<HoodieKey>> keyGenFunction,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+
+    // Delete 1 (only deletes)
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
+    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
+
+    JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // check the partition metadata is written out
+    assertPartitionMetadataForKeys(keysToDelete, fs);
+
+    // verify that there is a commit
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+    if (assertForCommit) {
+      assertEquals("Expecting 3 commits.", 3,
+          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+          timeline.lastInstant().get().getTimestamp());
+      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+      // Check the entire dataset has all records still
+      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+      for (int i = 0; i < fullPartitionPaths.length; i++) {
+        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+      }
+      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+      // Check that the incremental consumption from prevCommitTime
+      assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+              + " since it is a delete operation",
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
     }
+    return result;
+  }
 
-    /**
-     * Get Config builder with default configs set.
-     *
-     * @return Config Builder
-     */
-    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
-        return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
-            .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
-            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
-            .withWriteStatusClass(MetadataMergeWriteStatus.class)
-            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-            .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
-            .forTable("test-trip-table")
-            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
-            .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-                .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-    }
+  /**
+   * Get Cleaner state corresponding to a partition path.
+   *
+   * @param hoodieCleanStatsTwo List of Clean Stats
+   * @param partitionPath Partition path for filtering
+   * @return Cleaner state corresponding to partition path
+   */
+  protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
+    return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
+  }
 
-    protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-        ((SyncableFileSystemView) (table.getSliceView())).reset();
-        return table;
-    }
+  // Functional Interfaces for passing lambda and Hoodie Write API contexts
 
-    /**
-     * Assert no failures in writing hoodie files.
-     *
-     * @param statuses List of Write Status
-     */
-    public static void assertNoWriteErrors(List<WriteStatus> statuses) {
-        // Verify there are no errors
-        for (WriteStatus status : statuses) {
-            assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
-        }
-    }
+  @FunctionalInterface
+  public interface Function2<R, T1, T2> {
 
-    void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
-        Set<String> partitionPathSet = inputRecords.stream()
-            .map(HoodieRecord::getPartitionPath)
-            .collect(Collectors.toSet());
-        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-    }
+    R apply(T1 v1, T2 v2) throws IOException;
+  }
 
-    void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
-        Set<String> partitionPathSet = inputKeys.stream()
-            .map(HoodieKey::getPartitionPath)
-            .collect(Collectors.toSet());
-        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-    }
-
-    /**
-     * Ensure presence of partition meta-data at known depth.
-     *
-     * @param partitionPaths Partition paths to check
-     * @param fs File System
-     * @throws IOException in case of error
-     */
-    void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
-        for (String partitionPath : partitionPaths) {
-            assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
-            HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
-            pmeta.readFromFS();
-            Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
-        }
-    }
-
-    /**
-     * Ensure records have location field set.
-     *
-     * @param taggedRecords Tagged Records
-     * @param commitTime Commit Timestamp
-     */
-    protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
-        for (HoodieRecord rec : taggedRecords) {
-            assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
-            assertEquals("All records should have commit time " + commitTime + ", since updates were made",
-                rec.getCurrentLocation().getInstantTime(), commitTime);
-        }
-    }
-
-    /**
-     * Assert that there is no duplicate key at the partition level.
-     *
-     * @param records List of Hoodie records
-     */
-    void assertNodupesWithinPartition(List<HoodieRecord> records) {
-        Map<String, Set<String>> partitionToKeys = new HashMap<>();
-        for (HoodieRecord r : records) {
-            String key = r.getRecordKey();
-            String partitionPath = r.getPartitionPath();
-            if (!partitionToKeys.containsKey(partitionPath)) {
-                partitionToKeys.put(partitionPath, new HashSet<>());
-            }
-            assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
-            partitionToKeys.get(partitionPath).add(key);
-        }
-    }
-
-    /**
-     * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of
-     * record-location setting. Uniqueness is guaranteed by record-generation function itself.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param recordGenFunction Records Generation function
-     * @return Wrapped function
-     */
-    private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
-        final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
-        return (commit, numRecords) -> {
-            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-            List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
-            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
-            return taggedRecords.collect();
-        };
-    }
-
-    /**
-     * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of
-     * record-location setting. Uniqueness is guaranteed by key-generation function itself.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param keyGenFunction Keys Generation function
-     * @return Wrapped function
-     */
-    private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
-        final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
-        return (numRecords) -> {
-            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-            List<HoodieKey> records = keyGenFunction.apply(numRecords);
-            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-            JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
-                .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
-            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
-            return taggedRecords.map(record -> record.getKey()).collect();
-        };
-    }
+  @FunctionalInterface
+  public interface Function3<R, T1, T2, T3> {
 
-    /**
-     * Generate wrapper for record generation function for testing Prepped APIs.
-     *
-     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-     * @param writeConfig Hoodie Write Config
-     * @param wrapped Actual Records Generation function
-     * @return Wrapped Function
-     */
-    protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
-        HoodieWriteConfig writeConfig,
-        Function2<List<HoodieRecord>, String, Integer> wrapped) {
-        if (isPreppedAPI) {
-            return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
-        } else {
-            return wrapped;
-        }
-    }
-
-    /**
-     * Generate wrapper for delete key generation function for testing Prepped APIs.
-     *
-     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-     * @param writeConfig Hoodie Write Config
-     * @param wrapped Actual Records Generation function
-     * @return Wrapped Function
-     */
-    Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
-        HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
-        if (isPreppedAPI) {
-            return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
-        } else {
-            return wrapped;
-        }
-    }
-
-    /**
-     * Helper to insert first batch of records and do regular assertions on the state after successful completion.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param writeFn Write Function to be used for insertion
-     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @return RDD of write-status
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-        String initCommitTime, int numRecordsInThisCommit,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
-        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
-
-        return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
-            recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
-    }
-
-    /**
-     * Helper to upsert batch of records and do regular assertions on the state after successful completion.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param writeFn Write Function to be used for upsert
-     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @param expTotalCommits Expected number of commits (including this commit)
-     * @return RDD of write-status
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-        String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
-        int numRecordsInThisCommit,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
-
-        return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
-            numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-            expTotalCommits);
-    }
-
-    /**
-     * Helper to delete batch of keys and do regular assertions on the state after successful completion.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param deleteFn Delete Function to be used for deletes
-     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @return RDD of write-status
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-        String prevCommitTime, String initCommitTime,
-        int numRecordsInThisCommit,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-        final Function<Integer, List<HoodieKey>> keyGenFunction =
-            generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
-
-        return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
-            keyGenFunction,
-            deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
-    }
-
-    /**
-     * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
-     *
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param recordGenFunction Records Generation Function
-     * @param writeFn Write Function to be used for upsert
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @param expTotalCommits Expected number of commits (including this commit)
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-        Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
-        Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-
-        // Write 1 (only inserts)
-        client.startCommitWithTime(newCommitTime);
-
-        List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
-        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-        JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
-        List<WriteStatus> statuses = result.collect();
-        assertNoWriteErrors(statuses);
-
-        // check the partition metadata is written out
-        assertPartitionMetadataForRecords(records, fs);
-
-        // verify that there is a commit
-        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-        if (assertForCommit) {
-            assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
-                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-                timeline.lastInstant().get().getTimestamp());
-            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-            // Check the entire dataset has all records still
-            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-            for (int i = 0; i < fullPartitionPaths.length; i++) {
-                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-            }
-            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-            // Check that the incremental consumption from prevCommitTime
-            assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
-            if (commitTimesBetweenPrevAndNew.isPresent()) {
-                commitTimesBetweenPrevAndNew.get().forEach(ct -> {
-                    assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
-                        HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-                        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
-                });
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
-     *
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param keyGenFunction Key Generation function
-     * @param deleteFn Write Function to be used for delete
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-        String initCommitTime, int numRecordsInThisCommit,
-        Function<Integer, List<HoodieKey>> keyGenFunction,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-
-        // Delete 1 (only deletes)
-        client.startCommitWithTime(newCommitTime);
-
-        List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
-        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
-
-        JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
-        List<WriteStatus> statuses = result.collect();
-        assertNoWriteErrors(statuses);
-
-        // check the partition metadata is written out
-        assertPartitionMetadataForKeys(keysToDelete, fs);
-
-        // verify that there is a commit
-        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-        if (assertForCommit) {
-            assertEquals("Expecting 3 commits.", 3,
-                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-                timeline.lastInstant().get().getTimestamp());
-            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-            // Check the entire dataset has all records still
-            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-            for (int i = 0; i < fullPartitionPaths.length; i++) {
-                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-            }
-            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-            // Check that the incremental consumption from prevCommitTime
-            assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
-                    + " since it is a delete operation",
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
-        }
-        return result;
-    }
-
-    /**
-     * Get Cleaner state corresponding to a partition path.
-     *
-     * @param hoodieCleanStatsTwo List of Clean Stats
-     * @param partitionPath Partition path for filtering
-     * @return Cleaner state corresponding to partition path
-     */
-    protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
-        return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
-    }
-
-    // Functional Interfaces for passing lambda and Hoodie Write API contexts
-
-    @FunctionalInterface
-    public interface Function2<R, T1, T2> {
-
-        R apply(T1 v1, T2 v2) throws IOException;
-    }
-
-    @FunctionalInterface
-    public interface Function3<R, T1, T2, T3> {
-
-        R apply(T1 v1, T2 v2, T3 v3) throws IOException;
-    }
+    R apply(T1 v1, T2 v2, T3 v3) throws IOException;
+  }
 
 }
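
To make the reformatted harness above easier to follow, here is a hypothetical usage sketch (it is not part of this patch): a subclass test that drives insertFirstBatch and updateBatch, assuming HoodieWriteClient#insert and HoodieWriteClient#upsert as the write functions. The class name, record counts, and the try/finally handling are illustrative assumptions; getConfig, getNextInstant, insertFirstBatch and updateBatch come from the harness shown in this diff.

package org.apache.hudi.client;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.Test;

// Hypothetical subclass, for illustration only; names and record counts are not taken from this patch.
public class TestExampleClientFlow extends TestHoodieClientBase {

  @Test
  public void testInsertThenUpsert() throws Exception {
    HoodieWriteConfig cfg = getConfig();
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
    try {
      final String initCommitTime = "000";

      // First batch: insert 200 records through the insert() API; the helper itself
      // asserts the commit count and the number of records written.
      String firstCommit = getNextInstant();
      insertFirstBatch(cfg, client, firstCommit, initCommitTime, 200,
          HoodieWriteClient::insert, false, true, 200);

      // Second batch: upsert 100 of the existing keys; the dataset should still hold
      // 200 records spread over 2 commits.
      String secondCommit = getNextInstant();
      updateBatch(cfg, client, secondCommit, firstCommit, Option.empty(), initCommitTime, 100,
          HoodieWriteClient::upsert, false, true, 100, 200, 2);
    } finally {
      // HUDI-988: every HoodieWriteClient instantiation must be closed.
      client.close();
    }
  }
}

The helpers perform the commit-count and record-count assertions internally (via writeBatch), so the subclass only supplies the expected numbers; closing the client in finally mirrors the HoodieWriteClient cleanup that HUDI-988 enforces.
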
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index e4202f0..4c7b890 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -49,239 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
-    protected transient JavaSparkContext jsc = null;
-    protected transient SQLContext sqlContext;
-    protected transient FileSystem fs;
-    protected transient HoodieTestDataGenerator dataGen = null;
-    protected transient ExecutorService executorService;
-    protected transient HoodieTableMetaClient metaClient;
-    private static AtomicInteger instantGen = new AtomicInteger(1);
-    protected transient HoodieWriteClient client;
-
-    public String getNextInstant() {
-        return String.format("%09d", instantGen.getAndIncrement());
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
+
+  protected transient JavaSparkContext jsc = null;
+  protected transient SQLContext sqlContext;
+  protected transient FileSystem fs;
+  protected transient HoodieTestDataGenerator dataGen = null;
+  protected transient ExecutorService executorService;
+  protected transient HoodieTableMetaClient metaClient;
+  private static AtomicInteger instantGen = new AtomicInteger(1);
+  protected transient HoodieWriteClient client;
+
+  public String getNextInstant() {
+    return String.format("%09d", instantGen.getAndIncrement());
+  }
+
+  // dfs
+  protected String dfsBasePath;
+  protected transient HdfsTestService hdfsTestService;
+  protected transient MiniDFSCluster dfsCluster;
+  protected transient DistributedFileSystem dfs;
+
+  /**
+   * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
+   */
+  public void initResources() throws IOException {
+    initPath();
+    initSparkContexts();
+    initTestDataGenerator();
+    initFileSystem();
+    initMetaClient();
+  }
+
+  /**
+   * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
+   */
+  public void cleanupResources() throws IOException {
+    cleanupClients();
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
+    cleanupFileSystem();
+  }
+
+  /**
+   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
+   *
+   * @param appName The specified application name.
+   */
+  protected void initSparkContexts(String appName) {
+    // Initialize a local spark env
+    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+    jsc.setLogLevel("ERROR");
+
+    // SQLContext stuff
+    sqlContext = new SQLContext(jsc);
+  }
+
+  /**
+   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
+   * <b>TestHoodieClient</b>.
+   */
+  protected void initSparkContexts() {
+    initSparkContexts("TestHoodieClient");
+  }
+
+  /**
+   * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
+   */
+  protected void cleanupSparkContexts() {
+    if (sqlContext != null) {
+      LOG.info("Clearing sql context cache of spark-session used in previous test-case");
+      sqlContext.clearCache();
+      sqlContext = null;
     }
 
-    // dfs
-    protected String dfsBasePath;
-    protected transient HdfsTestService hdfsTestService;
-    protected transient MiniDFSCluster dfsCluster;
-    protected transient DistributedFileSystem dfs;
-
-    /**
-     * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
-     */
-    public void initResources() throws IOException {
-        initPath();
-        initSparkContexts();
-        initTestDataGenerator();
-        initFileSystem();
-        initMetaClient();
+    if (jsc != null) {
+      LOG.info("Closing spark context used in previous test-case");
+      jsc.close();
+      jsc.stop();
+      jsc = null;
     }
-
-    /**
-     * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
-     */
-    public void cleanupResources() throws IOException {
-        cleanupClients();
-        cleanupSparkContexts();
-        cleanupTestDataGenerator();
-        cleanupFileSystem();
-    }
-
-    /**
-     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
-     *
-     * @param appName The specified application name.
-     */
-    protected void initSparkContexts(String appName) {
-        // Initialize a local spark env
-        jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
-        jsc.setLogLevel("ERROR");
-
-        // SQLContext stuff
-        sqlContext = new SQLContext(jsc);
-    }
-
-    /**
-     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
-     * <b>TestHoodieClient</b>.
-     */
-    protected void initSparkContexts() {
-        initSparkContexts("TestHoodieClient");
-    }
-
-    /**
-     * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
-     */
-    protected void cleanupSparkContexts() {
-        if (sqlContext != null) {
-            LOG.info("Clearing sql context cache of spark-session used in previous test-case");
-            sqlContext.clearCache();
-            sqlContext = null;
-        }
-
-        if (jsc != null) {
-            LOG.info("Closing spark context used in previous test-case");
-            jsc.close();
-            jsc.stop();
-            jsc = null;
-        }
-    }
-
-    /**
-     * Initializes a file system with the hadoop configuration of Spark context.
-     */
-    protected void initFileSystem() {
-        if (jsc == null) {
-            throw new IllegalStateException("The Spark context has not been initialized.");
-        }
-
-        initFileSystemWithConfiguration(jsc.hadoopConfiguration());
-    }
-
-    /**
-     * Initializes file system with a default empty configuration.
-     */
-    protected void initFileSystemWithDefaultConfiguration() {
-        initFileSystemWithConfiguration(new Configuration());
+  }
+
+  /**
+   * Initializes a file system with the hadoop configuration of Spark context.
+   */
+  protected void initFileSystem() {
+    if (jsc == null) {
+      throw new IllegalStateException("The Spark context has not been initialized.");
     }
 
-    /**
-     * Cleanups file system.
-     */
-    protected void cleanupFileSystem() throws IOException {
-        if (fs != null) {
-            LOG.warn("Closing file-system instance used in previous test-run");
-            fs.close();
-        }
+    initFileSystemWithConfiguration(jsc.hadoopConfiguration());
+  }
+
+  /**
+   * Initializes file system with a default empty configuration.
+   */
+  protected void initFileSystemWithDefaultConfiguration() {
+    initFileSystemWithConfiguration(new Configuration());
+  }
+
+  /**
+   * Cleanups file system.
+   */
+  protected void cleanupFileSystem() throws IOException {
+    if (fs != null) {
+      LOG.warn("Closing file-system instance used in previous test-run");
+      fs.close();
     }
-
-    /**
-     * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
-     */
-    protected void initMetaClient() throws IOException {
-        if (basePath == null) {
-            throw new IllegalStateException("The base path has not been initialized.");
-        }
-
-        if (jsc == null) {
-            throw new IllegalStateException("The Spark context has not been initialized.");
-        }
-
-        metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
-    }
-
-    /**
-     * Cleanups table type.
-     */
-    protected void cleanupClients() {
-        metaClient = null;
-        if (null != client) {
-            client.close();
-            client = null;
-        }
-    }
-
-    /**
-     * Initializes a test data generator which used to generate test datas.
-     */
-    protected void initTestDataGenerator() {
-        dataGen = new HoodieTestDataGenerator();
+  }
+
+  /**
+   * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
+   */
+  protected void initMetaClient() throws IOException {
+    if (basePath == null) {
+      throw new IllegalStateException("The base path has not been initialized.");
     }
 
-    /**
-     * Cleanups test data generator.
-     */
-    protected void cleanupTestDataGenerator() {
-        dataGen = null;
+    if (jsc == null) {
+      throw new IllegalStateException("The Spark context has not been initialized.");
     }
 
-    /**
-     * Initializes a distributed file system and base directory.
-     */
-    protected void initDFS() throws IOException {
-        FileSystem.closeAll();
-        hdfsTestService = new HdfsTestService();
-        dfsCluster = hdfsTestService.start(true);
-
-        // Create a temp folder as the base path
-        dfs = dfsCluster.getFileSystem();
-        dfsBasePath = dfs.getWorkingDirectory().toString();
-        dfs.mkdirs(new Path(dfsBasePath));
+    metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
+  }
+
+  /**
+   * Cleanups table type.
+   */
+  protected void cleanupClients() {
+    metaClient = null;
+    if (null != client) {
+      client.close();
+      client = null;
     }
-
-    /**
-     * Cleanups the distributed file system.
-     */
-    protected void cleanupDFS() throws IOException {
-        if (hdfsTestService != null) {
-            hdfsTestService.stop();
-            dfsCluster.shutdown();
-            hdfsTestService = null;
-            dfsCluster = null;
-            dfs = null;
-        }
-        // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-        // same JVM
-        FileSystem.closeAll();
+  }
+
+  /**
+   * Initializes a test data generator used to generate test data.
+   */
+  protected void initTestDataGenerator() {
+    dataGen = new HoodieTestDataGenerator();
+  }
+
+  /**
+   * Cleanups test data generator.
+   */
+  protected void cleanupTestDataGenerator() {
+    dataGen = null;
+  }
+
+  /**
+   * Initializes a distributed file system and base directory.
+   */
+  protected void initDFS() throws IOException {
+    FileSystem.closeAll();
+    hdfsTestService = new HdfsTestService();
+    dfsCluster = hdfsTestService.start(true);
+
+    // Create a temp folder as the base path
+    dfs = dfsCluster.getFileSystem();
+    dfsBasePath = dfs.getWorkingDirectory().toString();
+    dfs.mkdirs(new Path(dfsBasePath));
+  }
+
+  /**
+   * Cleanups the distributed file system.
+   */
+  protected void cleanupDFS() throws IOException {
+    if (hdfsTestService != null) {
+      hdfsTestService.stop();
+      dfsCluster.shutdown();
+      hdfsTestService = null;
+      dfsCluster = null;
+      dfs = null;
     }
-
-    /**
-     * Initializes executor service with a fixed thread pool.
-     *
-     * @param threadNum specify the capacity of the fixed thread pool
-     */
-    protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
-        executorService = Executors.newFixedThreadPool(threadNum);
+    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+    // same JVM
+    FileSystem.closeAll();
+  }
+
+  /**
+   * Initializes executor service with a fixed thread pool.
+   *
+   * @param threadNum specify the capacity of the fixed thread pool
+   */
+  protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
+    executorService = Executors.newFixedThreadPool(threadNum);
+  }
+
+  /**
+   * Cleanups the executor service.
+   */
+  protected void cleanupExecutorService() {
+    if (this.executorService != null) {
+      this.executorService.shutdownNow();
+      this.executorService = null;
     }
+  }
 
-    /**
-     * Cleanups the executor service.
-     */
-    protected void cleanupExecutorService() {
-        if (this.executorService != null) {
-            this.executorService.shutdownNow();
-            this.executorService = null;
-        }
+  private void initFileSystemWithConfiguration(Configuration configuration) {
+    if (basePath == null) {
+      throw new IllegalStateException("The base path has not been initialized.");
     }
 
-    private void initFileSystemWithConfiguration(Configuration configuration) {
-        if (basePath == null) {
-            throw new IllegalStateException("The base path has not been initialized.");
-        }
-
-        fs = FSUtils.getFs(basePath, configuration);
-        if (fs instanceof LocalFileSystem) {
-            LocalFileSystem lfs = (LocalFileSystem) fs;
-            // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
-            // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
-            // So, for the tests, we enforce checksum verification to circumvent the problem
-            lfs.setVerifyChecksum(true);
-        }
+    fs = FSUtils.getFs(basePath, configuration);
+    if (fs instanceof LocalFileSystem) {
+      LocalFileSystem lfs = (LocalFileSystem) fs;
+      // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+      // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+      // So, for the tests, we enforce checksum verification to circumvent the problem
+      lfs.setVerifyChecksum(true);
     }
+  }
 
-    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
-        return getHoodieWriteClient(cfg, false);
-    }
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+    return getHoodieWriteClient(cfg, false);
+  }
 
-    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
-        return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
-    }
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
+    return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
+  }
 
-    public HoodieReadClient getHoodieReadClient(String basePath) {
-        return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
-    }
+  public HoodieReadClient getHoodieReadClient(String basePath) {
+    return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+  }
 
-    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
-        HoodieIndex index) {
-        if (null != client) {
-            client.close();
-            client = null;
-        }
-        client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-        return client;
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
+      HoodieIndex index) {
+    if (null != client) {
+      client.close();
+      client = null;
     }
+    client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+    return client;
+  }
 }
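
The harness change above centralizes the HoodieWriteClient lifecycle: every getHoodieWriteClient(...) overload now
closes any previously created client, and cleanupClients() closes the last one during teardown. A minimal usage
sketch (illustrative only, not part of the patch; getConfig() is the TestHoodieClientBase helper, and the commit
time is a placeholder) of the pattern subclasses are expected to follow so no client leaks across test cases:

    HoodieWriteConfig cfg = getConfig();
    // HoodieWriteClient is AutoCloseable, as the try-with-resources in TestHoodieCompactor below shows
    try (HoodieWriteClient writeClient = getHoodieWriteClient(cfg)) {
      String newCommitTime = "100";
      writeClient.startCommitWithTime(newCommitTime);
      // ... write and commit records for newCommitTime ...
    }
    // a client that is not closed explicitly is closed by cleanupClients() in teardown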
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 9f3eaea..fdc8b27 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -655,6 +655,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withEmbeddedTimelineServerEnabled(true)
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024).build()).forTable("test-trip-table")
         .build();
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 09d62a7..86a2e1f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -18,9 +18,9 @@
 
 package org.apache.hudi.table.compact;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -29,9 +29,9 @@ import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -129,7 +129,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
     try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
-     String newCommitTime = "100";
+      String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
index d77392f..b2a7f83 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
@@ -385,7 +385,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     // filterCompletedAndCompactionInstants
     // This cannot be done using checkFilter as it involves both states and actions
     final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
-    final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED);
+    final Set<State> states = Sets.newHashSet(State.COMPLETED);
     final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION);
     sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
         .forEach(i -> assertTrue(t1.containsInstant(i)));
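
The assertion change above mirrors the filterCompletedAndCompactionInstants() fix that lands in commit 02/04 below:
the filtered timeline keeps COMPLETED instants of any action plus compaction instants in any state, and no longer
lets non-compaction REQUESTED instants through. An illustrative sketch of the expected behaviour (the instants are
hypothetical examples built with HoodieInstant's (State, action, timestamp) constructor):

    HoodieTimeline filtered = timeline.filterCompletedAndCompactionInstants();
    // new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001")        -> kept
    // new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "002")    -> kept
    // new HoodieInstant(State.REQUESTED, HoodieTimeline.DELTA_COMMIT_ACTION, "003")  -> filtered out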


[hudi] 02/04: [HUDI-990] Timeline API : filterCompletedAndCompactionInstants needs to handle requested state correctly. Also ensure timeline gets reloaded after we revert committed transactions

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ae48ecbe232eb55267d1a138baeec13baa1fb249
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Wed Jun 3 00:35:14 2020 -0700

    [HUDI-990] Timeline API : filterCompletedAndCompactionInstants needs to handle requested state correctly. Also ensure timeline gets reloaded after we revert committed transactions
---
 .../client/embedded/EmbeddedTimelineService.java    |  4 +++-
 .../apache/hudi/table/HoodieCopyOnWriteTable.java   |  2 ++
 .../apache/hudi/table/HoodieMergeOnReadTable.java   |  2 ++
 .../org/apache/hudi/table/TestMergeOnReadTable.java |  3 +++
 .../table/timeline/HoodieDefaultTimeline.java       |  2 +-
 .../table/view/FileSystemViewStorageConfig.java     | 21 +++++++++++++++++++++
 6 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 5afee3f..c7c4f7b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -89,7 +89,9 @@ public class EmbeddedTimelineService {
    * Retrieves proper view storage configs for remote clients to access this service.
    */
   public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
-    return FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
+    FileSystemViewStorageType viewStorageType = config.shouldEnableBackupForRemoteFileSystemView()
+            ? FileSystemViewStorageType.REMOTE_FIRST : FileSystemViewStorageType.REMOTE_ONLY;
+    return FileSystemViewStorageConfig.newBuilder().withStorageType(viewStorageType)
         .withRemoteServerHost(hostAddr).withRemoteServerPort(serverPort).build();
   }
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 4c91c77..c74af2d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -359,6 +359,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     if (instant.isCompleted()) {
       LOG.info("Unpublishing instant " + instant);
       instant = activeTimeline.revertToInflight(instant);
+      // reload meta-client to reflect latest timeline status
+      metaClient.reloadActiveTimeline();
     }
 
     // For Requested State (like failure during index lookup), there is nothing to do rollback other than
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 938a5fd..5f56369 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -179,6 +179,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     if (instant.isCompleted()) {
       LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants);
       instant = this.getActiveTimeline().revertToInflight(instant);
+      // reload meta-client to reflect latest timeline status
+      metaClient.reloadActiveTimeline();
     }
 
     List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
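
Both rollback paths above now follow the same revert-then-reload sequence. Condensed, with comments added here for
context, the intent of the two-line addition is:

    if (instant.isCompleted()) {
      // move the completed instant back to inflight on the timeline
      instant = activeTimeline.revertToInflight(instant);
      // the meta client still caches the pre-revert timeline, so refresh it before any
      // downstream code (file-system views, later rollback steps) consults it again
      metaClient.reloadActiveTimeline();
    }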
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index fdc968d..9f3eaea 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.table.TableFileSystemView.SliceView;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -1219,6 +1220,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
         .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build());
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 86431c9..6451749 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -107,7 +107,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
 
   @Override
   public HoodieTimeline filterCompletedAndCompactionInstants() {
-    return new HoodieDefaultTimeline(instants.stream().filter(s -> !s.isInflight()
+    return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted()
             || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
index 93c5507..d805dfb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
@@ -55,6 +55,15 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
   private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_COMPACTION = 0.01;
   private static final Long DEFAULT_MAX_MEMORY_FOR_VIEW = 100 * 1024 * 1024L; // 100 MB
 
+  /**
+   * Configs to control whether backup needs to be configured if clients were not able to reach
+   * timeline service.
+   */
+  public static final String REMOTE_BACKUP_VIEW_HANDLER_ENABLE =
+      "hoodie.filesystem.remote.backup.view.enable";
+  // Need to be disabled only for tests.
+  public static final String DEFAULT_REMOTE_BACKUP_VIEW_HANDLER_ENABLE = "true";
+
   public static FileSystemViewStorageConfig.Builder newBuilder() {
     return new Builder();
   }
@@ -98,6 +107,10 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
     return FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE));
   }
 
+  public boolean shouldEnableBackupForRemoteFileSystemView() {
+    return Boolean.parseBoolean(props.getProperty(REMOTE_BACKUP_VIEW_HANDLER_ENABLE));
+  }
+
   public String getRocksdbBasePath() {
     return props.getProperty(ROCKSDB_BASE_PATH_PROP);
   }
@@ -166,6 +179,11 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withEnableBackupForRemoteFileSystemView(boolean enable) {
+      props.setProperty(REMOTE_BACKUP_VIEW_HANDLER_ENABLE, Boolean.toString(enable));
+      return this;
+    }
+
     public FileSystemViewStorageConfig build() {
       setDefaultOnCondition(props, !props.containsKey(FILESYSTEM_VIEW_STORAGE_TYPE), FILESYSTEM_VIEW_STORAGE_TYPE,
           DEFAULT_VIEW_STORAGE_TYPE.name());
@@ -188,6 +206,9 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(ROCKSDB_BASE_PATH_PROP), ROCKSDB_BASE_PATH_PROP,
           DEFAULT_ROCKSDB_BASE_PATH);
 
+      setDefaultOnCondition(props, !props.containsKey(REMOTE_BACKUP_VIEW_HANDLER_ENABLE),
+          REMOTE_BACKUP_VIEW_HANDLER_ENABLE, DEFAULT_REMOTE_BACKUP_VIEW_HANDLER_ENABLE);
+
       // Validations
       FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_VIEW_STORAGE_TYPE));
       FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE));
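
Taken together with the EmbeddedTimelineService change above, the new flag works as follows: with
hoodie.filesystem.remote.backup.view.enable left at its default of "true", remote clients receive a REMOTE_FIRST
view config; the tests above disable it so that problems reaching the embedded timeline service are not silently
absorbed by the fallback view. A sketch of the builder usage the tests rely on (illustrative, not part of the patch):

    FileSystemViewStorageConfig viewConf = FileSystemViewStorageConfig.newBuilder()
        .withEnableBackupForRemoteFileSystemView(false)   // default is "true"
        .build();
    // the embedded timeline service then hands remote clients REMOTE_ONLY instead of REMOTE_FIRST
    assert !viewConf.shouldEnableBackupForRemoteFileSystemView();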


[hudi] 01/04: [HUDI-988] Fix Unit Test Flakiness : Ensure all instantiations of HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make unit test checks for Hudi CLI skip rendered strings

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6dcd0a3524fe7be0bbbd3e673ed7e1d4b035e0cb
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Tue Jun 2 01:49:37 2020 -0700

    [HUDI-988] Fix Unit Test Flakiness : Ensure all instantiations of HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make unit test checks for Hudi CLI skip rendered strings
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  16 +
 .../org/apache/hudi/cli/commands/StatsCommand.java |   4 +-
 .../cli/commands/AbstractShellIntegrationTest.java |   2 +-
 .../hudi/cli/commands/TestRepairsCommand.java      | 206 -----
 .../org/apache/hudi/client/HoodieWriteClient.java  |   2 +-
 .../apache/hudi/client/TestHoodieClientBase.java   | 938 ++++++++++-----------
 .../java/org/apache/hudi/client/TestMultiFS.java   |   4 -
 .../hudi/client/TestUpdateSchemaEvolution.java     |   4 +-
 .../hudi/common/HoodieClientTestHarness.java       | 426 +++++-----
 .../hudi/index/TestHBaseQPSResourceAllocator.java  |   2 +-
 .../java/org/apache/hudi/index/TestHbaseIndex.java |  17 +-
 .../org/apache/hudi/index/TestHoodieIndex.java     |   2 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   2 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   2 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |  12 +-
 .../apache/hudi/table/TestCopyOnWriteTable.java    |   5 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    |  38 +-
 .../hudi/table/compact/TestHoodieCompactor.java    |  12 +-
 pom.xml                                            |   1 +
 19 files changed, 745 insertions(+), 950 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 2e3bc01..708ae29 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -33,4 +33,20 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_HOODIE_PROPERTY = "Property";
   public static final String HEADER_OLD_VALUE = "Old Value";
   public static final String HEADER_NEW_VALUE = "New Value";
+
+  /**
+   * Fields of Stats.
+   */
+  public static final String HEADER_COMMIT_TIME = "CommitTime";
+  public static final String HEADER_TOTAL_UPSERTED = "Total Upserted";
+  public static final String HEADER_TOTAL_WRITTEN = "Total Written";
+  public static final String HEADER_WRITE_AMPLIFICATION_FACTOR = "Write Amplification Factor";
+  public static final String HEADER_HISTOGRAM_MIN = "Min";
+  public static final String HEADER_HISTOGRAM_10TH = "10th";
+  public static final String HEADER_HISTOGRAM_50TH = "50th";
+  public static final String HEADER_HISTOGRAM_AVG = "avg";
+  public static final String HEADER_HISTOGRAM_95TH = "95th";
+  public static final String HEADER_HISTOGRAM_MAX = "Max";
+  public static final String HEADER_HISTOGRAM_NUM_FILES = "NumFiles";
+  public static final String HEADER_HISTOGRAM_STD_DEV = "StdDev";
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index b05aee2..4874777 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
 @Component
 public class StatsCommand implements CommandMarker {
 
-  private static final int MAX_FILES = 1000000;
+  public static final int MAX_FILES = 1000000;
 
   @CliCommand(value = "stats wa", help = "Write Amplification. Ratio of how many records were upserted to how many "
       + "records were actually written")
@@ -97,7 +97,7 @@ public class StatsCommand implements CommandMarker {
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
 
-  private Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
+  public Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
     return new Comparable[] {commitTime, s.getMin(), s.getValue(0.1), s.getMedian(), s.getMean(), s.get95thPercentile(),
         s.getMax(), s.size(), s.getStdDev()};
   }
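
MAX_FILES and printFileSizeHistogram(...) are widened to public presumably so that CLI unit tests can assemble the
expected table rows directly rather than re-rendering and diffing console strings. A hypothetical sketch of such
usage (fileSizes is made-up test data; Histogram, UniformReservoir and Snapshot are the Codahale metrics classes the
command already uses for its Snapshot parameter):

    Histogram histogram = new Histogram(new UniformReservoir(StatsCommand.MAX_FILES));
    List<Long> fileSizes = Arrays.asList(1024L, 2048L, 4096L);   // hypothetical file sizes
    fileSizes.forEach(histogram::update);
    Comparable[] expectedRow = new StatsCommand().printFileSizeHistogram("100", histogram.getSnapshot());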
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
index ad81af5..d9f1688 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
@@ -58,4 +58,4 @@ public abstract class AbstractShellIntegrationTest extends HoodieClientTestHarne
   protected static JLineShellComponent getShell() {
     return shell;
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
deleted file mode 100644
index 9e78ac7..0000000
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.commands;
-
-import org.apache.hudi.cli.HoodieCLI;
-import org.apache.hudi.cli.HoodiePrintHelper;
-import org.apache.hudi.cli.HoodieTableHeaderFields;
-import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.TimelineLayoutVersion;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.FSUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.shell.core.CommandResult;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Test class for {@link RepairsCommand}.
- */
-public class TestRepairsCommand extends AbstractShellIntegrationTest {
-
-  private String tablePath;
-
-  @Before
-  public void init() throws IOException {
-    String tableName = "test_table";
-    tablePath = basePath + File.separator + tableName;
-
-    // Create table and connect
-    new TableCommand().createTable(
-        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
-        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
-  }
-
-  /**
-   * Test case for dry run 'repair addpartitionmeta'.
-   */
-  @Test
-  public void testAddPartitionMetaWithDryRun() throws IOException {
-    // create commit instant
-    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
-
-    // create partition path
-    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
-    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
-    assertTrue(fs.mkdirs(new Path(partition1)));
-    assertTrue(fs.mkdirs(new Path(partition2)));
-    assertTrue(fs.mkdirs(new Path(partition3)));
-
-    // default is dry run.
-    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
-    assertTrue(cr.isSuccess());
-
-    // expected all 'No'.
-    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
-        .stream()
-        .map(partition -> new String[] {partition, "No", "None"})
-        .toArray(String[][]::new);
-    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-
-    assertEquals(expected, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for real run 'repair addpartitionmeta'.
-   */
-  @Test
-  public void testAddPartitionMetaWithRealRun() throws IOException {
-    // create commit instant
-    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
-
-    // create partition path
-    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
-    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
-    assertTrue(fs.mkdirs(new Path(partition1)));
-    assertTrue(fs.mkdirs(new Path(partition2)));
-    assertTrue(fs.mkdirs(new Path(partition3)));
-
-    CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false");
-    assertTrue(cr.isSuccess());
-
-    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
-    // after dry run, the action will be 'Repaired'
-    String[][] rows = paths.stream()
-        .map(partition -> new String[] {partition, "No", "Repaired"})
-        .toArray(String[][]::new);
-    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-
-    assertEquals(expected, cr.getResult().toString());
-
-    cr = getShell().executeCommand("repair addpartitionmeta");
-
-    // after real run, Metadata is present now.
-    rows = paths.stream()
-        .map(partition -> new String[] {partition, "Yes", "None"})
-        .toArray(String[][]::new);
-    expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-    assertEquals(expected, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for 'repair overwrite-hoodie-props'.
-   */
-  @Test
-  public void testOverwriteHoodieProperties() throws IOException {
-    URL newProps = this.getClass().getClassLoader().getResource("table-config.properties");
-    assertNotNull("New property file must exist", newProps);
-
-    CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
-    assertTrue(cr.isSuccess());
-
-    Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
-
-    // after overwrite, the stored value in .hoodie is equals to which read from properties.
-    Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
-    Properties expectProps = new Properties();
-    expectProps.load(new FileInputStream(new File(newProps.getPath())));
-
-    Map<String, String> expected = expectProps.entrySet().stream()
-        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
-    assertEquals(expected, result);
-
-    // check result
-    List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
-        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
-    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key,
-        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
-        .toArray(String[][]::new);
-    String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
-        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
-
-    assertEquals(expect, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for 'repair corrupted clean files'.
-   */
-  @Test
-  public void testRemoveCorruptedPendingCleanAction() throws IOException {
-    HoodieCLI.conf = jsc.hadoopConfiguration();
-
-    Configuration conf = HoodieCLI.conf;
-
-    metaClient = HoodieCLI.getTableMetaClient();
-
-    // Create four requested files
-    for (int i = 100; i < 104; i++) {
-      String timestamp = String.valueOf(i);
-      // Write corrupted requested Compaction
-      HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, timestamp, conf);
-    }
-
-    // reload meta client
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    // first, there are four instants
-    assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
-
-    CommandResult cr = getShell().executeCommand("repair corrupted clean files");
-    assertTrue(cr.isSuccess());
-
-    // reload meta client
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
-  }
-}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 37dfe3d..f5f6233 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -120,7 +120,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
   }
 
-  HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
+  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
     this(jsc, clientConfig, rollbackPending, index, Option.empty());
   }
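
Widening this constructor to public is what allows HoodieClientTestHarness (in the org.apache.hudi.common test
package, shown earlier in this series) to construct clients with an externally supplied index. A sketch of the call
it enables, using the same variable names as the harness:

    HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);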
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 5f47bf5..6e6458b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
@@ -50,7 +50,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.SQLContext;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -73,496 +72,477 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHoodieClientBase extends HoodieClientTestHarness {
 
-  private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
-
-  @Before
-  public void setUp() throws Exception {
-    initResources();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    cleanupResources();
-  }
-
-  protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
-    return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
-    return getHoodieWriteClient(cfg, false);
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
-    return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, jsc));
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
-                                                   HoodieIndex index) {
-    return new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-  }
-
-  protected HoodieReadClient getHoodieReadClient(String basePath) {
-    return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
-  }
-
-  /**
-   * Get Default HoodieWriteConfig for tests.
-   *
-   * @return Default Hoodie Write Config for tests
-   */
-  protected HoodieWriteConfig getConfig() {
-    return getConfigBuilder().build();
-  }
-
-  protected HoodieWriteConfig getConfig(IndexType indexType) {
-    return getConfigBuilder(indexType).build();
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  protected HoodieWriteConfig.Builder getConfigBuilder() {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
-  }
-
-  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-    return getConfigBuilder(schemaStr, IndexType.BLOOM);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
-    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
-        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
-        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
-        .withWriteStatusClass(MetadataMergeWriteStatus.class)
-        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
-        .forTable("test-trip-table")
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
-        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-  }
-
-  protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    ((SyncableFileSystemView) (table.getSliceView())).reset();
-    return table;
-  }
-
-  /**
-   * Assert no failures in writing hoodie files.
-   *
-   * @param statuses List of Write Status
-   */
-  public static void assertNoWriteErrors(List<WriteStatus> statuses) {
-    // Verify there are no errors
-    for (WriteStatus status : statuses) {
-      assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
+    private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
+
+    @Before
+    public void setUp() throws Exception {
+        initResources();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cleanupResources();
+    }
+
+    protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
+        return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+    }
+
+    /**
+     * Get Default HoodieWriteConfig for tests.
+     *
+     * @return Default Hoodie Write Config for tests
+     */
+    protected HoodieWriteConfig getConfig() {
+        return getConfigBuilder().build();
     }
-  }
-
-  void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
-    Set<String> partitionPathSet = inputRecords.stream()
-        .map(HoodieRecord::getPartitionPath)
-        .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-  }
-
-  void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
-    Set<String> partitionPathSet = inputKeys.stream()
-        .map(HoodieKey::getPartitionPath)
-        .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-  }
-
-  /**
-   * Ensure presence of partition meta-data at known depth.
-   *
-   * @param partitionPaths Partition paths to check
-   * @param fs             File System
-   * @throws IOException in case of error
-   */
-  void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
-    for (String partitionPath : partitionPaths) {
-      assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
-      HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
-      pmeta.readFromFS();
-      Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
+
+    protected HoodieWriteConfig getConfig(IndexType indexType) {
+        return getConfigBuilder(indexType).build();
+    }
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    protected HoodieWriteConfig.Builder getConfigBuilder() {
+        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
     }
-  }
-
-  /**
-   * Ensure records have location field set.
-   *
-   * @param taggedRecords Tagged Records
-   * @param commitTime    Commit Timestamp
-   */
-  protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
-    for (HoodieRecord rec : taggedRecords) {
-      assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
-      assertEquals("All records should have commit time " + commitTime + ", since updates were made",
-          rec.getCurrentLocation().getInstantTime(), commitTime);
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
+        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
     }
-  }
-
-  /**
-   * Assert that there is no duplicate key at the partition level.
-   *
-   * @param records List of Hoodie records
-   */
-  void assertNodupesWithinPartition(List<HoodieRecord> records) {
-    Map<String, Set<String>> partitionToKeys = new HashMap<>();
-    for (HoodieRecord r : records) {
-      String key = r.getRecordKey();
-      String partitionPath = r.getPartitionPath();
-      if (!partitionToKeys.containsKey(partitionPath)) {
-        partitionToKeys.put(partitionPath, new HashSet<>());
-      }
-      assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
-      partitionToKeys.get(partitionPath).add(key);
+
+    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+        return getConfigBuilder(schemaStr, IndexType.BLOOM);
     }
-  }
-
-  /**
-   * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records
-   * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
-   * guaranteed by record-generation function itself.
-   *
-   * @param writeConfig       Hoodie Write Config
-   * @param recordGenFunction Records Generation function
-   * @return Wrapped function
-   */
-  private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
-    return (commit, numRecords) -> {
-      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
-      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
-      return taggedRecords.collect();
-    };
-  }
-
-  /**
-   * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys
-   * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
-   * guaranteed by key-generation function itself.
-   *
-   * @param writeConfig    Hoodie Write Config
-   * @param keyGenFunction Keys Generation function
-   * @return Wrapped function
-   */
-  private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
-    return (numRecords) -> {
-      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-      List<HoodieKey> records = keyGenFunction.apply(numRecords);
-      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-      JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
-          .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
-      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
-      return taggedRecords.map(record -> record.getKey()).collect();
-    };
-  }
-
-  /**
-   * Generate wrapper for record generation function for testing Prepped APIs.
-   *
-   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-   * @param writeConfig  Hoodie Write Config
-   * @param wrapped      Actual Records Generation function
-   * @return Wrapped Function
-   */
-  protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
-                                                                                 HoodieWriteConfig writeConfig,
-                                                                                 Function2<List<HoodieRecord>, String, Integer> wrapped) {
-    if (isPreppedAPI) {
-      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
-    } else {
-      return wrapped;
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
+        return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+            .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+            .withWriteStatusClass(MetadataMergeWriteStatus.class)
+            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+            .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+            .forTable("test-trip-table")
+            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+            .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+                .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
     }
-  }
-
-  /**
-   * Generate wrapper for delete key generation function for testing Prepped APIs.
-   *
-   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-   * @param writeConfig  Hoodie Write Config
-   * @param wrapped      Actual Records Generation function
-   * @return Wrapped Function
-   */
-  Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
-                                                                       HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
-    if (isPreppedAPI) {
-      return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
-    } else {
-      return wrapped;
+
+    protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+        ((SyncableFileSystemView) (table.getSliceView())).reset();
+        return table;
     }
-  }
-
-  /**
-   * Helper to insert first batch of records and do regular assertions on the state after successful completion.
-   *
-   * @param writeConfig            Hoodie Write Config
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit Number of records to be added in the new commit
-   * @param writeFn                Write Function to be used for insertion
-   * @param isPreppedAPI           Boolean flag to indicate writeFn expects prepped records
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-                                        String initCommitTime, int numRecordsInThisCommit,
-                                        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-                                        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
-    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
-
-    return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
-        recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
-  }
-
-  /**
-   * Helper to upsert batch of records and do regular assertions on the state after successful completion.
-   *
-   * @param writeConfig                  Hoodie Write Config
-   * @param client                       Hoodie Write Client
-   * @param newCommitTime                New Commit Timestamp to be used
-   * @param prevCommitTime               Commit Timestamp used in previous commit
-   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-   * @param initCommitTime               Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit       Number of records to be added in the new commit
-   * @param writeFn                      Write Function to be used for upsert
-   * @param isPreppedAPI                 Boolean flag to indicate writeFn expects prepped records
-   * @param assertForCommit              Enable Assertion of Writes
-   * @param expRecordsInThisCommit       Expected number of records in this commit
-   * @param expTotalRecords              Expected number of records when scanned
-   * @param expTotalCommits              Expected number of commits (including this commit)
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-                                   String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
-                                   int numRecordsInThisCommit,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-                                   boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
-
-    return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
-        numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-        expTotalCommits);
-  }
-
-  /**
-   * Helper to delete batch of keys and do regular assertions on the state after successful completion.
-   *
-   * @param writeConfig            Hoodie Write Config
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param prevCommitTime         Commit Timestamp used in previous commit
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit Number of records to be added in the new commit
-   * @param deleteFn               Delete Function to be used for deletes
-   * @param isPreppedAPI           Boolean flag to indicate writeFn expects prepped records
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @param expTotalRecords        Expected number of records when scanned
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-                                   String prevCommitTime, String initCommitTime,
-                                   int numRecordsInThisCommit,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
-                                   boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-    final Function<Integer, List<HoodieKey>> keyGenFunction =
-        generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
-
-    return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
-        keyGenFunction,
-        deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
-  }
-
-  /**
-   * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
-   *
-   * @param client                       Hoodie Write Client
-   * @param newCommitTime                New Commit Timestamp to be used
-   * @param prevCommitTime               Commit Timestamp used in previous commit
-   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-   * @param initCommitTime               Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit       Number of records to be added in the new commit
-   * @param recordGenFunction            Records Generation Function
-   * @param writeFn                      Write Function to be used for upsert
-   * @param assertForCommit              Enable Assertion of Writes
-   * @param expRecordsInThisCommit       Expected number of records in this commit
-   * @param expTotalRecords              Expected number of records when scanned
-   * @param expTotalCommits              Expected number of commits (including this commit)
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-                                  Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
-                                  Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-                                  Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-                                  boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-
-    // Write 1 (only inserts)
-    client.startCommitWithTime(newCommitTime);
-
-    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
-    List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
-
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(records, fs);
-
-    // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-    if (assertForCommit) {
-      assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
-          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-          timeline.lastInstant().get().getTimestamp());
-      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-      // Check the entire dataset has all records still
-      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-      for (int i = 0; i < fullPartitionPaths.length; i++) {
-        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-      }
-      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-      // Check that the incremental consumption from prevCommitTime
-      assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
-      if (commitTimesBetweenPrevAndNew.isPresent()) {
-        commitTimesBetweenPrevAndNew.get().forEach(ct -> {
-          assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
-              HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-              HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
-        });
-      }
+
+    /**
+     * Assert no failures in writing hoodie files.
+     *
+     * @param statuses List of Write Status
+     */
+    public static void assertNoWriteErrors(List<WriteStatus> statuses) {
+        // Verify there are no errors
+        for (WriteStatus status : statuses) {
+            assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
+        }
     }
-    return result;
-  }
-
-  /**
-   * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
-   *
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param prevCommitTime         Commit Timestamp used in previous commit
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param keyGenFunction         Key Generation function
-   * @param deleteFn               Write Function to be used for delete
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @param expTotalRecords        Expected number of records when scanned
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-                                   String initCommitTime, int numRecordsInThisCommit,
-                                   Function<Integer, List<HoodieKey>> keyGenFunction,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
-                                   boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-
-    // Delete 1 (only deletes)
-    client.startCommitWithTime(newCommitTime);
-
-    List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
-    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
-
-    JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
-    List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
-
-    // check the partition metadata is written out
-    assertPartitionMetadataForKeys(keysToDelete, fs);
-
-    // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-    if (assertForCommit) {
-      assertEquals("Expecting 3 commits.", 3,
-          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-          timeline.lastInstant().get().getTimestamp());
-      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-      // Check the entire dataset has all records still
-      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-      for (int i = 0; i < fullPartitionPaths.length; i++) {
-        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-      }
-      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-      // Check that the incremental consumption from prevCommitTime
-      assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
-              + " since it is a delete operation",
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+
+    void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
+        Set<String> partitionPathSet = inputRecords.stream()
+            .map(HoodieRecord::getPartitionPath)
+            .collect(Collectors.toSet());
+        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
     }
-    return result;
-  }
 
-  /**
-   * Get Cleaner state corresponding to a partition path.
-   *
-   * @param hoodieCleanStatsTwo List of Clean Stats
-   * @param partitionPath       Partition path for filtering
-   * @return Cleaner state corresponding to partition path
-   */
-  protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
-    return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
-  }
+    void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
+        Set<String> partitionPathSet = inputKeys.stream()
+            .map(HoodieKey::getPartitionPath)
+            .collect(Collectors.toSet());
+        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+    }
 
-  // Functional Interfaces for passing lambda and Hoodie Write API contexts
+    /**
+     * Ensure presence of partition meta-data at known depth.
+     *
+     * @param partitionPaths Partition paths to check
+     * @param fs File System
+     * @throws IOException in case of error
+     */
+    void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
+        for (String partitionPath : partitionPaths) {
+            assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
+            HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
+            pmeta.readFromFS();
+            Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
+        }
+    }
 
-  @FunctionalInterface
-  public interface Function2<R, T1, T2> {
+    /**
+     * Ensure records have location field set.
+     *
+     * @param taggedRecords Tagged Records
+     * @param commitTime Commit Timestamp
+     */
+    protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
+        for (HoodieRecord rec : taggedRecords) {
+            assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
+            assertEquals("All records should have commit time " + commitTime + ", since updates were made",
+                rec.getCurrentLocation().getInstantTime(), commitTime);
+        }
+    }
 
-    R apply(T1 v1, T2 v2) throws IOException;
-  }
+    /**
+     * Assert that there is no duplicate key at the partition level.
+     *
+     * @param records List of Hoodie records
+     */
+    void assertNodupesWithinPartition(List<HoodieRecord> records) {
+        Map<String, Set<String>> partitionToKeys = new HashMap<>();
+        for (HoodieRecord r : records) {
+            String key = r.getRecordKey();
+            String partitionPath = r.getPartitionPath();
+            if (!partitionToKeys.containsKey(partitionPath)) {
+                partitionToKeys.put(partitionPath, new HashSet<>());
+            }
+            assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
+            partitionToKeys.get(partitionPath).add(key);
+        }
+    }
 
-  @FunctionalInterface
-  public interface Function3<R, T1, T2, T3> {
+    /**
+     * Wraps a record-generation function for testing the prepped version of the API. Prepped APIs expect records that are already de-duped and have their location set; this wrapper takes care of
+     * setting the record location. Uniqueness is guaranteed by the record-generation function itself.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param recordGenFunction Records Generation function
+     * @return Wrapped function
+     */
+    private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
+        final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+        return (commit, numRecords) -> {
+            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+            List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
+            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
+            return taggedRecords.collect();
+        };
+    }
 
-    R apply(T1 v1, T2 v2, T3 v3) throws IOException;
-  }
+    /**
+     * Wraps a delete-key generation function for testing the prepped version of the API. Prepped APIs expect the keys to be already de-duped and have their location set; this wrapper takes care of
+     * setting the record location. Uniqueness is guaranteed by the key-generation function itself.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param keyGenFunction Keys Generation function
+     * @return Wrapped function
+     */
+    private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
+        final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
+        return (numRecords) -> {
+            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+            List<HoodieKey> records = keyGenFunction.apply(numRecords);
+            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+            JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+                .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
+            return taggedRecords.map(record -> record.getKey()).collect();
+        };
+    }
+
+    /**
+     * Generate wrapper for record generation function for testing Prepped APIs.
+     *
+     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+     * @param writeConfig Hoodie Write Config
+     * @param wrapped Actual Records Generation function
+     * @return Wrapped Function
+     */
+    protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
+        HoodieWriteConfig writeConfig,
+        Function2<List<HoodieRecord>, String, Integer> wrapped) {
+        if (isPreppedAPI) {
+            return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+        } else {
+            return wrapped;
+        }
+    }
+
+    /**
+     * Generate wrapper for delete key generation function for testing Prepped APIs.
+     *
+     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+     * @param writeConfig Hoodie Write Config
+     * @param wrapped Actual Keys Generation function
+     * @return Wrapped Function
+     */
+    Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
+        HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
+        if (isPreppedAPI) {
+            return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+        } else {
+            return wrapped;
+        }
+    }
+
+    /**
+     * Helper to insert first batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param writeFn Write Function to be used for insertion
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String initCommitTime, int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
+        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
+
+        return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
+            recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
+    }
+
+    /**
+     * Helper to upsert batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param writeFn Write Function to be used for upsert
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @param expTotalCommits Expected number of commits (including this commit)
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+        int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
+
+        return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+            numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
+            expTotalCommits);
+    }
+
+    /**
+     * Helper to delete batch of keys and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param deleteFn Delete Function to be used for deletes
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String prevCommitTime, String initCommitTime,
+        int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+        final Function<Integer, List<HoodieKey>> keyGenFunction =
+            generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
+
+        return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
+            keyGenFunction,
+            deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
+    }
+
+    /**
+     * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param recordGenFunction Records Generation Function
+     * @param writeFn Write Function to be used for upsert
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @param expTotalCommits Expected number of commits (including this commit)
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+        Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+        Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+
+        // Write 1 (only inserts)
+        client.startCommitWithTime(newCommitTime);
+
+        List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+        JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
+        List<WriteStatus> statuses = result.collect();
+        assertNoWriteErrors(statuses);
+
+        // check the partition metadata is written out
+        assertPartitionMetadataForRecords(records, fs);
+
+        // verify that there is a commit
+        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+        if (assertForCommit) {
+            assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
+                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+                timeline.lastInstant().get().getTimestamp());
+            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+            // Check the entire dataset has all records still
+            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+            for (int i = 0; i < fullPartitionPaths.length; i++) {
+                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+            }
+            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+            // Check that incremental consumption from prevCommitTime yields all records in the latest commit
+            assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+            if (commitTimesBetweenPrevAndNew.isPresent()) {
+                commitTimesBetweenPrevAndNew.get().forEach(ct -> {
+                    assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
+                        HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
+                });
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
+     *
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of keys to be deleted in the new commit
+     * @param keyGenFunction Key Generation function
+     * @param deleteFn Write Function to be used for delete
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+        String initCommitTime, int numRecordsInThisCommit,
+        Function<Integer, List<HoodieKey>> keyGenFunction,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+
+        // Delete 1 (only deletes)
+        client.startCommitWithTime(newCommitTime);
+
+        List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
+        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
+
+        JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
+        List<WriteStatus> statuses = result.collect();
+        assertNoWriteErrors(statuses);
+
+        // check the partition metadata is written out
+        assertPartitionMetadataForKeys(keysToDelete, fs);
+
+        // verify that there is a commit
+        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+        if (assertForCommit) {
+            assertEquals("Expecting 3 commits.", 3,
+                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+                timeline.lastInstant().get().getTimestamp());
+            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+            // Check the entire dataset has all records still
+            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+            for (int i = 0; i < fullPartitionPaths.length; i++) {
+                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+            }
+            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+            // Check that incremental consumption from prevCommitTime yields no records, since the latest commit is a delete
+            assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+                    + " since it is a delete operation",
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+        }
+        return result;
+    }
+
+    /**
+     * Get Cleaner state corresponding to a partition path.
+     *
+     * @param hoodieCleanStatsTwo List of Clean Stats
+     * @param partitionPath Partition path for filtering
+     * @return Cleaner state corresponding to partition path
+     */
+    protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
+        return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
+    }
+
+    // Functional Interfaces for passing lambda and Hoodie Write API contexts
+
+    @FunctionalInterface
+    public interface Function2<R, T1, T2> {
+
+        R apply(T1 v1, T2 v2) throws IOException;
+    }
+
+    @FunctionalInterface
+    public interface Function3<R, T1, T2, T3> {
+
+        R apply(T1 v1, T2 v2, T3 v3) throws IOException;
+    }
 
 }
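
The batch helpers above are meant to be driven from concrete client tests, with the write function passed in so the same assertions cover both the regular and the prepped APIs. Below is a minimal sketch of how a hypothetical subclass (assumed to sit in the same package, since the helpers are package-private) might chain them; the class name, record counts and commit-time handling are illustrative assumptions, not part of this change:

    // Sketch only: hypothetical subclass of TestHoodieClientBase exercising the batch helpers.
    public class TestBatchHelpersSketch extends TestHoodieClientBase {
      @Test
      public void insertThenUpsert() throws Exception {
        HoodieWriteConfig cfg = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
            HoodieIndex.IndexType.BLOOM).build();
        HoodieWriteClient client = getHoodieWriteClient(cfg);

        String initCommitTime = "000";
        String firstCommit = getNextInstant();
        // Insert 200 records through the non-prepped insert API and assert the commit landed.
        insertFirstBatch(cfg, client, firstCommit, initCommitTime, 200,
            HoodieWriteClient::insert, false, true, 200);

        String secondCommit = getNextInstant();
        // Upsert 100 of those records; still expect 200 records in total across 2 commits.
        updateBatch(cfg, client, secondCommit, firstCommit, Option.empty(), initCommitTime, 100,
            HoodieWriteClient::upsert, false, true, 100, 200, 2);
      }
    }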
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 9b70c10..8d3fa13 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -68,10 +68,6 @@ public class TestMultiFS extends HoodieClientTestHarness {
     cleanupTestDataGenerator();
   }
 
-  private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
-  }
-
   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index aad8edf..ab6e940 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.TestRawTripPayload;
+import java.io.IOException;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -58,8 +59,9 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
   }
 
   @After
-  public void tearDown() {
+  public void tearDown() throws IOException {
     cleanupSparkContexts();
+    cleanupFileSystem();
   }
 
   //@Test
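
With this change the schema-evolution test closes the shared FileSystem handle between runs in addition to stopping the Spark contexts. Harness subclasses that do not simply call cleanupResources() wire the individual cleanup methods themselves; a minimal sketch of that pattern follows (the ordering and exact set of calls are illustrative, tests only clean up what they initialized):

    // Sketch of an @After hook for a HoodieClientTestHarness subclass that manages its own resources.
    @After
    public void tearDown() throws IOException {
      cleanupClients();           // drop the meta client and close any tracked write client
      cleanupSparkContexts();     // clear the SQLContext cache and stop the JavaSparkContext
      cleanupTestDataGenerator(); // release the test data generator
      cleanupFileSystem();        // close the shared FileSystem instance
    }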
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 4e5721f..e4202f0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -17,11 +17,15 @@
 
 package org.apache.hudi.common;
 
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.TestHoodieClientBase;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,225 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
-  protected transient JavaSparkContext jsc = null;
-  protected transient SQLContext sqlContext;
-  protected transient FileSystem fs;
-  protected transient HoodieTestDataGenerator dataGen = null;
-  protected transient ExecutorService executorService;
-  protected transient HoodieTableMetaClient metaClient;
-  private static AtomicInteger instantGen = new AtomicInteger(1);
-
-  public String getNextInstant() {
-    return String.format("%09d", instantGen.getAndIncrement());
-  }
-
-  // dfs
-  protected String dfsBasePath;
-  protected transient HdfsTestService hdfsTestService;
-  protected transient MiniDFSCluster dfsCluster;
-  protected transient DistributedFileSystem dfs;
-
-  /**
-   * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
-   *
-   * @throws IOException
-   */
-  public void initResources() throws IOException {
-    initPath();
-    initSparkContexts();
-    initTestDataGenerator();
-    initFileSystem();
-    initMetaClient();
-  }
-
-  /**
-   * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
-   * 
-   * @throws IOException
-   */
-  public void cleanupResources() throws IOException {
-    cleanupMetaClient();
-    cleanupSparkContexts();
-    cleanupTestDataGenerator();
-    cleanupFileSystem();
-  }
-
-  /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
-   *
-   * @param appName The specified application name.
-   */
-  protected void initSparkContexts(String appName) {
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
-    jsc.setLogLevel("ERROR");
-
-    // SQLContext stuff
-    sqlContext = new SQLContext(jsc);
-  }
-
-  /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
-   * <b>TestHoodieClient</b>.
-   */
-  protected void initSparkContexts() {
-    initSparkContexts("TestHoodieClient");
-  }
-
-  /**
-   * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
-   */
-  protected void cleanupSparkContexts() {
-    if (sqlContext != null) {
-      LOG.info("Clearing sql context cache of spark-session used in previous test-case");
-      sqlContext.clearCache();
-      sqlContext = null;
+    private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
+
+    protected transient JavaSparkContext jsc = null;
+    protected transient SQLContext sqlContext;
+    protected transient FileSystem fs;
+    protected transient HoodieTestDataGenerator dataGen = null;
+    protected transient ExecutorService executorService;
+    protected transient HoodieTableMetaClient metaClient;
+    private static AtomicInteger instantGen = new AtomicInteger(1);
+    protected transient HoodieWriteClient client;
+
+    public String getNextInstant() {
+        return String.format("%09d", instantGen.getAndIncrement());
+    }
+
+    // dfs
+    protected String dfsBasePath;
+    protected transient HdfsTestService hdfsTestService;
+    protected transient MiniDFSCluster dfsCluster;
+    protected transient DistributedFileSystem dfs;
+
+    /**
+     * Initializes the resource group for the subclasses of {@link TestHoodieClientBase}.
+     */
+    public void initResources() throws IOException {
+        initPath();
+        initSparkContexts();
+        initTestDataGenerator();
+        initFileSystem();
+        initMetaClient();
+    }
+
+    /**
+     * Cleans up the resource group for the subclasses of {@link TestHoodieClientBase}.
+     */
+    public void cleanupResources() throws IOException {
+        cleanupClients();
+        cleanupSparkContexts();
+        cleanupTestDataGenerator();
+        cleanupFileSystem();
+    }
+
+    /**
+     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
+     *
+     * @param appName The specified application name.
+     */
+    protected void initSparkContexts(String appName) {
+        // Initialize a local spark env
+        jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+        jsc.setLogLevel("ERROR");
+
+        // SQLContext stuff
+        sqlContext = new SQLContext(jsc);
     }
 
-    if (jsc != null) {
-      LOG.info("Closing spark context used in previous test-case");
-      jsc.close();
-      jsc.stop();
-      jsc = null;
+    /**
+     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
+     * <b>TestHoodieClient</b>.
+     */
+    protected void initSparkContexts() {
+        initSparkContexts("TestHoodieClient");
     }
-  }
-
-  /**
-   * Initializes a file system with the hadoop configuration of Spark context.
-   */
-  protected void initFileSystem() {
-    if (jsc == null) {
-      throw new IllegalStateException("The Spark context has not been initialized.");
+
+    /**
+     * Cleans up the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
+     */
+    protected void cleanupSparkContexts() {
+        if (sqlContext != null) {
+            LOG.info("Clearing sql context cache of spark-session used in previous test-case");
+            sqlContext.clearCache();
+            sqlContext = null;
+        }
+
+        if (jsc != null) {
+            LOG.info("Closing spark context used in previous test-case");
+            jsc.close();
+            jsc.stop();
+            jsc = null;
+        }
+    }
+
+    /**
+     * Initializes a file system with the hadoop configuration of Spark context.
+     */
+    protected void initFileSystem() {
+        if (jsc == null) {
+            throw new IllegalStateException("The Spark context has not been initialized.");
+        }
+
+        initFileSystemWithConfiguration(jsc.hadoopConfiguration());
     }
 
-    initFileSystemWithConfiguration(jsc.hadoopConfiguration());
-  }
-
-  /**
-   * Initializes file system with a default empty configuration.
-   */
-  protected void initFileSystemWithDefaultConfiguration() {
-    initFileSystemWithConfiguration(new Configuration());
-  }
-
-  /**
-   * Cleanups file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupFileSystem() throws IOException {
-    if (fs != null) {
-      LOG.warn("Closing file-system instance used in previous test-run");
-      fs.close();
+    /**
+     * Initializes file system with a default empty configuration.
+     */
+    protected void initFileSystemWithDefaultConfiguration() {
+        initFileSystemWithConfiguration(new Configuration());
     }
-  }
-
-  /**
-   * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by
-   * {@code getTableType()}.
-   *
-   * @throws IOException
-   */
-  protected void initMetaClient() throws IOException {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been initialized.");
+
+    /**
+     * Cleans up the file system.
+     */
+    protected void cleanupFileSystem() throws IOException {
+        if (fs != null) {
+            LOG.warn("Closing file-system instance used in previous test-run");
+            fs.close();
+        }
     }
 
-    if (jsc == null) {
-      throw new IllegalStateException("The Spark context has not been initialized.");
+    /**
+     * Initializes an instance of {@link HoodieTableMetaClient} with the table type specified by {@code getTableType()}.
+     */
+    protected void initMetaClient() throws IOException {
+        if (basePath == null) {
+            throw new IllegalStateException("The base path has not been initialized.");
+        }
+
+        if (jsc == null) {
+            throw new IllegalStateException("The Spark context has not been initialized.");
+        }
+
+        metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
     }
 
-    metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
-  }
-
-  /**
-   * Cleanups table type.
-   */
-  protected void cleanupMetaClient() {
-    metaClient = null;
-  }
-
-  /**
-   * Initializes a test data generator which used to generate test datas.
-   *
-   */
-  protected void initTestDataGenerator() {
-    dataGen = new HoodieTestDataGenerator();
-  }
-
-  /**
-   * Cleanups test data generator.
-   *
-   */
-  protected void cleanupTestDataGenerator() {
-    dataGen = null;
-  }
-
-  /**
-   * Initializes a distributed file system and base directory.
-   *
-   * @throws IOException
-   */
-  protected void initDFS() throws IOException {
-    FileSystem.closeAll();
-    hdfsTestService = new HdfsTestService();
-    dfsCluster = hdfsTestService.start(true);
-
-    // Create a temp folder as the base path
-    dfs = dfsCluster.getFileSystem();
-    dfsBasePath = dfs.getWorkingDirectory().toString();
-    dfs.mkdirs(new Path(dfsBasePath));
-  }
-
-  /**
-   * Cleanups the distributed file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupDFS() throws IOException {
-    if (hdfsTestService != null) {
-      hdfsTestService.stop();
-      dfsCluster.shutdown();
+    /**
+     * Cleans up the meta client and closes any write client tracked by the harness.
+     */
+    protected void cleanupClients() {
+        metaClient = null;
+        if (null != client) {
+            client.close();
+            client = null;
+        }
     }
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-  }
-
-  /**
-   * Initializes executor service with a fixed thread pool.
-   *
-   * @param threadNum specify the capacity of the fixed thread pool
-   */
-  protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
-    executorService = Executors.newFixedThreadPool(threadNum);
-  }
-
-  /**
-   * Cleanups the executor service.
-   */
-  protected void cleanupExecutorService() {
-    if (this.executorService != null) {
-      this.executorService.shutdownNow();
-      this.executorService = null;
+
+    /**
+     * Initializes a test data generator used to generate test data.
+     */
+    protected void initTestDataGenerator() {
+        dataGen = new HoodieTestDataGenerator();
     }
-  }
 
-  private void initFileSystemWithConfiguration(Configuration configuration) {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been initialized.");
+    /**
+     * Cleans up the test data generator.
+     */
+    protected void cleanupTestDataGenerator() {
+        dataGen = null;
     }
 
-    fs = FSUtils.getFs(basePath, configuration);
-    if (fs instanceof LocalFileSystem) {
-      LocalFileSystem lfs = (LocalFileSystem) fs;
-      // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
-      // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
-      // So, for the tests, we enforce checksum verification to circumvent the problem
-      lfs.setVerifyChecksum(true);
+    /**
+     * Initializes a distributed file system and base directory.
+     */
+    protected void initDFS() throws IOException {
+        FileSystem.closeAll();
+        hdfsTestService = new HdfsTestService();
+        dfsCluster = hdfsTestService.start(true);
+
+        // Create a temp folder as the base path
+        dfs = dfsCluster.getFileSystem();
+        dfsBasePath = dfs.getWorkingDirectory().toString();
+        dfs.mkdirs(new Path(dfsBasePath));
     }
-  }
 
+    /**
+     * Cleans up the distributed file system.
+     */
+    protected void cleanupDFS() throws IOException {
+        if (hdfsTestService != null) {
+            hdfsTestService.stop();
+            dfsCluster.shutdown();
+            hdfsTestService = null;
+            dfsCluster = null;
+            dfs = null;
+        }
+        // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+        // same JVM
+        FileSystem.closeAll();
+    }
+
+    /**
+     * Initializes executor service with a fixed thread pool.
+     *
+     * @param threadNum specify the capacity of the fixed thread pool
+     */
+    protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
+        executorService = Executors.newFixedThreadPool(threadNum);
+    }
+
+    /**
+     * Cleans up the executor service.
+     */
+    protected void cleanupExecutorService() {
+        if (this.executorService != null) {
+            this.executorService.shutdownNow();
+            this.executorService = null;
+        }
+    }
+
+    private void initFileSystemWithConfiguration(Configuration configuration) {
+        if (basePath == null) {
+            throw new IllegalStateException("The base path has not been initialized.");
+        }
+
+        fs = FSUtils.getFs(basePath, configuration);
+        if (fs instanceof LocalFileSystem) {
+            LocalFileSystem lfs = (LocalFileSystem) fs;
+            // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+            // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+            // So, for the tests, we enforce checksum verification to circumvent the problem
+            lfs.setVerifyChecksum(true);
+        }
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+        return getHoodieWriteClient(cfg, false);
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
+        return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
+    }
+
+    public HoodieReadClient getHoodieReadClient(String basePath) {
+        return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
+        HoodieIndex index) {
+        if (null != client) {
+            client.close();
+            client = null;
+        }
+        client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+        return client;
+    }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
index 05638e2..6ddb578 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
@@ -60,7 +60,7 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
     if (utility != null) {
       utility.shutdownMiniCluster();
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 2893947..43f2fd1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -86,6 +86,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   @AfterClass
   public static void clean() throws Exception {
     if (utility != null) {
+      utility.deleteTable(tableName);
       utility.shutdownMiniCluster();
     }
   }
@@ -115,11 +116,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupTestDataGenerator();
-    cleanupMetaClient();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   @Test
@@ -132,7 +129,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
@@ -172,7 +169,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(newCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -206,7 +203,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
@@ -256,7 +253,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // only for test, set the hbaseConnection to mocked object
     index.setHbaseConnection(hbaseConnection);
 
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
@@ -281,7 +278,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   public void testTotalPutsBatching() throws Exception {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 91435f8..b97fefc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -45,7 +45,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
   @After
   public void tearDown() {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index d29cfa4..105b0e8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -107,7 +107,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupFileSystem();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   private HoodieWriteConfig makeConfig() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index ddf2775..55d4526 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -80,7 +80,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
   @After
   public void tearDown() {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 664f4b5..7fd02bc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -68,11 +68,8 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
     cleanupFileSystem();
     cleanupTestDataGenerator();
     cleanupSparkContexts();
-    cleanupMetaClient();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
+    cleanupFileSystem();
   }
 
   @Test
@@ -83,9 +80,8 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
 
     // Build a write config with bulkinsertparallelism set
     HoodieWriteConfig cfg = getConfigBuilder().build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
-
       /**
        * Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times
        * each. id1 (21 records), id2 (21 records), id3, id4
@@ -224,7 +220,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
   public void testHoodieMergeHandleWriteStatMetrics() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfigBuilder().build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index ec64080..6887531 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestUtils;
@@ -85,7 +86,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
     cleanupFileSystem();
     cleanupTestDataGenerator();
   }
@@ -129,6 +130,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     // Prepare the AvroParquetIO
     HoodieWriteConfig config = makeHoodieClientConfig();
     String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String partitionPath = "/2016/01/31";
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 740caf2..fdc968d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -96,16 +96,13 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     cleanupDFS();
     cleanupSparkContexts();
     cleanupTestDataGenerator();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   @Test
   public void testSimpleInsertAndUpdate() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts)
@@ -190,7 +187,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   @Test
   public void testMetadataAggregateFromWriteStatus() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       String newCommitTime = "001";
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
@@ -213,7 +210,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   @Test
   public void testSimpleInsertUpdateAndDelete() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts, written as parquet file)
@@ -298,7 +295,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE);
 
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts)
@@ -351,7 +348,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfig(false);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       // Test delta commit rollback
       /**
@@ -394,7 +391,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
        */
       final String commitTime1 = "002";
       // WriteClient with custom config (disable small file handling)
-      try (HoodieWriteClient secondClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
+      try (HoodieWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
         secondClient.startCommitWithTime(commitTime1);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -424,7 +421,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
        * Write 3 (inserts + updates - testing successful delta commit)
        */
       final String commitTime2 = "002";
-      try (HoodieWriteClient thirdClient = getWriteClient(cfg);) {
+      try (HoodieWriteClient thirdClient = getHoodieWriteClient(cfg);) {
         thirdClient.startCommitWithTime(commitTime2);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -500,7 +497,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfig(false);
-    try (final HoodieWriteClient client = getWriteClient(cfg);) {
+    try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       /**
        * Write 1 (only inserts)
        */
@@ -541,7 +538,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
        */
       newCommitTime = "002";
       // WriteClient with custom config (disable small file handling)
-      HoodieWriteClient nClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
+      HoodieWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
       nClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -664,7 +661,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   @Test
   public void testUpsertPartitioner() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts, written as parquet file)
@@ -743,7 +740,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testLogFileCountsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig(true);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -816,7 +813,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -853,7 +850,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -927,7 +924,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -979,10 +976,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollingStatsInMetadata() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
-
       // Create a commit without rolling stats in metadata to test backwards compatibility
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
       String commitActionType = table.getMetaClient().getCommitActionType();
@@ -1080,7 +1076,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollingStatsWithSmallFileHandling() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       Map<String, Long> fileIdToInsertsMap = new HashMap<>();
       Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 8fa55ec..09d62a7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -78,10 +78,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     cleanupFileSystem();
     cleanupTestDataGenerator();
     cleanupSparkContexts();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   private HoodieWriteConfig getConfig() {
@@ -114,8 +111,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     HoodieWriteConfig config = getConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
-
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -132,8 +128,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
-      String newCommitTime = "100";
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+     String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
diff --git a/pom.xml b/pom.xml
index f1990e1..6370c37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -242,6 +242,7 @@
         <version>${maven-surefire-plugin.version}</version>
         <configuration>
           <skip>${skipUTs}</skip>
+          <argLine>-Xmx4g</argLine>
           <systemPropertyVariables>
             <log4j.configuration>
               ${surefire-log4j.file}

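The test migrations above share one usage pattern: obtain the client through the harness helper and scope it with try-with-resources, so it is closed even when an assertion fails mid-test. A hedged sketch of a test body using that pattern follows; getConfig(), dataGen and jsc are harness members as in the tests above, while the record count and parallelism here are illustrative placeholders.

    // Illustrative test body (not a specific test from the diff) showing the
    // try-with-resources usage these commits migrate the tests to.
    HoodieWriteConfig config = getConfig();
    try (HoodieWriteClient writeClient = getHoodieWriteClient(config)) {
      String newCommitTime = writeClient.startCommit();
      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
      writeClient.insert(recordsRDD, newCommitTime).collect();
    } // client is closed here even if an assertion above throws
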

[hudi] 04/04: Bumping release candidate number 2

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 41fb6c268c6e84949ee70f305df93680c431a9da
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Mon Jun 8 08:41:25 2020 -0400

    Bumping release candidate number 2
---
 docker/hoodie/hadoop/base/pom.xml             | 2 +-
 docker/hoodie/hadoop/datanode/pom.xml         | 2 +-
 docker/hoodie/hadoop/historyserver/pom.xml    | 2 +-
 docker/hoodie/hadoop/hive_base/pom.xml        | 2 +-
 docker/hoodie/hadoop/namenode/pom.xml         | 2 +-
 docker/hoodie/hadoop/pom.xml                  | 2 +-
 docker/hoodie/hadoop/prestobase/pom.xml       | 2 +-
 docker/hoodie/hadoop/spark_base/pom.xml       | 2 +-
 docker/hoodie/hadoop/sparkadhoc/pom.xml       | 2 +-
 docker/hoodie/hadoop/sparkmaster/pom.xml      | 2 +-
 docker/hoodie/hadoop/sparkworker/pom.xml      | 2 +-
 hudi-cli/pom.xml                              | 2 +-
 hudi-client/pom.xml                           | 2 +-
 hudi-common/pom.xml                           | 2 +-
 hudi-hadoop-mr/pom.xml                        | 2 +-
 hudi-hive/pom.xml                             | 2 +-
 hudi-integ-test/pom.xml                       | 2 +-
 hudi-spark/pom.xml                            | 2 +-
 hudi-timeline-service/pom.xml                 | 2 +-
 hudi-utilities/pom.xml                        | 2 +-
 packaging/hudi-hadoop-mr-bundle/pom.xml       | 2 +-
 packaging/hudi-hive-bundle/pom.xml            | 2 +-
 packaging/hudi-presto-bundle/pom.xml          | 2 +-
 packaging/hudi-spark-bundle/pom.xml           | 2 +-
 packaging/hudi-timeline-server-bundle/pom.xml | 2 +-
 packaging/hudi-utilities-bundle/pom.xml       | 2 +-
 pom.xml                                       | 2 +-
 27 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/docker/hoodie/hadoop/base/pom.xml b/docker/hoodie/hadoop/base/pom.xml
index 2f271c7..c95f610 100644
--- a/docker/hoodie/hadoop/base/pom.xml
+++ b/docker/hoodie/hadoop/base/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/datanode/pom.xml b/docker/hoodie/hadoop/datanode/pom.xml
index c27fa29..3715b89 100644
--- a/docker/hoodie/hadoop/datanode/pom.xml
+++ b/docker/hoodie/hadoop/datanode/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/historyserver/pom.xml b/docker/hoodie/hadoop/historyserver/pom.xml
index 302d453..63defe3 100644
--- a/docker/hoodie/hadoop/historyserver/pom.xml
+++ b/docker/hoodie/hadoop/historyserver/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/hive_base/pom.xml b/docker/hoodie/hadoop/hive_base/pom.xml
index 087c60c..494f3c0 100644
--- a/docker/hoodie/hadoop/hive_base/pom.xml
+++ b/docker/hoodie/hadoop/hive_base/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/namenode/pom.xml b/docker/hoodie/hadoop/namenode/pom.xml
index 5e93c41..e68ddeb 100644
--- a/docker/hoodie/hadoop/namenode/pom.xml
+++ b/docker/hoodie/hadoop/namenode/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/pom.xml b/docker/hoodie/hadoop/pom.xml
index 4c77771..23419c5 100644
--- a/docker/hoodie/hadoop/pom.xml
+++ b/docker/hoodie/hadoop/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/docker/hoodie/hadoop/prestobase/pom.xml b/docker/hoodie/hadoop/prestobase/pom.xml
index c8ac0c9..e8b6e1c 100644
--- a/docker/hoodie/hadoop/prestobase/pom.xml
+++ b/docker/hoodie/hadoop/prestobase/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/spark_base/pom.xml b/docker/hoodie/hadoop/spark_base/pom.xml
index ae80714..4aa0e86 100644
--- a/docker/hoodie/hadoop/spark_base/pom.xml
+++ b/docker/hoodie/hadoop/spark_base/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/sparkadhoc/pom.xml b/docker/hoodie/hadoop/sparkadhoc/pom.xml
index 0ad98af..08a9cb7 100644
--- a/docker/hoodie/hadoop/sparkadhoc/pom.xml
+++ b/docker/hoodie/hadoop/sparkadhoc/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/sparkmaster/pom.xml b/docker/hoodie/hadoop/sparkmaster/pom.xml
index 1df5edd..e8306dd 100644
--- a/docker/hoodie/hadoop/sparkmaster/pom.xml
+++ b/docker/hoodie/hadoop/sparkmaster/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/sparkworker/pom.xml b/docker/hoodie/hadoop/sparkworker/pom.xml
index e80a5c9..e5d231d 100644
--- a/docker/hoodie/hadoop/sparkworker/pom.xml
+++ b/docker/hoodie/hadoop/sparkworker/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index 7b63e23..d9dd61c 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 7b665de..b8edb7f 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index ed7c12f..5fde1ff 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index b229370..a472146 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/hudi-hive/pom.xml b/hudi-hive/pom.xml
index 5ad8708..0f69358 100644
--- a/hudi-hive/pom.xml
+++ b/hudi-hive/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index d134288..b126e8f 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <artifactId>hudi-integ-test</artifactId>
diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml
index 6b90a69..064bbe9 100644
--- a/hudi-spark/pom.xml
+++ b/hudi-spark/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/hudi-timeline-service/pom.xml b/hudi-timeline-service/pom.xml
index c51ec54..7d82910 100644
--- a/hudi-timeline-service/pom.xml
+++ b/hudi-timeline-service/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 414d1da..7e9d4a3 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml
index ae5a99f..d3b9263 100644
--- a/packaging/hudi-hadoop-mr-bundle/pom.xml
+++ b/packaging/hudi-hadoop-mr-bundle/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/packaging/hudi-hive-bundle/pom.xml b/packaging/hudi-hive-bundle/pom.xml
index 7979b63..7998fa9 100644
--- a/packaging/hudi-hive-bundle/pom.xml
+++ b/packaging/hudi-hive-bundle/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/packaging/hudi-presto-bundle/pom.xml b/packaging/hudi-presto-bundle/pom.xml
index 4900e39..e5b6058 100644
--- a/packaging/hudi-presto-bundle/pom.xml
+++ b/packaging/hudi-presto-bundle/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index 5cbf2b7..d230ad2 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/packaging/hudi-timeline-server-bundle/pom.xml b/packaging/hudi-timeline-server-bundle/pom.xml
index f7f9249..928d286 100644
--- a/packaging/hudi-timeline-server-bundle/pom.xml
+++ b/packaging/hudi-timeline-server-bundle/pom.xml
@@ -23,7 +23,7 @@
     <parent>
         <artifactId>hudi</artifactId>
         <groupId>org.apache.hudi</groupId>
-        <version>0.5.3-SNAPSHOT</version>
+        <version>0.5.3-rc2</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index a84ce2a..cb02c4b 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.5.3-SNAPSHOT</version>
+    <version>0.5.3-rc2</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/pom.xml b/pom.xml
index 6370c37..e24e83d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
   <groupId>org.apache.hudi</groupId>
   <artifactId>hudi</artifactId>
   <packaging>pom</packaging>
-  <version>0.5.3-SNAPSHOT</version>
+  <version>0.5.3-rc2</version>
   <description>Apache Hudi brings stream style processing on big data</description>
   <url>https://github.com/apache/hudi</url>
   <name>Hudi</name>