Posted to commits@hudi.apache.org by si...@apache.org on 2020/06/08 22:48:11 UTC

[hudi] branch release-0.5.3 updated (5fcc461 -> 864a7cd)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git.


    omit 5fcc461  Bumping release candidate number 1
     new 6dcd0a3  [HUDI-988] Fix Unit Test Flakiness : Ensure all instantiations of HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make CLI unit tests for Hudi CLI check skip rendering strings
     new ae48ecb  [HUDI-990] Timeline API : filterCompletedAndCompactionInstants needs to handle requested state correctly. Also ensure timeline gets reloaded after we revert committed transactions
     new 84ca5b0  Making few fixes after cherry picking
     new 864a7cd  [HUDI-988] Fix More Unit Test Flakiness

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs when a user
force-pushes (--force) a change, leaving the repository with a history
like this:

 * -- * -- B -- O -- O -- O   (5fcc461)
            \
             N -- N -- N   refs/heads/release-0.5.3 (864a7cd)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
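
For anyone tracking this branch locally, the omitted head can still be
inspected by hash while some reference or reflog retains it. A minimal
sketch, assuming the gitbox repository above is configured as the
"origin" remote:

 # inspect the omitted head and list the new (N) revisions from the common base B
 git show 5fcc461
 git log --oneline 5fcc461..864a7cd

 # re-sync a local release-0.5.3 checkout with the rewritten branch
 git fetch origin
 git reset --hard origin/release-0.5.3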


Summary of changes:
 docker/hoodie/hadoop/base/pom.xml                  |   2 +-
 docker/hoodie/hadoop/datanode/pom.xml              |   2 +-
 docker/hoodie/hadoop/historyserver/pom.xml         |   2 +-
 docker/hoodie/hadoop/hive_base/pom.xml             |   2 +-
 docker/hoodie/hadoop/namenode/pom.xml              |   2 +-
 docker/hoodie/hadoop/pom.xml                       |   2 +-
 docker/hoodie/hadoop/prestobase/pom.xml            |   2 +-
 docker/hoodie/hadoop/spark_base/pom.xml            |   2 +-
 docker/hoodie/hadoop/sparkadhoc/pom.xml            |   2 +-
 docker/hoodie/hadoop/sparkmaster/pom.xml           |   2 +-
 docker/hoodie/hadoop/sparkworker/pom.xml           |   2 +-
 hudi-cli/pom.xml                                   |   2 +-
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  36 ----
 .../cli/commands/AbstractShellIntegrationTest.java |   2 +-
 .../hudi/cli/commands/TestRepairsCommand.java      | 206 ---------------------
 hudi-client/pom.xml                                |   2 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |   2 +-
 .../client/embedded/EmbeddedTimelineService.java   |   4 +-
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |   2 +
 .../apache/hudi/table/HoodieMergeOnReadTable.java  |   2 +
 .../hudi/client/TestCompactionAdminClient.java     |  35 ++--
 .../apache/hudi/client/TestHoodieClientBase.java   | 187 +++++++++----------
 .../java/org/apache/hudi/client/TestMultiFS.java   |   8 +-
 .../hudi/client/TestUpdateSchemaEvolution.java     |   5 +-
 .../hudi/common/HoodieClientTestHarness.java       | 101 +++++++---
 .../execution/TestBoundedInMemoryExecutor.java     |   2 +-
 .../hudi/execution/TestBoundedInMemoryQueue.java   |   3 +-
 .../hudi/index/TestHBaseQPSResourceAllocator.java  |   2 +-
 .../java/org/apache/hudi/index/TestHbaseIndex.java |  17 +-
 .../org/apache/hudi/index/TestHoodieIndex.java     |  29 ++-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   4 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   5 +-
 .../apache/hudi/io/TestHoodieCommitArchiveLog.java |   3 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |  14 +-
 .../apache/hudi/table/TestConsistencyGuard.java    |   2 +-
 .../apache/hudi/table/TestCopyOnWriteTable.java    |   6 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    | 145 +++++++--------
 .../hudi/table/compact/TestAsyncCompaction.java    |   2 +-
 .../hudi/table/compact/TestHoodieCompactor.java    |  17 +-
 hudi-common/pom.xml                                |   2 +-
 .../table/timeline/HoodieDefaultTimeline.java      |   2 +-
 .../table/view/FileSystemViewStorageConfig.java    |  21 +++
 .../table/view/HoodieTableFileSystemView.java      |   6 +
 .../table/string/TestHoodieActiveTimeline.java     |   2 +-
 hudi-hadoop-mr/pom.xml                             |   2 +-
 hudi-hive/pom.xml                                  |   2 +-
 hudi-integ-test/pom.xml                            |   2 +-
 hudi-spark/pom.xml                                 |   2 +-
 hudi-timeline-service/pom.xml                      |   2 +-
 .../timeline/service/FileSystemViewHandler.java    |   2 +-
 hudi-utilities/pom.xml                             |   2 +-
 packaging/hudi-hadoop-mr-bundle/pom.xml            |   2 +-
 packaging/hudi-hive-bundle/pom.xml                 |   2 +-
 packaging/hudi-presto-bundle/pom.xml               |   2 +-
 packaging/hudi-spark-bundle/pom.xml                |   2 +-
 packaging/hudi-timeline-server-bundle/pom.xml      |   2 +-
 packaging/hudi-utilities-bundle/pom.xml            |   2 +-
 pom.xml                                            |   3 +-
 58 files changed, 369 insertions(+), 560 deletions(-)
 delete mode 100644 hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
 delete mode 100644 hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java


[hudi] 03/04: Making few fixes after cherry picking

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 84ca5b0cae72d9b33271045efee93a4cf1a0cff5
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Sun Jun 7 16:23:40 2020 -0400

    Making few fixes after cherry picking
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  52 --
 .../org/apache/hudi/cli/commands/StatsCommand.java |   4 +-
 .../apache/hudi/client/TestHoodieClientBase.java   | 917 +++++++++++----------
 .../hudi/common/HoodieClientTestHarness.java       | 426 +++++-----
 .../apache/hudi/table/TestCopyOnWriteTable.java    |   3 -
 .../apache/hudi/table/TestMergeOnReadTable.java    |   2 +
 .../hudi/table/compact/TestHoodieCompactor.java    |   6 +-
 .../table/string/TestHoodieActiveTimeline.java     |   2 +-
 8 files changed, 680 insertions(+), 732 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
deleted file mode 100644
index 708ae29..0000000
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli;
-
-/**
- * Fields of print table header.
- */
-public class HoodieTableHeaderFields {
-
-  public static final String HEADER_PARTITION = "Partition";
-  public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " Path";
-  /**
-   * Fields of Repair.
-   */
-  public static final String HEADER_METADATA_PRESENT = "Metadata Present?";
-  public static final String HEADER_REPAIR_ACTION = "Action";
-  public static final String HEADER_HOODIE_PROPERTY = "Property";
-  public static final String HEADER_OLD_VALUE = "Old Value";
-  public static final String HEADER_NEW_VALUE = "New Value";
-
-  /**
-   * Fields of Stats.
-   */
-  public static final String HEADER_COMMIT_TIME = "CommitTime";
-  public static final String HEADER_TOTAL_UPSERTED = "Total Upserted";
-  public static final String HEADER_TOTAL_WRITTEN = "Total Written";
-  public static final String HEADER_WRITE_AMPLIFICATION_FACTOR = "Write Amplification Factor";
-  public static final String HEADER_HISTOGRAM_MIN = "Min";
-  public static final String HEADER_HISTOGRAM_10TH = "10th";
-  public static final String HEADER_HISTOGRAM_50TH = "50th";
-  public static final String HEADER_HISTOGRAM_AVG = "avg";
-  public static final String HEADER_HISTOGRAM_95TH = "95th";
-  public static final String HEADER_HISTOGRAM_MAX = "Max";
-  public static final String HEADER_HISTOGRAM_NUM_FILES = "NumFiles";
-  public static final String HEADER_HISTOGRAM_STD_DEV = "StdDev";
-}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index 4874777..b05aee2 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
 @Component
 public class StatsCommand implements CommandMarker {
 
-  public static final int MAX_FILES = 1000000;
+  private static final int MAX_FILES = 1000000;
 
   @CliCommand(value = "stats wa", help = "Write Amplification. Ratio of how many records were upserted to how many "
       + "records were actually written")
@@ -97,7 +97,7 @@ public class StatsCommand implements CommandMarker {
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
 
-  public Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
+  private Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
     return new Comparable[] {commitTime, s.getMin(), s.getValue(0.1), s.getMedian(), s.getMean(), s.get95thPercentile(),
         s.getMax(), s.size(), s.getStdDev()};
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 6e6458b..6856489 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -72,477 +72,478 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHoodieClientBase extends HoodieClientTestHarness {
 
-    private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
-
-    @Before
-    public void setUp() throws Exception {
-        initResources();
+  private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
+
+  @Before
+  public void setUp() throws Exception {
+    initResources();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
+    return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+  }
+
+  /**
+   * Get Default HoodieWriteConfig for tests.
+   *
+   * @return Default Hoodie Write Config for tests
+   */
+  protected HoodieWriteConfig getConfig() {
+    return getConfigBuilder().build();
+  }
+
+  protected HoodieWriteConfig getConfig(IndexType indexType) {
+    return getConfigBuilder(indexType).build();
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  protected HoodieWriteConfig.Builder getConfigBuilder() {
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
+  }
+
+  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    return getConfigBuilder(schemaStr, IndexType.BLOOM);
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .withWriteStatusClass(MetadataMergeWriteStatus.class)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .forTable("test-trip-table")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+  }
+
+  protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    ((SyncableFileSystemView) (table.getSliceView())).reset();
+    return table;
+  }
+
+  /**
+   * Assert no failures in writing hoodie files.
+   *
+   * @param statuses List of Write Status
+   */
+  public static void assertNoWriteErrors(List<WriteStatus> statuses) {
+    // Verify there are no errors
+    for (WriteStatus status : statuses) {
+      assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
     }
-
-    @After
-    public void tearDown() throws Exception {
-        cleanupResources();
+  }
+
+  void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
+    Set<String> partitionPathSet = inputRecords.stream()
+        .map(HoodieRecord::getPartitionPath)
+        .collect(Collectors.toSet());
+    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+  }
+
+  void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
+    Set<String> partitionPathSet = inputKeys.stream()
+        .map(HoodieKey::getPartitionPath)
+        .collect(Collectors.toSet());
+    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+  }
+
+  /**
+   * Ensure presence of partition meta-data at known depth.
+   *
+   * @param partitionPaths Partition paths to check
+   * @param fs File System
+   * @throws IOException in case of error
+   */
+  void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
+    for (String partitionPath : partitionPaths) {
+      assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
+      HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
+      pmeta.readFromFS();
+      Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
     }
-
-    protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
-        return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+  }
+
+  /**
+   * Ensure records have location field set.
+   *
+   * @param taggedRecords Tagged Records
+   * @param commitTime Commit Timestamp
+   */
+  protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
+    for (HoodieRecord rec : taggedRecords) {
+      assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
+      assertEquals("All records should have commit time " + commitTime + ", since updates were made",
+          rec.getCurrentLocation().getInstantTime(), commitTime);
     }
-
-    /**
-     * Get Default HoodieWriteConfig for tests.
-     *
-     * @return Default Hoodie Write Config for tests
-     */
-    protected HoodieWriteConfig getConfig() {
-        return getConfigBuilder().build();
+  }
+
+  /**
+   * Assert that there is no duplicate key at the partition level.
+   *
+   * @param records List of Hoodie records
+   */
+  void assertNodupesWithinPartition(List<HoodieRecord> records) {
+    Map<String, Set<String>> partitionToKeys = new HashMap<>();
+    for (HoodieRecord r : records) {
+      String key = r.getRecordKey();
+      String partitionPath = r.getPartitionPath();
+      if (!partitionToKeys.containsKey(partitionPath)) {
+        partitionToKeys.put(partitionPath, new HashSet<>());
+      }
+      assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
+      partitionToKeys.get(partitionPath).add(key);
     }
-
-    protected HoodieWriteConfig getConfig(IndexType indexType) {
-        return getConfigBuilder(indexType).build();
+  }
+
+  /**
+   * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of
+   * record-location setting. Uniqueness is guaranteed by record-generation function itself.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param recordGenFunction Records Generation function
+   * @return Wrapped function
+   */
+  private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
+      final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+    return (commit, numRecords) -> {
+      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
+      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
+      return taggedRecords.collect();
+    };
+  }
+
+  /**
+   * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of
+   * record-location setting. Uniqueness is guaranteed by key-generation function itself.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param keyGenFunction Keys Generation function
+   * @return Wrapped function
+   */
+  private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
+      final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
+    return (numRecords) -> {
+      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+      List<HoodieKey> records = keyGenFunction.apply(numRecords);
+      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+      JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+          .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
+      return taggedRecords.map(record -> record.getKey()).collect();
+    };
+  }
+
+  /**
+   * Generate wrapper for record generation function for testing Prepped APIs.
+   *
+   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+   * @param writeConfig Hoodie Write Config
+   * @param wrapped Actual Records Generation function
+   * @return Wrapped Function
+   */
+  protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
+      HoodieWriteConfig writeConfig,
+      Function2<List<HoodieRecord>, String, Integer> wrapped) {
+    if (isPreppedAPI) {
+      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+    } else {
+      return wrapped;
     }
-
-    /**
-     * Get Config builder with default configs set.
-     *
-     * @return Config Builder
-     */
-    protected HoodieWriteConfig.Builder getConfigBuilder() {
-        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+  }
+
+  /**
+   * Generate wrapper for delete key generation function for testing Prepped APIs.
+   *
+   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+   * @param writeConfig Hoodie Write Config
+   * @param wrapped Actual Records Generation function
+   * @return Wrapped Function
+   */
+  Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
+      HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
+    if (isPreppedAPI) {
+      return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+    } else {
+      return wrapped;
     }
-
-    /**
-     * Get Config builder with default configs set.
-     *
-     * @return Config Builder
-     */
-    HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
-        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
+  }
+
+  /**
+   * Helper to insert first batch of records and do regular assertions on the state after successful completion.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param writeFn Write Function to be used for insertion
+   * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String initCommitTime, int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
+
+    return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
+        recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
+  }
+
+  /**
+   * Helper to upsert batch of records and do regular assertions on the state after successful completion.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param writeFn Write Function to be used for upsert
+   * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @param expTotalCommits Expected number of commits (including this commit)
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+      int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
+
+    return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+        numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
+        expTotalCommits);
+  }
+
+  /**
+   * Helper to delete batch of keys and do regular assertions on the state after successful completion.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param deleteFn Delete Function to be used for deletes
+   * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String prevCommitTime, String initCommitTime,
+      int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+    final Function<Integer, List<HoodieKey>> keyGenFunction =
+        generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
+
+    return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
+        keyGenFunction,
+        deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
+  }
+
+  /**
+   * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
+   *
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param recordGenFunction Records Generation Function
+   * @param writeFn Write Function to be used for upsert
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @param expTotalCommits Expected number of commits (including this commit)
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+      Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+      Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+
+    // Write 1 (only inserts)
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // check the partition metadata is written out
+    assertPartitionMetadataForRecords(records, fs);
+
+    // verify that there is a commit
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+    if (assertForCommit) {
+      assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
+          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+          timeline.lastInstant().get().getTimestamp());
+      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+      // Check the entire dataset has all records still
+      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+      for (int i = 0; i < fullPartitionPaths.length; i++) {
+        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+      }
+      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+      // Check that the incremental consumption from prevCommitTime
+      assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+      if (commitTimesBetweenPrevAndNew.isPresent()) {
+        commitTimesBetweenPrevAndNew.get().forEach(ct -> {
+          assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
+              HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+              HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
+        });
+      }
     }
-
-    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-        return getConfigBuilder(schemaStr, IndexType.BLOOM);
+    return result;
+  }
+
+  /**
+   * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
+   *
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param keyGenFunction Key Generation function
+   * @param deleteFn Write Function to be used for delete
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+      String initCommitTime, int numRecordsInThisCommit,
+      Function<Integer, List<HoodieKey>> keyGenFunction,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+
+    // Delete 1 (only deletes)
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
+    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
+
+    JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // check the partition metadata is written out
+    assertPartitionMetadataForKeys(keysToDelete, fs);
+
+    // verify that there is a commit
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+    if (assertForCommit) {
+      assertEquals("Expecting 3 commits.", 3,
+          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+          timeline.lastInstant().get().getTimestamp());
+      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+      // Check the entire dataset has all records still
+      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+      for (int i = 0; i < fullPartitionPaths.length; i++) {
+        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+      }
+      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+      // Check that the incremental consumption from prevCommitTime
+      assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+              + " since it is a delete operation",
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
     }
+    return result;
+  }
 
-    /**
-     * Get Config builder with default configs set.
-     *
-     * @return Config Builder
-     */
-    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
-        return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
-            .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
-            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
-            .withWriteStatusClass(MetadataMergeWriteStatus.class)
-            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-            .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
-            .forTable("test-trip-table")
-            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
-            .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-                .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-    }
+  /**
+   * Get Cleaner state corresponding to a partition path.
+   *
+   * @param hoodieCleanStatsTwo List of Clean Stats
+   * @param partitionPath Partition path for filtering
+   * @return Cleaner state corresponding to partition path
+   */
+  protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
+    return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
+  }
 
-    protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-        ((SyncableFileSystemView) (table.getSliceView())).reset();
-        return table;
-    }
+  // Functional Interfaces for passing lambda and Hoodie Write API contexts
 
-    /**
-     * Assert no failures in writing hoodie files.
-     *
-     * @param statuses List of Write Status
-     */
-    public static void assertNoWriteErrors(List<WriteStatus> statuses) {
-        // Verify there are no errors
-        for (WriteStatus status : statuses) {
-            assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
-        }
-    }
+  @FunctionalInterface
+  public interface Function2<R, T1, T2> {
 
-    void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
-        Set<String> partitionPathSet = inputRecords.stream()
-            .map(HoodieRecord::getPartitionPath)
-            .collect(Collectors.toSet());
-        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-    }
+    R apply(T1 v1, T2 v2) throws IOException;
+  }
 
-    void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
-        Set<String> partitionPathSet = inputKeys.stream()
-            .map(HoodieKey::getPartitionPath)
-            .collect(Collectors.toSet());
-        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-    }
-
-    /**
-     * Ensure presence of partition meta-data at known depth.
-     *
-     * @param partitionPaths Partition paths to check
-     * @param fs File System
-     * @throws IOException in case of error
-     */
-    void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
-        for (String partitionPath : partitionPaths) {
-            assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
-            HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
-            pmeta.readFromFS();
-            Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
-        }
-    }
-
-    /**
-     * Ensure records have location field set.
-     *
-     * @param taggedRecords Tagged Records
-     * @param commitTime Commit Timestamp
-     */
-    protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
-        for (HoodieRecord rec : taggedRecords) {
-            assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
-            assertEquals("All records should have commit time " + commitTime + ", since updates were made",
-                rec.getCurrentLocation().getInstantTime(), commitTime);
-        }
-    }
-
-    /**
-     * Assert that there is no duplicate key at the partition level.
-     *
-     * @param records List of Hoodie records
-     */
-    void assertNodupesWithinPartition(List<HoodieRecord> records) {
-        Map<String, Set<String>> partitionToKeys = new HashMap<>();
-        for (HoodieRecord r : records) {
-            String key = r.getRecordKey();
-            String partitionPath = r.getPartitionPath();
-            if (!partitionToKeys.containsKey(partitionPath)) {
-                partitionToKeys.put(partitionPath, new HashSet<>());
-            }
-            assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
-            partitionToKeys.get(partitionPath).add(key);
-        }
-    }
-
-    /**
-     * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of
-     * record-location setting. Uniqueness is guaranteed by record-generation function itself.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param recordGenFunction Records Generation function
-     * @return Wrapped function
-     */
-    private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
-        final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
-        return (commit, numRecords) -> {
-            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-            List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
-            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
-            return taggedRecords.collect();
-        };
-    }
-
-    /**
-     * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of
-     * record-location setting. Uniqueness is guaranteed by key-generation function itself.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param keyGenFunction Keys Generation function
-     * @return Wrapped function
-     */
-    private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
-        final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
-        return (numRecords) -> {
-            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-            List<HoodieKey> records = keyGenFunction.apply(numRecords);
-            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-            JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
-                .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
-            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
-            return taggedRecords.map(record -> record.getKey()).collect();
-        };
-    }
+  @FunctionalInterface
+  public interface Function3<R, T1, T2, T3> {
 
-    /**
-     * Generate wrapper for record generation function for testing Prepped APIs.
-     *
-     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-     * @param writeConfig Hoodie Write Config
-     * @param wrapped Actual Records Generation function
-     * @return Wrapped Function
-     */
-    protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
-        HoodieWriteConfig writeConfig,
-        Function2<List<HoodieRecord>, String, Integer> wrapped) {
-        if (isPreppedAPI) {
-            return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
-        } else {
-            return wrapped;
-        }
-    }
-
-    /**
-     * Generate wrapper for delete key generation function for testing Prepped APIs.
-     *
-     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-     * @param writeConfig Hoodie Write Config
-     * @param wrapped Actual Records Generation function
-     * @return Wrapped Function
-     */
-    Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
-        HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
-        if (isPreppedAPI) {
-            return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
-        } else {
-            return wrapped;
-        }
-    }
-
-    /**
-     * Helper to insert first batch of records and do regular assertions on the state after successful completion.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param writeFn Write Function to be used for insertion
-     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @return RDD of write-status
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-        String initCommitTime, int numRecordsInThisCommit,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
-        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
-
-        return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
-            recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
-    }
-
-    /**
-     * Helper to upsert batch of records and do regular assertions on the state after successful completion.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param writeFn Write Function to be used for upsert
-     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @param expTotalCommits Expected number of commits (including this commit)
-     * @return RDD of write-status
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-        String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
-        int numRecordsInThisCommit,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
-
-        return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
-            numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-            expTotalCommits);
-    }
-
-    /**
-     * Helper to delete batch of keys and do regular assertions on the state after successful completion.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param deleteFn Delete Function to be used for deletes
-     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @return RDD of write-status
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-        String prevCommitTime, String initCommitTime,
-        int numRecordsInThisCommit,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-        final Function<Integer, List<HoodieKey>> keyGenFunction =
-            generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
-
-        return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
-            keyGenFunction,
-            deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
-    }
-
-    /**
-     * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
-     *
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param recordGenFunction Records Generation Function
-     * @param writeFn Write Function to be used for upsert
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @param expTotalCommits Expected number of commits (including this commit)
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-        Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
-        Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-
-        // Write 1 (only inserts)
-        client.startCommitWithTime(newCommitTime);
-
-        List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
-        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-        JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
-        List<WriteStatus> statuses = result.collect();
-        assertNoWriteErrors(statuses);
-
-        // check the partition metadata is written out
-        assertPartitionMetadataForRecords(records, fs);
-
-        // verify that there is a commit
-        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-        if (assertForCommit) {
-            assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
-                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-                timeline.lastInstant().get().getTimestamp());
-            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-            // Check the entire dataset has all records still
-            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-            for (int i = 0; i < fullPartitionPaths.length; i++) {
-                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-            }
-            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-            // Check that the incremental consumption from prevCommitTime
-            assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
-            if (commitTimesBetweenPrevAndNew.isPresent()) {
-                commitTimesBetweenPrevAndNew.get().forEach(ct -> {
-                    assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
-                        HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-                        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
-                });
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
-     *
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param keyGenFunction Key Generation function
-     * @param deleteFn Write Function to be used for delete
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-        String initCommitTime, int numRecordsInThisCommit,
-        Function<Integer, List<HoodieKey>> keyGenFunction,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-
-        // Delete 1 (only deletes)
-        client.startCommitWithTime(newCommitTime);
-
-        List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
-        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
-
-        JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
-        List<WriteStatus> statuses = result.collect();
-        assertNoWriteErrors(statuses);
-
-        // check the partition metadata is written out
-        assertPartitionMetadataForKeys(keysToDelete, fs);
-
-        // verify that there is a commit
-        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-        if (assertForCommit) {
-            assertEquals("Expecting 3 commits.", 3,
-                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-                timeline.lastInstant().get().getTimestamp());
-            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-            // Check the entire dataset has all records still
-            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-            for (int i = 0; i < fullPartitionPaths.length; i++) {
-                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-            }
-            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-            // Check that the incremental consumption from prevCommitTime
-            assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
-                    + " since it is a delete operation",
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
-        }
-        return result;
-    }
-
-    /**
-     * Get Cleaner state corresponding to a partition path.
-     *
-     * @param hoodieCleanStatsTwo List of Clean Stats
-     * @param partitionPath Partition path for filtering
-     * @return Cleaner state corresponding to partition path
-     */
-    protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
-        return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
-    }
-
-    // Functional Interfaces for passing lambda and Hoodie Write API contexts
-
-    @FunctionalInterface
-    public interface Function2<R, T1, T2> {
-
-        R apply(T1 v1, T2 v2) throws IOException;
-    }
-
-    @FunctionalInterface
-    public interface Function3<R, T1, T2, T3> {
-
-        R apply(T1 v1, T2 v2, T3 v3) throws IOException;
-    }
+    R apply(T1 v1, T2 v2, T3 v3) throws IOException;
+  }
 
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index e4202f0..4c7b890 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -49,239 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
-    protected transient JavaSparkContext jsc = null;
-    protected transient SQLContext sqlContext;
-    protected transient FileSystem fs;
-    protected transient HoodieTestDataGenerator dataGen = null;
-    protected transient ExecutorService executorService;
-    protected transient HoodieTableMetaClient metaClient;
-    private static AtomicInteger instantGen = new AtomicInteger(1);
-    protected transient HoodieWriteClient client;
-
-    public String getNextInstant() {
-        return String.format("%09d", instantGen.getAndIncrement());
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
+
+  protected transient JavaSparkContext jsc = null;
+  protected transient SQLContext sqlContext;
+  protected transient FileSystem fs;
+  protected transient HoodieTestDataGenerator dataGen = null;
+  protected transient ExecutorService executorService;
+  protected transient HoodieTableMetaClient metaClient;
+  private static AtomicInteger instantGen = new AtomicInteger(1);
+  protected transient HoodieWriteClient client;
+
+  public String getNextInstant() {
+    return String.format("%09d", instantGen.getAndIncrement());
+  }
+
+  // dfs
+  protected String dfsBasePath;
+  protected transient HdfsTestService hdfsTestService;
+  protected transient MiniDFSCluster dfsCluster;
+  protected transient DistributedFileSystem dfs;
+
+  /**
+   * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
+   */
+  public void initResources() throws IOException {
+    initPath();
+    initSparkContexts();
+    initTestDataGenerator();
+    initFileSystem();
+    initMetaClient();
+  }
+
+  /**
+   * Cleans up resource group for the subclasses of {@link TestHoodieClientBase}.
+   */
+  public void cleanupResources() throws IOException {
+    cleanupClients();
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
+    cleanupFileSystem();
+  }
+
+  /**
+   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
+   *
+   * @param appName The specified application name.
+   */
+  protected void initSparkContexts(String appName) {
+    // Initialize a local spark env
+    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+    jsc.setLogLevel("ERROR");
+
+    // SQLContext stuff
+    sqlContext = new SQLContext(jsc);
+  }
+
+  /**
+   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
+   * <b>TestHoodieClient</b>.
+   */
+  protected void initSparkContexts() {
+    initSparkContexts("TestHoodieClient");
+  }
+
+  /**
+   * Cleans up Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
+   */
+  protected void cleanupSparkContexts() {
+    if (sqlContext != null) {
+      LOG.info("Clearing sql context cache of spark-session used in previous test-case");
+      sqlContext.clearCache();
+      sqlContext = null;
     }
 
-    // dfs
-    protected String dfsBasePath;
-    protected transient HdfsTestService hdfsTestService;
-    protected transient MiniDFSCluster dfsCluster;
-    protected transient DistributedFileSystem dfs;
-
-    /**
-     * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
-     */
-    public void initResources() throws IOException {
-        initPath();
-        initSparkContexts();
-        initTestDataGenerator();
-        initFileSystem();
-        initMetaClient();
+    if (jsc != null) {
+      LOG.info("Closing spark context used in previous test-case");
+      jsc.close();
+      jsc.stop();
+      jsc = null;
     }
-
-    /**
-     * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
-     */
-    public void cleanupResources() throws IOException {
-        cleanupClients();
-        cleanupSparkContexts();
-        cleanupTestDataGenerator();
-        cleanupFileSystem();
-    }
-
-    /**
-     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
-     *
-     * @param appName The specified application name.
-     */
-    protected void initSparkContexts(String appName) {
-        // Initialize a local spark env
-        jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
-        jsc.setLogLevel("ERROR");
-
-        // SQLContext stuff
-        sqlContext = new SQLContext(jsc);
-    }
-
-    /**
-     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
-     * <b>TestHoodieClient</b>.
-     */
-    protected void initSparkContexts() {
-        initSparkContexts("TestHoodieClient");
-    }
-
-    /**
-     * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
-     */
-    protected void cleanupSparkContexts() {
-        if (sqlContext != null) {
-            LOG.info("Clearing sql context cache of spark-session used in previous test-case");
-            sqlContext.clearCache();
-            sqlContext = null;
-        }
-
-        if (jsc != null) {
-            LOG.info("Closing spark context used in previous test-case");
-            jsc.close();
-            jsc.stop();
-            jsc = null;
-        }
-    }
-
-    /**
-     * Initializes a file system with the hadoop configuration of Spark context.
-     */
-    protected void initFileSystem() {
-        if (jsc == null) {
-            throw new IllegalStateException("The Spark context has not been initialized.");
-        }
-
-        initFileSystemWithConfiguration(jsc.hadoopConfiguration());
-    }
-
-    /**
-     * Initializes file system with a default empty configuration.
-     */
-    protected void initFileSystemWithDefaultConfiguration() {
-        initFileSystemWithConfiguration(new Configuration());
+  }
+
+  /**
+   * Initializes a file system with the hadoop configuration of Spark context.
+   */
+  protected void initFileSystem() {
+    if (jsc == null) {
+      throw new IllegalStateException("The Spark context has not been initialized.");
     }
 
-    /**
-     * Cleanups file system.
-     */
-    protected void cleanupFileSystem() throws IOException {
-        if (fs != null) {
-            LOG.warn("Closing file-system instance used in previous test-run");
-            fs.close();
-        }
+    initFileSystemWithConfiguration(jsc.hadoopConfiguration());
+  }
+
+  /**
+   * Initializes file system with a default empty configuration.
+   */
+  protected void initFileSystemWithDefaultConfiguration() {
+    initFileSystemWithConfiguration(new Configuration());
+  }
+
+  /**
+   * Cleans up file system.
+   */
+  protected void cleanupFileSystem() throws IOException {
+    if (fs != null) {
+      LOG.warn("Closing file-system instance used in previous test-run");
+      fs.close();
     }
-
-    /**
-     * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
-     */
-    protected void initMetaClient() throws IOException {
-        if (basePath == null) {
-            throw new IllegalStateException("The base path has not been initialized.");
-        }
-
-        if (jsc == null) {
-            throw new IllegalStateException("The Spark context has not been initialized.");
-        }
-
-        metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
-    }
-
-    /**
-     * Cleanups table type.
-     */
-    protected void cleanupClients() {
-        metaClient = null;
-        if (null != client) {
-            client.close();
-            client = null;
-        }
-    }
-
-    /**
-     * Initializes a test data generator which used to generate test datas.
-     */
-    protected void initTestDataGenerator() {
-        dataGen = new HoodieTestDataGenerator();
+  }
+
+  /**
+   * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
+   */
+  protected void initMetaClient() throws IOException {
+    if (basePath == null) {
+      throw new IllegalStateException("The base path has not been initialized.");
     }
 
-    /**
-     * Cleanups test data generator.
-     */
-    protected void cleanupTestDataGenerator() {
-        dataGen = null;
+    if (jsc == null) {
+      throw new IllegalStateException("The Spark context has not been initialized.");
     }
 
-    /**
-     * Initializes a distributed file system and base directory.
-     */
-    protected void initDFS() throws IOException {
-        FileSystem.closeAll();
-        hdfsTestService = new HdfsTestService();
-        dfsCluster = hdfsTestService.start(true);
-
-        // Create a temp folder as the base path
-        dfs = dfsCluster.getFileSystem();
-        dfsBasePath = dfs.getWorkingDirectory().toString();
-        dfs.mkdirs(new Path(dfsBasePath));
+    metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
+  }
+
+  /**
+   * Cleans up the meta client and the write client.
+   */
+  protected void cleanupClients() {
+    metaClient = null;
+    if (null != client) {
+      client.close();
+      client = null;
     }
-
-    /**
-     * Cleanups the distributed file system.
-     */
-    protected void cleanupDFS() throws IOException {
-        if (hdfsTestService != null) {
-            hdfsTestService.stop();
-            dfsCluster.shutdown();
-            hdfsTestService = null;
-            dfsCluster = null;
-            dfs = null;
-        }
-        // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-        // same JVM
-        FileSystem.closeAll();
+  }
+
+  /**
+   * Initializes a test data generator which is used to generate test data.
+   */
+  protected void initTestDataGenerator() {
+    dataGen = new HoodieTestDataGenerator();
+  }
+
+  /**
+   * Cleans up test data generator.
+   */
+  protected void cleanupTestDataGenerator() {
+    dataGen = null;
+  }
+
+  /**
+   * Initializes a distributed file system and base directory.
+   */
+  protected void initDFS() throws IOException {
+    FileSystem.closeAll();
+    hdfsTestService = new HdfsTestService();
+    dfsCluster = hdfsTestService.start(true);
+
+    // Create a temp folder as the base path
+    dfs = dfsCluster.getFileSystem();
+    dfsBasePath = dfs.getWorkingDirectory().toString();
+    dfs.mkdirs(new Path(dfsBasePath));
+  }
+
+  /**
+   * Cleans up the distributed file system.
+   */
+  protected void cleanupDFS() throws IOException {
+    if (hdfsTestService != null) {
+      hdfsTestService.stop();
+      dfsCluster.shutdown();
+      hdfsTestService = null;
+      dfsCluster = null;
+      dfs = null;
     }
-
-    /**
-     * Initializes executor service with a fixed thread pool.
-     *
-     * @param threadNum specify the capacity of the fixed thread pool
-     */
-    protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
-        executorService = Executors.newFixedThreadPool(threadNum);
+    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS are used in
+    // the same JVM
+    FileSystem.closeAll();
+  }
+
+  /**
+   * Initializes executor service with a fixed thread pool.
+   *
+   * @param threadNum specify the capacity of the fixed thread pool
+   */
+  protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
+    executorService = Executors.newFixedThreadPool(threadNum);
+  }
+
+  /**
+   * Cleans up the executor service.
+   */
+  protected void cleanupExecutorService() {
+    if (this.executorService != null) {
+      this.executorService.shutdownNow();
+      this.executorService = null;
     }
+  }
 
-    /**
-     * Cleanups the executor service.
-     */
-    protected void cleanupExecutorService() {
-        if (this.executorService != null) {
-            this.executorService.shutdownNow();
-            this.executorService = null;
-        }
+  private void initFileSystemWithConfiguration(Configuration configuration) {
+    if (basePath == null) {
+      throw new IllegalStateException("The base path has not been initialized.");
     }
 
-    private void initFileSystemWithConfiguration(Configuration configuration) {
-        if (basePath == null) {
-            throw new IllegalStateException("The base path has not been initialized.");
-        }
-
-        fs = FSUtils.getFs(basePath, configuration);
-        if (fs instanceof LocalFileSystem) {
-            LocalFileSystem lfs = (LocalFileSystem) fs;
-            // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
-            // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
-            // So, for the tests, we enforce checksum verification to circumvent the problem
-            lfs.setVerifyChecksum(true);
-        }
+    fs = FSUtils.getFs(basePath, configuration);
+    if (fs instanceof LocalFileSystem) {
+      LocalFileSystem lfs = (LocalFileSystem) fs;
+      // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+      // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+      // So, for the tests, we enforce checksum verification to circumvent the problem
+      lfs.setVerifyChecksum(true);
     }
+  }
 
-    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
-        return getHoodieWriteClient(cfg, false);
-    }
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+    return getHoodieWriteClient(cfg, false);
+  }
 
-    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
-        return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
-    }
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
+    return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
+  }
 
-    public HoodieReadClient getHoodieReadClient(String basePath) {
-        return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
-    }
+  public HoodieReadClient getHoodieReadClient(String basePath) {
+    return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+  }
 
-    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
-        HoodieIndex index) {
-        if (null != client) {
-            client.close();
-            client = null;
-        }
-        client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-        return client;
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
+      HoodieIndex index) {
+    if (null != client) {
+      client.close();
+      client = null;
     }
+    client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+    return client;
+  }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 6887531..99d9b2b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestUtils;
@@ -130,8 +129,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     // Prepare the AvroParquetIO
     HoodieWriteConfig config = makeHoodieClientConfig();
     String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
-    HoodieWriteClient writeClient = getHoodieWriteClient(config);
-    writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String partitionPath = "/2016/01/31";
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 9f3eaea..fdc8b27 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -655,6 +655,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withEmbeddedTimelineServerEnabled(true)
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024).build()).forTable("test-trip-table")
         .build();
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 09d62a7..86a2e1f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -18,9 +18,9 @@
 
 package org.apache.hudi.table.compact;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -29,9 +29,9 @@ import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -129,7 +129,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
     try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
-     String newCommitTime = "100";
+      String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
index d77392f..b2a7f83 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
@@ -385,7 +385,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     // filterCompletedAndCompactionInstants
     // This cannot be done using checkFilter as it involves both states and actions
     final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
-    final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED);
+    final Set<State> states = Sets.newHashSet(State.COMPLETED);
     final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION);
     sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
         .forEach(i -> assertTrue(t1.containsInstant(i)));
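
For readers of the test change above, the corrected semantics of filterCompletedAndCompactionInstants() can be summarized by a small predicate sketch (illustrative only, not part of the patch; HoodieInstant, State and HoodieTimeline are the timeline classes the test already uses, and keptByFilter is a hypothetical helper name):

    // An instant survives filterCompletedAndCompactionInstants() only if it is COMPLETED,
    // or if it is a compaction instant, whatever its state.
    static boolean keptByFilter(HoodieInstant instant) {
      return instant.getState() == HoodieInstant.State.COMPLETED
          || HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction());
    }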


[hudi] 01/04: [HUDI-988] Fix Unit Test Flakiness: Ensure all instantiations of HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make Hudi CLI unit test checks skip rendering strings

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6dcd0a3524fe7be0bbbd3e673ed7e1d4b035e0cb
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Tue Jun 2 01:49:37 2020 -0700

    [HUDI-988] Fix Unit Test Flakiness: Ensure all instantiations of HoodieWriteClient are closed properly. Fix bug in TestRollbacks. Make Hudi CLI unit test checks skip rendering strings
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  16 +
 .../org/apache/hudi/cli/commands/StatsCommand.java |   4 +-
 .../cli/commands/AbstractShellIntegrationTest.java |   2 +-
 .../hudi/cli/commands/TestRepairsCommand.java      | 206 -----
 .../org/apache/hudi/client/HoodieWriteClient.java  |   2 +-
 .../apache/hudi/client/TestHoodieClientBase.java   | 938 ++++++++++-----------
 .../java/org/apache/hudi/client/TestMultiFS.java   |   4 -
 .../hudi/client/TestUpdateSchemaEvolution.java     |   4 +-
 .../hudi/common/HoodieClientTestHarness.java       | 426 +++++-----
 .../hudi/index/TestHBaseQPSResourceAllocator.java  |   2 +-
 .../java/org/apache/hudi/index/TestHbaseIndex.java |  17 +-
 .../org/apache/hudi/index/TestHoodieIndex.java     |   2 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   2 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   2 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |  12 +-
 .../apache/hudi/table/TestCopyOnWriteTable.java    |   5 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    |  38 +-
 .../hudi/table/compact/TestHoodieCompactor.java    |  12 +-
 pom.xml                                            |   1 +
 19 files changed, 745 insertions(+), 950 deletions(-)
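
A minimal usage sketch of the pattern this commit standardizes on: tests should never leak a HoodieWriteClient across test cases, so the harness closes any previously created client before handing out a new one, and individual tests scope the client with try-with-resources. The sketch below is illustrative only and assumes the getConfig() and getHoodieWriteClient() helpers provided by the test base classes in this patch; the commit time "100" is just a placeholder.

    // Inside a test extending HoodieClientTestHarness / TestHoodieClientBase
    HoodieWriteConfig config = getConfig();
    try (HoodieWriteClient writeClient = getHoodieWriteClient(config)) {
      String newCommitTime = "100";                 // placeholder instant time
      writeClient.startCommitWithTime(newCommitTime);
      // ... perform writes and assertions ...
    }                                               // client is closed here, releasing its resources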

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 2e3bc01..708ae29 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -33,4 +33,20 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_HOODIE_PROPERTY = "Property";
   public static final String HEADER_OLD_VALUE = "Old Value";
   public static final String HEADER_NEW_VALUE = "New Value";
+
+  /**
+   * Fields of Stats.
+   */
+  public static final String HEADER_COMMIT_TIME = "CommitTime";
+  public static final String HEADER_TOTAL_UPSERTED = "Total Upserted";
+  public static final String HEADER_TOTAL_WRITTEN = "Total Written";
+  public static final String HEADER_WRITE_AMPLIFICATION_FACTOR = "Write Amplification Factor";
+  public static final String HEADER_HISTOGRAM_MIN = "Min";
+  public static final String HEADER_HISTOGRAM_10TH = "10th";
+  public static final String HEADER_HISTOGRAM_50TH = "50th";
+  public static final String HEADER_HISTOGRAM_AVG = "avg";
+  public static final String HEADER_HISTOGRAM_95TH = "95th";
+  public static final String HEADER_HISTOGRAM_MAX = "Max";
+  public static final String HEADER_HISTOGRAM_NUM_FILES = "NumFiles";
+  public static final String HEADER_HISTOGRAM_STD_DEV = "StdDev";
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index b05aee2..4874777 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
 @Component
 public class StatsCommand implements CommandMarker {
 
-  private static final int MAX_FILES = 1000000;
+  public static final int MAX_FILES = 1000000;
 
   @CliCommand(value = "stats wa", help = "Write Amplification. Ratio of how many records were upserted to how many "
       + "records were actually written")
@@ -97,7 +97,7 @@ public class StatsCommand implements CommandMarker {
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
 
-  private Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
+  public Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
     return new Comparable[] {commitTime, s.getMin(), s.getValue(0.1), s.getMedian(), s.getMean(), s.get95thPercentile(),
         s.getMax(), s.size(), s.getStdDev()};
   }
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
index ad81af5..d9f1688 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
@@ -58,4 +58,4 @@ public abstract class AbstractShellIntegrationTest extends HoodieClientTestHarne
   protected static JLineShellComponent getShell() {
     return shell;
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
deleted file mode 100644
index 9e78ac7..0000000
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.commands;
-
-import org.apache.hudi.cli.HoodieCLI;
-import org.apache.hudi.cli.HoodiePrintHelper;
-import org.apache.hudi.cli.HoodieTableHeaderFields;
-import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.TimelineLayoutVersion;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.FSUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.shell.core.CommandResult;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Test class for {@link RepairsCommand}.
- */
-public class TestRepairsCommand extends AbstractShellIntegrationTest {
-
-  private String tablePath;
-
-  @Before
-  public void init() throws IOException {
-    String tableName = "test_table";
-    tablePath = basePath + File.separator + tableName;
-
-    // Create table and connect
-    new TableCommand().createTable(
-        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
-        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
-  }
-
-  /**
-   * Test case for dry run 'repair addpartitionmeta'.
-   */
-  @Test
-  public void testAddPartitionMetaWithDryRun() throws IOException {
-    // create commit instant
-    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
-
-    // create partition path
-    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
-    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
-    assertTrue(fs.mkdirs(new Path(partition1)));
-    assertTrue(fs.mkdirs(new Path(partition2)));
-    assertTrue(fs.mkdirs(new Path(partition3)));
-
-    // default is dry run.
-    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
-    assertTrue(cr.isSuccess());
-
-    // expected all 'No'.
-    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
-        .stream()
-        .map(partition -> new String[] {partition, "No", "None"})
-        .toArray(String[][]::new);
-    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-
-    assertEquals(expected, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for real run 'repair addpartitionmeta'.
-   */
-  @Test
-  public void testAddPartitionMetaWithRealRun() throws IOException {
-    // create commit instant
-    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
-
-    // create partition path
-    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
-    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
-    assertTrue(fs.mkdirs(new Path(partition1)));
-    assertTrue(fs.mkdirs(new Path(partition2)));
-    assertTrue(fs.mkdirs(new Path(partition3)));
-
-    CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false");
-    assertTrue(cr.isSuccess());
-
-    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
-    // after dry run, the action will be 'Repaired'
-    String[][] rows = paths.stream()
-        .map(partition -> new String[] {partition, "No", "Repaired"})
-        .toArray(String[][]::new);
-    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-
-    assertEquals(expected, cr.getResult().toString());
-
-    cr = getShell().executeCommand("repair addpartitionmeta");
-
-    // after real run, Metadata is present now.
-    rows = paths.stream()
-        .map(partition -> new String[] {partition, "Yes", "None"})
-        .toArray(String[][]::new);
-    expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
-        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
-    assertEquals(expected, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for 'repair overwrite-hoodie-props'.
-   */
-  @Test
-  public void testOverwriteHoodieProperties() throws IOException {
-    URL newProps = this.getClass().getClassLoader().getResource("table-config.properties");
-    assertNotNull("New property file must exist", newProps);
-
-    CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
-    assertTrue(cr.isSuccess());
-
-    Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
-
-    // after overwrite, the stored value in .hoodie is equals to which read from properties.
-    Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
-    Properties expectProps = new Properties();
-    expectProps.load(new FileInputStream(new File(newProps.getPath())));
-
-    Map<String, String> expected = expectProps.entrySet().stream()
-        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
-    assertEquals(expected, result);
-
-    // check result
-    List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
-        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
-    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key,
-        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
-        .toArray(String[][]::new);
-    String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
-        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
-
-    assertEquals(expect, cr.getResult().toString());
-  }
-
-  /**
-   * Test case for 'repair corrupted clean files'.
-   */
-  @Test
-  public void testRemoveCorruptedPendingCleanAction() throws IOException {
-    HoodieCLI.conf = jsc.hadoopConfiguration();
-
-    Configuration conf = HoodieCLI.conf;
-
-    metaClient = HoodieCLI.getTableMetaClient();
-
-    // Create four requested files
-    for (int i = 100; i < 104; i++) {
-      String timestamp = String.valueOf(i);
-      // Write corrupted requested Compaction
-      HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, timestamp, conf);
-    }
-
-    // reload meta client
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    // first, there are four instants
-    assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
-
-    CommandResult cr = getShell().executeCommand("repair corrupted clean files");
-    assertTrue(cr.isSuccess());
-
-    // reload meta client
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
-  }
-}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 37dfe3d..f5f6233 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -120,7 +120,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
   }
 
-  HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
+  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
     this(jsc, clientConfig, rollbackPending, index, Option.empty());
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 5f47bf5..6e6458b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
@@ -50,7 +50,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.SQLContext;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -73,496 +72,477 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHoodieClientBase extends HoodieClientTestHarness {
 
-  private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
-
-  @Before
-  public void setUp() throws Exception {
-    initResources();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    cleanupResources();
-  }
-
-  protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
-    return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
-    return getHoodieWriteClient(cfg, false);
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
-    return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, jsc));
-  }
-
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
-                                                   HoodieIndex index) {
-    return new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-  }
-
-  protected HoodieReadClient getHoodieReadClient(String basePath) {
-    return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
-  }
-
-  /**
-   * Get Default HoodieWriteConfig for tests.
-   *
-   * @return Default Hoodie Write Config for tests
-   */
-  protected HoodieWriteConfig getConfig() {
-    return getConfigBuilder().build();
-  }
-
-  protected HoodieWriteConfig getConfig(IndexType indexType) {
-    return getConfigBuilder(indexType).build();
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  protected HoodieWriteConfig.Builder getConfigBuilder() {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
-  }
-
-  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-    return getConfigBuilder(schemaStr, IndexType.BLOOM);
-  }
-
-  /**
-   * Get Config builder with default configs set.
-   *
-   * @return Config Builder
-   */
-  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
-    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
-        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
-        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
-        .withWriteStatusClass(MetadataMergeWriteStatus.class)
-        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
-        .forTable("test-trip-table")
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
-        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-  }
-
-  protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    ((SyncableFileSystemView) (table.getSliceView())).reset();
-    return table;
-  }
-
-  /**
-   * Assert no failures in writing hoodie files.
-   *
-   * @param statuses List of Write Status
-   */
-  public static void assertNoWriteErrors(List<WriteStatus> statuses) {
-    // Verify there are no errors
-    for (WriteStatus status : statuses) {
-      assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
+    private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
+
+    @Before
+    public void setUp() throws Exception {
+        initResources();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cleanupResources();
+    }
+
+    protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
+        return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+    }
+
+    /**
+     * Get Default HoodieWriteConfig for tests.
+     *
+     * @return Default Hoodie Write Config for tests
+     */
+    protected HoodieWriteConfig getConfig() {
+        return getConfigBuilder().build();
     }
-  }
-
-  void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
-    Set<String> partitionPathSet = inputRecords.stream()
-        .map(HoodieRecord::getPartitionPath)
-        .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-  }
-
-  void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
-    Set<String> partitionPathSet = inputKeys.stream()
-        .map(HoodieKey::getPartitionPath)
-        .collect(Collectors.toSet());
-    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-  }
-
-  /**
-   * Ensure presence of partition meta-data at known depth.
-   *
-   * @param partitionPaths Partition paths to check
-   * @param fs             File System
-   * @throws IOException in case of error
-   */
-  void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
-    for (String partitionPath : partitionPaths) {
-      assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
-      HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
-      pmeta.readFromFS();
-      Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
+
+    protected HoodieWriteConfig getConfig(IndexType indexType) {
+        return getConfigBuilder(indexType).build();
+    }
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    protected HoodieWriteConfig.Builder getConfigBuilder() {
+        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
     }
-  }
-
-  /**
-   * Ensure records have location field set.
-   *
-   * @param taggedRecords Tagged Records
-   * @param commitTime    Commit Timestamp
-   */
-  protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
-    for (HoodieRecord rec : taggedRecords) {
-      assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
-      assertEquals("All records should have commit time " + commitTime + ", since updates were made",
-          rec.getCurrentLocation().getInstantTime(), commitTime);
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
+        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
     }
-  }
-
-  /**
-   * Assert that there is no duplicate key at the partition level.
-   *
-   * @param records List of Hoodie records
-   */
-  void assertNodupesWithinPartition(List<HoodieRecord> records) {
-    Map<String, Set<String>> partitionToKeys = new HashMap<>();
-    for (HoodieRecord r : records) {
-      String key = r.getRecordKey();
-      String partitionPath = r.getPartitionPath();
-      if (!partitionToKeys.containsKey(partitionPath)) {
-        partitionToKeys.put(partitionPath, new HashSet<>());
-      }
-      assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
-      partitionToKeys.get(partitionPath).add(key);
+
+    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+        return getConfigBuilder(schemaStr, IndexType.BLOOM);
     }
-  }
-
-  /**
-   * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records
-   * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
-   * guaranteed by record-generation function itself.
-   *
-   * @param writeConfig       Hoodie Write Config
-   * @param recordGenFunction Records Generation function
-   * @return Wrapped function
-   */
-  private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
-    return (commit, numRecords) -> {
-      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
-      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
-      return taggedRecords.collect();
-    };
-  }
-
-  /**
-   * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys
-   * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
-   * guaranteed by key-generation function itself.
-   *
-   * @param writeConfig    Hoodie Write Config
-   * @param keyGenFunction Keys Generation function
-   * @return Wrapped function
-   */
-  private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
-      final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
-    return (numRecords) -> {
-      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-      List<HoodieKey> records = keyGenFunction.apply(numRecords);
-      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-      JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
-          .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
-      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
-      return taggedRecords.map(record -> record.getKey()).collect();
-    };
-  }
-
-  /**
-   * Generate wrapper for record generation function for testing Prepped APIs.
-   *
-   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-   * @param writeConfig  Hoodie Write Config
-   * @param wrapped      Actual Records Generation function
-   * @return Wrapped Function
-   */
-  protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
-                                                                                 HoodieWriteConfig writeConfig,
-                                                                                 Function2<List<HoodieRecord>, String, Integer> wrapped) {
-    if (isPreppedAPI) {
-      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
-    } else {
-      return wrapped;
+
+    /**
+     * Get Config builder with default configs set.
+     *
+     * @return Config Builder
+     */
+    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
+        return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+            .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+            .withWriteStatusClass(MetadataMergeWriteStatus.class)
+            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+            .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+            .forTable("test-trip-table")
+            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+            .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+                .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
     }
-  }
-
-  /**
-   * Generate wrapper for delete key generation function for testing Prepped APIs.
-   *
-   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-   * @param writeConfig  Hoodie Write Config
-   * @param wrapped      Actual Records Generation function
-   * @return Wrapped Function
-   */
-  Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
-                                                                       HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
-    if (isPreppedAPI) {
-      return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
-    } else {
-      return wrapped;
+
+    protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+        ((SyncableFileSystemView) (table.getSliceView())).reset();
+        return table;
     }
-  }
-
-  /**
-   * Helper to insert first batch of records and do regular assertions on the state after successful completion.
-   *
-   * @param writeConfig            Hoodie Write Config
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit Number of records to be added in the new commit
-   * @param writeFn                Write Function to be used for insertion
-   * @param isPreppedAPI           Boolean flag to indicate writeFn expects prepped records
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-                                        String initCommitTime, int numRecordsInThisCommit,
-                                        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-                                        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
-    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
-
-    return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
-        recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
-  }
-
-  /**
-   * Helper to upsert batch of records and do regular assertions on the state after successful completion.
-   *
-   * @param writeConfig                  Hoodie Write Config
-   * @param client                       Hoodie Write Client
-   * @param newCommitTime                New Commit Timestamp to be used
-   * @param prevCommitTime               Commit Timestamp used in previous commit
-   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-   * @param initCommitTime               Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit       Number of records to be added in the new commit
-   * @param writeFn                      Write Function to be used for upsert
-   * @param isPreppedAPI                 Boolean flag to indicate writeFn expects prepped records
-   * @param assertForCommit              Enable Assertion of Writes
-   * @param expRecordsInThisCommit       Expected number of records in this commit
-   * @param expTotalRecords              Expected number of records when scanned
-   * @param expTotalCommits              Expected number of commits (including this commit)
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-                                   String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
-                                   int numRecordsInThisCommit,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-                                   boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
-
-    return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
-        numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-        expTotalCommits);
-  }
-
-  /**
-   * Helper to delete batch of keys and do regular assertions on the state after successful completion.
-   *
-   * @param writeConfig            Hoodie Write Config
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param prevCommitTime         Commit Timestamp used in previous commit
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit Number of records to be added in the new commit
-   * @param deleteFn               Delete Function to be used for deletes
-   * @param isPreppedAPI           Boolean flag to indicate writeFn expects prepped records
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @param expTotalRecords        Expected number of records when scanned
-   * @return RDD of write-status
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-                                   String prevCommitTime, String initCommitTime,
-                                   int numRecordsInThisCommit,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
-                                   boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-    final Function<Integer, List<HoodieKey>> keyGenFunction =
-        generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
-
-    return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
-        keyGenFunction,
-        deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
-  }
-
-  /**
-   * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
-   *
-   * @param client                       Hoodie Write Client
-   * @param newCommitTime                New Commit Timestamp to be used
-   * @param prevCommitTime               Commit Timestamp used in previous commit
-   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-   * @param initCommitTime               Begin Timestamp (usually "000")
-   * @param numRecordsInThisCommit       Number of records to be added in the new commit
-   * @param recordGenFunction            Records Generation Function
-   * @param writeFn                      Write Function to be used for upsert
-   * @param assertForCommit              Enable Assertion of Writes
-   * @param expRecordsInThisCommit       Expected number of records in this commit
-   * @param expTotalRecords              Expected number of records when scanned
-   * @param expTotalCommits              Expected number of commits (including this commit)
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-                                  Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
-                                  Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-                                  Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-                                  boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-
-    // Write 1 (only inserts)
-    client.startCommitWithTime(newCommitTime);
-
-    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
-    List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
-
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(records, fs);
-
-    // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-    if (assertForCommit) {
-      assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
-          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-          timeline.lastInstant().get().getTimestamp());
-      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-      // Check the entire dataset has all records still
-      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-      for (int i = 0; i < fullPartitionPaths.length; i++) {
-        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-      }
-      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-      // Check that the incremental consumption from prevCommitTime
-      assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
-      if (commitTimesBetweenPrevAndNew.isPresent()) {
-        commitTimesBetweenPrevAndNew.get().forEach(ct -> {
-          assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
-              HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-              HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
-        });
-      }
+
+    /**
+     * Assert no failures in writing hoodie files.
+     *
+     * @param statuses List of Write Status
+     */
+    public static void assertNoWriteErrors(List<WriteStatus> statuses) {
+        // Verify there are no errors
+        for (WriteStatus status : statuses) {
+            assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
+        }
     }
-    return result;
-  }
-
-  /**
-   * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
-   *
-   * @param client                 Hoodie Write Client
-   * @param newCommitTime          New Commit Timestamp to be used
-   * @param prevCommitTime         Commit Timestamp used in previous commit
-   * @param initCommitTime         Begin Timestamp (usually "000")
-   * @param keyGenFunction         Key Generation function
-   * @param deleteFn               Write Function to be used for delete
-   * @param assertForCommit        Enable Assertion of Writes
-   * @param expRecordsInThisCommit Expected number of records in this commit
-   * @param expTotalRecords        Expected number of records when scanned
-   * @throws Exception in case of error
-   */
-  JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-                                   String initCommitTime, int numRecordsInThisCommit,
-                                   Function<Integer, List<HoodieKey>> keyGenFunction,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
-                                   boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-
-    // Delete 1 (only deletes)
-    client.startCommitWithTime(newCommitTime);
-
-    List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
-    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
-
-    JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
-    List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
-
-    // check the partition metadata is written out
-    assertPartitionMetadataForKeys(keysToDelete, fs);
-
-    // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-    if (assertForCommit) {
-      assertEquals("Expecting 3 commits.", 3,
-          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-          timeline.lastInstant().get().getTimestamp());
-      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-      // Check the entire dataset has all records still
-      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-      for (int i = 0; i < fullPartitionPaths.length; i++) {
-        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-      }
-      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-      // Check that the incremental consumption from prevCommitTime
-      assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
-              + " since it is a delete operation",
-          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+
+    void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
+        Set<String> partitionPathSet = inputRecords.stream()
+            .map(HoodieRecord::getPartitionPath)
+            .collect(Collectors.toSet());
+        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
     }
-    return result;
-  }
 
-  /**
-   * Get Cleaner state corresponding to a partition path.
-   *
-   * @param hoodieCleanStatsTwo List of Clean Stats
-   * @param partitionPath       Partition path for filtering
-   * @return Cleaner state corresponding to partition path
-   */
-  protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
-    return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
-  }
+    void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
+        Set<String> partitionPathSet = inputKeys.stream()
+            .map(HoodieKey::getPartitionPath)
+            .collect(Collectors.toSet());
+        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+    }
 
-  // Functional Interfaces for passing lambda and Hoodie Write API contexts
+    /**
+     * Ensure presence of partition meta-data at known depth.
+     *
+     * @param partitionPaths Partition paths to check
+     * @param fs File System
+     * @throws IOException in case of error
+     */
+    void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
+        for (String partitionPath : partitionPaths) {
+            assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
+            HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
+            pmeta.readFromFS();
+            Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
+        }
+    }
 
-  @FunctionalInterface
-  public interface Function2<R, T1, T2> {
+    /**
+     * Ensure records have location field set.
+     *
+     * @param taggedRecords Tagged Records
+     * @param commitTime Commit Timestamp
+     */
+    protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
+        for (HoodieRecord rec : taggedRecords) {
+            assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
+            assertEquals("All records should have commit time " + commitTime + ", since updates were made",
+                rec.getCurrentLocation().getInstantTime(), commitTime);
+        }
+    }
 
-    R apply(T1 v1, T2 v2) throws IOException;
-  }
+    /**
+     * Assert that there is no duplicate key at the partition level.
+     *
+     * @param records List of Hoodie records
+     */
+    void assertNodupesWithinPartition(List<HoodieRecord> records) {
+        Map<String, Set<String>> partitionToKeys = new HashMap<>();
+        for (HoodieRecord r : records) {
+            String key = r.getRecordKey();
+            String partitionPath = r.getPartitionPath();
+            if (!partitionToKeys.containsKey(partitionPath)) {
+                partitionToKeys.put(partitionPath, new HashSet<>());
+            }
+            assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
+            partitionToKeys.get(partitionPath).add(key);
+        }
+    }
 
-  @FunctionalInterface
-  public interface Function3<R, T1, T2, T3> {
+    /**
+     * Helper that wraps a record-generation function for testing the prepped version of the API. Prepped APIs expect records that are already de-duped and have their location set; this wrapper
+     * takes care of setting the record location, while uniqueness is guaranteed by the record-generation function itself.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param recordGenFunction Records Generation function
+     * @return Wrapped function
+     */
+    private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
+        final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+        return (commit, numRecords) -> {
+            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+            List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
+            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
+            return taggedRecords.collect();
+        };
+    }
 
-    R apply(T1 v1, T2 v2, T3 v3) throws IOException;
-  }
+    /**
+     * Helper that wraps a delete-key-generation function for testing the prepped version of the API. Prepped APIs expect keys that are already de-duped and have their location set; this wrapper
+     * takes care of setting the record location, while uniqueness is guaranteed by the key-generation function itself.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param keyGenFunction Keys Generation function
+     * @return Wrapped function
+     */
+    private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
+        final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
+        return (numRecords) -> {
+            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+            List<HoodieKey> records = keyGenFunction.apply(numRecords);
+            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+            JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+                .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
+            return taggedRecords.map(record -> record.getKey()).collect();
+        };
+    }
+
+    /**
+     * Generate wrapper for record generation function for testing Prepped APIs.
+     *
+     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+     * @param writeConfig Hoodie Write Config
+     * @param wrapped Actual Records Generation function
+     * @return Wrapped Function
+     */
+    protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
+        HoodieWriteConfig writeConfig,
+        Function2<List<HoodieRecord>, String, Integer> wrapped) {
+        if (isPreppedAPI) {
+            return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+        } else {
+            return wrapped;
+        }
+    }
+
+    /**
+     * Generate wrapper for delete key generation function for testing Prepped APIs.
+     *
+     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+     * @param writeConfig Hoodie Write Config
+     * @param wrapped Actual Keys Generation function
+     * @return Wrapped Function
+     */
+    Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
+        HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
+        if (isPreppedAPI) {
+            return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+        } else {
+            return wrapped;
+        }
+    }
+
+    /**
+     * Helper to insert first batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param writeFn Write Function to be used for insertion
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String initCommitTime, int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
+        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
+
+        return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
+            recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
+    }
+
+    /**
+     * Helper to upsert batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param writeFn Write Function to be used for upsert
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @param expTotalCommits Expected number of commits (including this commit)
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+        int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
+
+        return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+            numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
+            expTotalCommits);
+    }
+
+    /**
+     * Helper to delete batch of keys and do regular assertions on the state after successful completion.
+     *
+     * @param writeConfig Hoodie Write Config
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param deleteFn Delete Function to be used for deletes
+     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @return RDD of write-status
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+        String prevCommitTime, String initCommitTime,
+        int numRecordsInThisCommit,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+        final Function<Integer, List<HoodieKey>> keyGenFunction =
+            generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
+
+        return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
+            keyGenFunction,
+            deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
+    }
+
+    /**
+     * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
+     *
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param numRecordsInThisCommit Number of records to be added in the new commit
+     * @param recordGenFunction Records Generation Function
+     * @param writeFn Write Function to be used for upsert
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @param expTotalCommits Expected number of commits (including this commit)
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+        Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+        Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+
+        // Write 1 (only inserts)
+        client.startCommitWithTime(newCommitTime);
+
+        List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+        JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
+        List<WriteStatus> statuses = result.collect();
+        assertNoWriteErrors(statuses);
+
+        // check the partition metadata is written out
+        assertPartitionMetadataForRecords(records, fs);
+
+        // verify that there is a commit
+        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+        if (assertForCommit) {
+            assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
+                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+                timeline.lastInstant().get().getTimestamp());
+            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+            // Check the entire dataset has all records still
+            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+            for (int i = 0; i < fullPartitionPaths.length; i++) {
+                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+            }
+            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+            // Check that incremental consumption from prevCommitTime sees all records in the latest commit
+            assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+            if (commitTimesBetweenPrevAndNew.isPresent()) {
+                commitTimesBetweenPrevAndNew.get().forEach(ct -> {
+                    assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
+                        HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
+                });
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
+     *
+     * @param client Hoodie Write Client
+     * @param newCommitTime New Commit Timestamp to be used
+     * @param prevCommitTime Commit Timestamp used in previous commit
+     * @param initCommitTime Begin Timestamp (usually "000")
+     * @param keyGenFunction Key Generation function
+     * @param deleteFn Write Function to be used for delete
+     * @param assertForCommit Enable Assertion of Writes
+     * @param expRecordsInThisCommit Expected number of records in this commit
+     * @param expTotalRecords Expected number of records when scanned
+     * @throws Exception in case of error
+     */
+    JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+        String initCommitTime, int numRecordsInThisCommit,
+        Function<Integer, List<HoodieKey>> keyGenFunction,
+        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
+        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+
+        // Delete 1 (only deletes)
+        client.startCommitWithTime(newCommitTime);
+
+        List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
+        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
+
+        JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
+        List<WriteStatus> statuses = result.collect();
+        assertNoWriteErrors(statuses);
+
+        // check the partition metadata is written out
+        assertPartitionMetadataForKeys(keysToDelete, fs);
+
+        // verify that there is a commit
+        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+        if (assertForCommit) {
+            assertEquals("Expecting 3 commits.", 3,
+                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+                timeline.lastInstant().get().getTimestamp());
+            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+            // Check the entire dataset has all records still
+            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+            for (int i = 0; i < fullPartitionPaths.length; i++) {
+                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+            }
+            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+            // Check incremental consumption from prevCommitTime for the delete commit
+            assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+                    + " since it is a delete operation",
+                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+        }
+        return result;
+    }
+
+    /**
+     * Get Cleaner state corresponding to a partition path.
+     *
+     * @param hoodieCleanStatsTwo List of Clean Stats
+     * @param partitionPath Partition path for filtering
+     * @return Cleaner state corresponding to partition path
+     */
+    protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
+        return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
+    }
+
+    // Functional Interfaces for passing lambda and Hoodie Write API contexts
+
+    @FunctionalInterface
+    public interface Function2<R, T1, T2> {
+
+        R apply(T1 v1, T2 v2) throws IOException;
+    }
+
+    @FunctionalInterface
+    public interface Function3<R, T1, T2, T3> {
+
+        R apply(T1 v1, T2 v2, T3 v3) throws IOException;
+    }
 
 }
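For context, the helpers above are typically driven from a test method in a subclass roughly as follows. This is a minimal sketch, not part of this commit; it assumes the insert/upsert signatures of HoodieWriteClient in this release line and a getConfigBuilder() helper like the one TestHoodieClientBase already provides.

  HoodieWriteConfig cfg = getConfigBuilder().build();
  HoodieWriteClient client = getHoodieWriteClient(cfg);  // cached by the harness, closed in cleanupClients()

  String initCommitTime = "000";
  String firstCommit = getNextInstant();
  // insert 100 records and assert the commit landed
  insertFirstBatch(cfg, client, firstCommit, initCommitTime, 100,
      HoodieWriteClient::insert, false, true, 100);

  String secondCommit = getNextInstant();
  // upsert 50 of them; the dataset still holds 100 records and the timeline now has 2 commits
  updateBatch(cfg, client, secondCommit, firstCommit, Option.empty(), initCommitTime, 50,
      HoodieWriteClient::upsert, false, true, 50, 100, 2);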
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 9b70c10..8d3fa13 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -68,10 +68,6 @@ public class TestMultiFS extends HoodieClientTestHarness {
     cleanupTestDataGenerator();
   }
 
-  private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
-  }
-
   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index aad8edf..ab6e940 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.TestRawTripPayload;
+import java.io.IOException;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -58,8 +59,9 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
   }
 
   @After
-  public void tearDown() {
+  public void tearDown() throws IOException {
     cleanupSparkContexts();
+    cleanupFileSystem();
   }
 
   //@Test
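The extra cleanupFileSystem() call matters because Hadoop caches FileSystem instances (keyed roughly by scheme, authority and user), so an instance left behind by one test can resurface in the next test running in the same JVM. A rough sketch of the pattern the harness relies on, hypothetical and not part of this commit:

  FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
  try {
    // ... exercise the table against fs ...
  } finally {
    fs.close();              // what cleanupFileSystem() does for the harness-owned instance
    // FileSystem.closeAll(); // heavier variant used by cleanupDFS() when DFS and LocalFS share a JVM
  }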
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 4e5721f..e4202f0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -17,11 +17,15 @@
 
 package org.apache.hudi.common;
 
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.TestHoodieClientBase;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,225 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
-  protected transient JavaSparkContext jsc = null;
-  protected transient SQLContext sqlContext;
-  protected transient FileSystem fs;
-  protected transient HoodieTestDataGenerator dataGen = null;
-  protected transient ExecutorService executorService;
-  protected transient HoodieTableMetaClient metaClient;
-  private static AtomicInteger instantGen = new AtomicInteger(1);
-
-  public String getNextInstant() {
-    return String.format("%09d", instantGen.getAndIncrement());
-  }
-
-  // dfs
-  protected String dfsBasePath;
-  protected transient HdfsTestService hdfsTestService;
-  protected transient MiniDFSCluster dfsCluster;
-  protected transient DistributedFileSystem dfs;
-
-  /**
-   * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
-   *
-   * @throws IOException
-   */
-  public void initResources() throws IOException {
-    initPath();
-    initSparkContexts();
-    initTestDataGenerator();
-    initFileSystem();
-    initMetaClient();
-  }
-
-  /**
-   * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
-   * 
-   * @throws IOException
-   */
-  public void cleanupResources() throws IOException {
-    cleanupMetaClient();
-    cleanupSparkContexts();
-    cleanupTestDataGenerator();
-    cleanupFileSystem();
-  }
-
-  /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
-   *
-   * @param appName The specified application name.
-   */
-  protected void initSparkContexts(String appName) {
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
-    jsc.setLogLevel("ERROR");
-
-    // SQLContext stuff
-    sqlContext = new SQLContext(jsc);
-  }
-
-  /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
-   * <b>TestHoodieClient</b>.
-   */
-  protected void initSparkContexts() {
-    initSparkContexts("TestHoodieClient");
-  }
-
-  /**
-   * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
-   */
-  protected void cleanupSparkContexts() {
-    if (sqlContext != null) {
-      LOG.info("Clearing sql context cache of spark-session used in previous test-case");
-      sqlContext.clearCache();
-      sqlContext = null;
+    private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
+
+    protected transient JavaSparkContext jsc = null;
+    protected transient SQLContext sqlContext;
+    protected transient FileSystem fs;
+    protected transient HoodieTestDataGenerator dataGen = null;
+    protected transient ExecutorService executorService;
+    protected transient HoodieTableMetaClient metaClient;
+    private static AtomicInteger instantGen = new AtomicInteger(1);
+    protected transient HoodieWriteClient client;
+
+    public String getNextInstant() {
+        return String.format("%09d", instantGen.getAndIncrement());
+    }
+
+    // dfs
+    protected String dfsBasePath;
+    protected transient HdfsTestService hdfsTestService;
+    protected transient MiniDFSCluster dfsCluster;
+    protected transient DistributedFileSystem dfs;
+
+    /**
+     * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
+     */
+    public void initResources() throws IOException {
+        initPath();
+        initSparkContexts();
+        initTestDataGenerator();
+        initFileSystem();
+        initMetaClient();
+    }
+
+    /**
+     * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
+     */
+    public void cleanupResources() throws IOException {
+        cleanupClients();
+        cleanupSparkContexts();
+        cleanupTestDataGenerator();
+        cleanupFileSystem();
+    }
+
+    /**
+     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
+     *
+     * @param appName The specified application name.
+     */
+    protected void initSparkContexts(String appName) {
+        // Initialize a local spark env
+        jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+        jsc.setLogLevel("ERROR");
+
+        // SQLContext stuff
+        sqlContext = new SQLContext(jsc);
     }
 
-    if (jsc != null) {
-      LOG.info("Closing spark context used in previous test-case");
-      jsc.close();
-      jsc.stop();
-      jsc = null;
+    /**
+     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
+     * <b>TestHoodieClient</b>.
+     */
+    protected void initSparkContexts() {
+        initSparkContexts("TestHoodieClient");
     }
-  }
-
-  /**
-   * Initializes a file system with the hadoop configuration of Spark context.
-   */
-  protected void initFileSystem() {
-    if (jsc == null) {
-      throw new IllegalStateException("The Spark context has not been initialized.");
+
+    /**
+     * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
+     */
+    protected void cleanupSparkContexts() {
+        if (sqlContext != null) {
+            LOG.info("Clearing sql context cache of spark-session used in previous test-case");
+            sqlContext.clearCache();
+            sqlContext = null;
+        }
+
+        if (jsc != null) {
+            LOG.info("Closing spark context used in previous test-case");
+            jsc.close();
+            jsc.stop();
+            jsc = null;
+        }
+    }
+
+    /**
+     * Initializes a file system with the hadoop configuration of Spark context.
+     */
+    protected void initFileSystem() {
+        if (jsc == null) {
+            throw new IllegalStateException("The Spark context has not been initialized.");
+        }
+
+        initFileSystemWithConfiguration(jsc.hadoopConfiguration());
     }
 
-    initFileSystemWithConfiguration(jsc.hadoopConfiguration());
-  }
-
-  /**
-   * Initializes file system with a default empty configuration.
-   */
-  protected void initFileSystemWithDefaultConfiguration() {
-    initFileSystemWithConfiguration(new Configuration());
-  }
-
-  /**
-   * Cleanups file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupFileSystem() throws IOException {
-    if (fs != null) {
-      LOG.warn("Closing file-system instance used in previous test-run");
-      fs.close();
+    /**
+     * Initializes file system with a default empty configuration.
+     */
+    protected void initFileSystemWithDefaultConfiguration() {
+        initFileSystemWithConfiguration(new Configuration());
     }
-  }
-
-  /**
-   * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by
-   * {@code getTableType()}.
-   *
-   * @throws IOException
-   */
-  protected void initMetaClient() throws IOException {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been initialized.");
+
+    /**
+     * Cleanups file system.
+     */
+    protected void cleanupFileSystem() throws IOException {
+        if (fs != null) {
+            LOG.warn("Closing file-system instance used in previous test-run");
+            fs.close();
+        }
     }
 
-    if (jsc == null) {
-      throw new IllegalStateException("The Spark context has not been initialized.");
+    /**
+     * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
+     */
+    protected void initMetaClient() throws IOException {
+        if (basePath == null) {
+            throw new IllegalStateException("The base path has not been initialized.");
+        }
+
+        if (jsc == null) {
+            throw new IllegalStateException("The Spark context has not been initialized.");
+        }
+
+        metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
     }
 
-    metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
-  }
-
-  /**
-   * Cleanups table type.
-   */
-  protected void cleanupMetaClient() {
-    metaClient = null;
-  }
-
-  /**
-   * Initializes a test data generator which used to generate test datas.
-   *
-   */
-  protected void initTestDataGenerator() {
-    dataGen = new HoodieTestDataGenerator();
-  }
-
-  /**
-   * Cleanups test data generator.
-   *
-   */
-  protected void cleanupTestDataGenerator() {
-    dataGen = null;
-  }
-
-  /**
-   * Initializes a distributed file system and base directory.
-   *
-   * @throws IOException
-   */
-  protected void initDFS() throws IOException {
-    FileSystem.closeAll();
-    hdfsTestService = new HdfsTestService();
-    dfsCluster = hdfsTestService.start(true);
-
-    // Create a temp folder as the base path
-    dfs = dfsCluster.getFileSystem();
-    dfsBasePath = dfs.getWorkingDirectory().toString();
-    dfs.mkdirs(new Path(dfsBasePath));
-  }
-
-  /**
-   * Cleanups the distributed file system.
-   *
-   * @throws IOException
-   */
-  protected void cleanupDFS() throws IOException {
-    if (hdfsTestService != null) {
-      hdfsTestService.stop();
-      dfsCluster.shutdown();
+    /**
+     * Cleanups meta client and the cached write client.
+     */
+    protected void cleanupClients() {
+        metaClient = null;
+        if (null != client) {
+            client.close();
+            client = null;
+        }
     }
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-  }
-
-  /**
-   * Initializes executor service with a fixed thread pool.
-   *
-   * @param threadNum specify the capacity of the fixed thread pool
-   */
-  protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
-    executorService = Executors.newFixedThreadPool(threadNum);
-  }
-
-  /**
-   * Cleanups the executor service.
-   */
-  protected void cleanupExecutorService() {
-    if (this.executorService != null) {
-      this.executorService.shutdownNow();
-      this.executorService = null;
+
+    /**
+     * Initializes a test data generator used to generate test data.
+     */
+    protected void initTestDataGenerator() {
+        dataGen = new HoodieTestDataGenerator();
     }
-  }
 
-  private void initFileSystemWithConfiguration(Configuration configuration) {
-    if (basePath == null) {
-      throw new IllegalStateException("The base path has not been initialized.");
+    /**
+     * Cleanups test data generator.
+     */
+    protected void cleanupTestDataGenerator() {
+        dataGen = null;
     }
 
-    fs = FSUtils.getFs(basePath, configuration);
-    if (fs instanceof LocalFileSystem) {
-      LocalFileSystem lfs = (LocalFileSystem) fs;
-      // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
-      // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
-      // So, for the tests, we enforce checksum verification to circumvent the problem
-      lfs.setVerifyChecksum(true);
+    /**
+     * Initializes a distributed file system and base directory.
+     */
+    protected void initDFS() throws IOException {
+        FileSystem.closeAll();
+        hdfsTestService = new HdfsTestService();
+        dfsCluster = hdfsTestService.start(true);
+
+        // Create a temp folder as the base path
+        dfs = dfsCluster.getFileSystem();
+        dfsBasePath = dfs.getWorkingDirectory().toString();
+        dfs.mkdirs(new Path(dfsBasePath));
     }
-  }
 
+    /**
+     * Cleanups the distributed file system.
+     */
+    protected void cleanupDFS() throws IOException {
+        if (hdfsTestService != null) {
+            hdfsTestService.stop();
+            dfsCluster.shutdown();
+            hdfsTestService = null;
+            dfsCluster = null;
+            dfs = null;
+        }
+        // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+        // same JVM
+        FileSystem.closeAll();
+    }
+
+    /**
+     * Initializes executor service with a fixed thread pool.
+     *
+     * @param threadNum specify the capacity of the fixed thread pool
+     */
+    protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
+        executorService = Executors.newFixedThreadPool(threadNum);
+    }
+
+    /**
+     * Cleanups the executor service.
+     */
+    protected void cleanupExecutorService() {
+        if (this.executorService != null) {
+            this.executorService.shutdownNow();
+            this.executorService = null;
+        }
+    }
+
+    private void initFileSystemWithConfiguration(Configuration configuration) {
+        if (basePath == null) {
+            throw new IllegalStateException("The base path has not been initialized.");
+        }
+
+        fs = FSUtils.getFs(basePath, configuration);
+        if (fs instanceof LocalFileSystem) {
+            LocalFileSystem lfs = (LocalFileSystem) fs;
+            // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+            // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+            // So, for the tests, we enforce checksum verification to circumvent the problem
+            lfs.setVerifyChecksum(true);
+        }
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+        return getHoodieWriteClient(cfg, false);
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
+        return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
+    }
+
+    public HoodieReadClient getHoodieReadClient(String basePath) {
+        return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    }
+
+    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
+        HoodieIndex index) {
+        if (null != client) {
+            client.close();
+            client = null;
+        }
+        client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+        return client;
+    }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
index 05638e2..6ddb578 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
@@ -60,7 +60,7 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
     if (utility != null) {
       utility.shutdownMiniCluster();
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 2893947..43f2fd1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -86,6 +86,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   @AfterClass
   public static void clean() throws Exception {
     if (utility != null) {
+      utility.deleteTable(tableName);
       utility.shutdownMiniCluster();
     }
   }
@@ -115,11 +116,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupTestDataGenerator();
-    cleanupMetaClient();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   @Test
@@ -132,7 +129,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
@@ -172,7 +169,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(newCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -206,7 +203,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
@@ -256,7 +253,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // only for test, set the hbaseConnection to mocked object
     index.setHbaseConnection(hbaseConnection);
 
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
@@ -281,7 +278,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   public void testTotalPutsBatching() throws Exception {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
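The same substitution runs through the rest of the HBase index tests: every direct new HoodieWriteClient(jsc, config) or local getWriteClient(config) helper is replaced by the harness-managed getHoodieWriteClient(config) so the client is always closed. For the mocked-connection cases above, the resulting shape is roughly the following sketch (hypothetical local names; assumes Mockito, which the mocked hbaseConnection in the hunk implies):

  HoodieWriteConfig config = getConfig();
  HBaseIndex index = new HBaseIndex(config);
  index.setHbaseConnection(Mockito.mock(Connection.class)); // test-only hook, as noted in the hunk
  HoodieWriteClient writeClient = getHoodieWriteClient(config); // harness-managed, closed in tearDown

  String newCommitTime = writeClient.startCommit();
  List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);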
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 91435f8..b97fefc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -45,7 +45,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
   @After
   public void tearDown() {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index d29cfa4..105b0e8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -107,7 +107,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupFileSystem();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   private HoodieWriteConfig makeConfig() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index ddf2775..55d4526 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -80,7 +80,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
   @After
   public void tearDown() {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 664f4b5..7fd02bc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -68,11 +68,8 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
     cleanupFileSystem();
     cleanupTestDataGenerator();
     cleanupSparkContexts();
-    cleanupMetaClient();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
+    cleanupFileSystem();
   }
 
   @Test
@@ -83,9 +80,8 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
 
     // Build a write config with bulkinsertparallelism set
     HoodieWriteConfig cfg = getConfigBuilder().build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
-
       /**
        * Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times
        * each. id1 (21 records), id2 (21 records), id3, id4
@@ -224,7 +220,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
   public void testHoodieMergeHandleWriteStatMetrics() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfigBuilder().build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index ec64080..6887531 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestUtils;
@@ -85,7 +86,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
-    cleanupMetaClient();
+    cleanupClients();
     cleanupFileSystem();
     cleanupTestDataGenerator();
   }
@@ -129,6 +130,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     // Prepare the AvroParquetIO
     HoodieWriteConfig config = makeHoodieClientConfig();
     String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String partitionPath = "/2016/01/31";
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 740caf2..fdc968d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -96,16 +96,13 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     cleanupDFS();
     cleanupSparkContexts();
     cleanupTestDataGenerator();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   @Test
   public void testSimpleInsertAndUpdate() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts)
@@ -190,7 +187,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   @Test
   public void testMetadataAggregateFromWriteStatus() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       String newCommitTime = "001";
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
@@ -213,7 +210,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   @Test
   public void testSimpleInsertUpdateAndDelete() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts, written as parquet file)
@@ -298,7 +295,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE);
 
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts)
@@ -351,7 +348,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfig(false);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       // Test delta commit rollback
       /**
@@ -394,7 +391,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
        */
       final String commitTime1 = "002";
       // WriteClient with custom config (disable small file handling)
-      try (HoodieWriteClient secondClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
+      try (HoodieWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
         secondClient.startCommitWithTime(commitTime1);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -424,7 +421,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
        * Write 3 (inserts + updates - testing successful delta commit)
        */
       final String commitTime2 = "002";
-      try (HoodieWriteClient thirdClient = getWriteClient(cfg);) {
+      try (HoodieWriteClient thirdClient = getHoodieWriteClient(cfg);) {
         thirdClient.startCommitWithTime(commitTime2);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -500,7 +497,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfig(false);
-    try (final HoodieWriteClient client = getWriteClient(cfg);) {
+    try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       /**
        * Write 1 (only inserts)
        */
@@ -541,7 +538,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
        */
       newCommitTime = "002";
       // WriteClient with custom config (disable small file handling)
-      HoodieWriteClient nClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
+      HoodieWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
       nClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -664,7 +661,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   @Test
   public void testUpsertPartitioner() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
        * Write 1 (only inserts, written as parquet file)
@@ -743,7 +740,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testLogFileCountsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig(true);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -816,7 +813,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -853,7 +850,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -927,7 +924,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -979,10 +976,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollingStatsInMetadata() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
-
       // Create a commit without rolling stats in metadata to test backwards compatibility
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
       String commitActionType = table.getMetaClient().getCommitActionType();
@@ -1080,7 +1076,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollingStatsWithSmallFileHandling() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
-    try (HoodieWriteClient client = getWriteClient(cfg);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       Map<String, Long> fileIdToInsertsMap = new HashMap<>();
       Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 8fa55ec..09d62a7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -78,10 +78,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     cleanupFileSystem();
     cleanupTestDataGenerator();
     cleanupSparkContexts();
-  }
-
-  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
-    return new HoodieWriteClient(jsc, config);
+    cleanupClients();
   }
 
   private HoodieWriteConfig getConfig() {
@@ -114,8 +111,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     HoodieWriteConfig config = getConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
-
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -132,8 +128,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
-    try (HoodieWriteClient writeClient = getWriteClient(config);) {
-      String newCommitTime = "100";
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+      String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
diff --git a/pom.xml b/pom.xml
index f1990e1..6370c37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -242,6 +242,7 @@
         <version>${maven-surefire-plugin.version}</version>
         <configuration>
           <skip>${skipUTs}</skip>
+          <argLine>-Xmx4g</argLine>
           <systemPropertyVariables>
             <log4j.configuration>
               ${surefire-log4j.file}


[hudi] 04/04: [HUDI-988] Fix More Unit Test Flakiness

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 864a7cd880cf80aac056aac0658ee94f53b36ac9
Author: garyli1019 <ya...@gmail.com>
AuthorDate: Fri Jun 5 17:25:59 2020 -0700

    [HUDI-988] Fix More Unit Test Flakiness
---
 .../hudi/client/TestCompactionAdminClient.java     |  35 +++----
 .../java/org/apache/hudi/client/TestMultiFS.java   |   4 +-
 .../hudi/client/TestUpdateSchemaEvolution.java     |   3 +-
 .../hudi/common/HoodieClientTestHarness.java       |  69 +++++++++++---
 .../execution/TestBoundedInMemoryExecutor.java     |   2 +-
 .../hudi/execution/TestBoundedInMemoryQueue.java   |   3 +-
 .../org/apache/hudi/index/TestHoodieIndex.java     |  29 +++++-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   4 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   5 +-
 .../apache/hudi/io/TestHoodieCommitArchiveLog.java |   3 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |   6 +-
 .../apache/hudi/table/TestConsistencyGuard.java    |   2 +-
 .../apache/hudi/table/TestCopyOnWriteTable.java    |   4 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    | 104 ++++++++++-----------
 .../hudi/table/compact/TestAsyncCompaction.java    |   2 +-
 .../hudi/table/compact/TestHoodieCompactor.java    |   5 +-
 .../table/view/HoodieTableFileSystemView.java      |   6 ++
 .../timeline/service/FileSystemViewHandler.java    |   2 +-
 18 files changed, 162 insertions(+), 126 deletions(-)
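
For context on the diffstat above: most of the churn is one mechanical change. Individual setUp/tearDown methods stop juggling Spark contexts, file systems, clients and views themselves and delegate to HoodieClientTestHarness, which now remembers every write client, read client and file-system view it hands out and closes them in one place. A subclass ends up looking roughly like the sketch below; the helper names (initResources, cleanupResources, getHoodieWriteClient) are taken from the diff, everything else is illustrative.

    import org.apache.hudi.client.HoodieWriteClient;
    import org.apache.hudi.common.HoodieClientTestHarness;
    import org.apache.hudi.config.HoodieWriteConfig;

    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    public class TestSomeHoodieFeature extends HoodieClientTestHarness {

      @Before
      public void setUp() throws Exception {
        initResources();    // harness-side init; see HoodieClientTestHarness on this branch for what it covers
      }

      @After
      public void tearDown() throws Exception {
        cleanupResources(); // contexts, file systems, DFS, executor service, plus any tracked clients/views
      }

      @Test
      public void testSomething() throws Exception {
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).build();
        try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {
          String instantTime = client.startCommit();
          // ... write data for instantTime and assert; a failure here no longer leaks the client.
        }
      }
    }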

diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 8e94857..b82863f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -33,9 +33,9 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.compact.OperationResult;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,13 +67,6 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
     client = new CompactionAdminClient(jsc, basePath);
   }
 
-  @After
-  public void tearDown() {
-    client.close();
-    metaClient = null;
-    cleanupSparkContexts();
-  }
-
   @Test
   public void testUnscheduleCompactionPlan() throws Exception {
     int numEntriesPerInstant = 10;
@@ -142,13 +135,13 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
     List<Pair<HoodieLogFile, HoodieLogFile>> undoFiles =
         result.stream().flatMap(r -> getRenamingActionsToAlignWithCompactionOperation(metaClient,
             compactionInstant, r.getOperation(), Option.empty()).stream()).map(rn -> {
-              try {
-                renameLogFile(metaClient, rn.getKey(), rn.getValue());
-              } catch (IOException e) {
-                throw new HoodieIOException(e.getMessage(), e);
-              }
-              return rn;
-            }).collect(Collectors.toList());
+          try {
+            renameLogFile(metaClient, rn.getKey(), rn.getValue());
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+          return rn;
+        }).collect(Collectors.toList());
     Map<String, String> renameFilesFromUndo = undoFiles.stream()
         .collect(Collectors.toMap(p -> p.getRight().getPath().toString(), x -> x.getLeft().getPath().toString()));
     Map<String, String> expRenameFiles = renameFiles.stream()
@@ -274,9 +267,9 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
     // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
     newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
         .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
-          Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
-          Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
-        });
+      Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
+      Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
+    });
 
     // Ensure same number of log-files before and after renaming per fileId
     Map<String, Long> fileIdToCountsAfterRenaming =
@@ -335,9 +328,9 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
     newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
         .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
         .filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> {
-          Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
-          Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
-        });
+      Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
+      Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
+    });
 
     // Ensure same number of log-files before and after renaming per fileId
     Map<String, Long> fileIdToCountsAfterRenaming =
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 8d3fa13..24ecc8e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -63,9 +63,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
 
   @After
   public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupDFS();
-    cleanupTestDataGenerator();
+    cleanupResources();
   }
 
   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index ab6e940..de853f5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -60,8 +60,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
 
   @After
   public void tearDown() throws IOException {
-    cleanupSparkContexts();
-    cleanupFileSystem();
+    cleanupResources();
   }
 
   //@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 4c7b890..2988175 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -23,16 +23,23 @@ import org.apache.hudi.client.TestHoodieClientBase;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.slf4j.Logger;
@@ -58,7 +65,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   protected transient ExecutorService executorService;
   protected transient HoodieTableMetaClient metaClient;
   private static AtomicInteger instantGen = new AtomicInteger(1);
-  protected transient HoodieWriteClient client;
+  protected transient HoodieWriteClient writeClient;
+  protected transient HoodieReadClient readClient;
+  protected transient HoodieTableFileSystemView tableView;
+  protected transient HoodieTable hoodieTable;
 
   public String getNextInstant() {
     return String.format("%09d", instantGen.getAndIncrement());
@@ -89,6 +99,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     cleanupSparkContexts();
     cleanupTestDataGenerator();
     cleanupFileSystem();
+    cleanupDFS();
+    cleanupExecutorService();
+    System.gc();
   }
 
   /**
@@ -156,6 +169,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     if (fs != null) {
       LOG.warn("Closing file-system instance used in previous test-run");
       fs.close();
+      fs = null;
     }
   }
 
@@ -175,13 +189,22 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   }
 
   /**
-   * Cleanups table type.
+   * Cleanups hoodie clients.
    */
-  protected void cleanupClients() {
-    metaClient = null;
-    if (null != client) {
-      client.close();
-      client = null;
+  protected void cleanupClients() throws IOException {
+    if (metaClient != null) {
+      metaClient = null;
+    }
+    if (readClient != null) {
+      readClient = null;
+    }
+    if (writeClient != null) {
+      writeClient.close();
+      writeClient = null;
+    }
+    if (tableView != null) {
+      tableView.close();
+      tableView = null;
     }
   }
 
@@ -196,7 +219,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
    * Cleanups test data generator.
    */
   protected void cleanupTestDataGenerator() {
-    dataGen = null;
+    if (dataGen != null) {
+      dataGen = null;
+    }
   }
 
   /**
@@ -272,16 +297,32 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   }
 
   public HoodieReadClient getHoodieReadClient(String basePath) {
-    return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    readClient = new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    return readClient;
   }
 
   public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
       HoodieIndex index) {
-    if (null != client) {
-      client.close();
-      client = null;
+    if (null != writeClient) {
+      writeClient.close();
+      writeClient = null;
+    }
+    writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+    return writeClient;
+  }
+
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
+    metaClient = new HoodieTableMetaClient(conf, basePath);
+    return metaClient;
+  }
+
+  public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
+      FileStatus[] fileStatuses) {
+    if (tableView == null) {
+      tableView =  new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
+    } else {
+      tableView.init(metaClient, visibleActiveTimeline, fileStatuses);
     }
-    client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-    return client;
+    return tableView;
   }
 }
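
The three accessors added above (getHoodieWriteClient, getHoodieMetaClient, getHoodieTableFileSystemView) are what the table and index tests below switch to. The file-system-view helper is the interesting one: repeated calls re-initialize a single tracked HoodieTableFileSystemView (through the init overload added further down in hudi-common) instead of allocating a new view on every assertion and never closing it. Illustrative usage inside a harness subclass, composed only of calls that appear elsewhere in this patch:

    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());

    // First call constructs the view and remembers it in the harness.
    tableView = getHoodieTableFileSystemView(metaClient,
        metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);

    // Later calls re-init the same instance; cleanupClients() closes it exactly once at the end.
    tableView = getHoodieTableFileSystemView(metaClient,
        hoodieTable.getCompletedCommitsTimeline(), allFiles);
    Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
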
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
index 8fd418a..5a626c7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
@@ -52,7 +52,7 @@ public class TestBoundedInMemoryExecutor extends HoodieClientTestHarness {
 
   @After
   public void tearDown() throws Exception {
-    cleanupTestDataGenerator();
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index acd2ec1..25f5a59 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -69,8 +69,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
 
   @After
   public void tearDown() throws Exception {
-    cleanupTestDataGenerator();
-    cleanupExecutorService();
+    cleanupResources();
   }
 
   // Test to ensure that we are reading all records from queue iterator in the same order
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index b97fefc..10ae93f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -18,7 +18,26 @@
 
 package org.apache.hudi.index;
 
+<<<<<<< HEAD
 import org.apache.hudi.common.HoodieClientTestHarness;
+=======
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+>>>>>>> e9cab67b... [HUDI-988] Fix More Unit Test Flakiness
 import org.apache.hudi.config.HoodieHBaseIndexConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -31,6 +50,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+
 import static org.junit.Assert.assertTrue;
 
 public class TestHoodieIndex extends HoodieClientTestHarness {
@@ -38,14 +59,12 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
   @Before
   public void setUp() throws Exception {
     initSparkContexts("TestHoodieIndex");
-    initPath();
-    initMetaClient();
+    initResources();
   }
 
   @After
-  public void tearDown() {
-    cleanupSparkContexts();
-    cleanupClients();
+  public void tearDown() throws IOException {
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 105b0e8..09b5782 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -105,9 +105,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 
   @After
   public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupFileSystem();
-    cleanupClients();
+    cleanupResources();
   }
 
   private HoodieWriteConfig makeConfig() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 55d4526..1065c23 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -78,9 +78,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
   }
 
   @After
-  public void tearDown() {
-    cleanupSparkContexts();
-    cleanupClients();
+  public void tearDown() throws IOException {
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 0972385..3409eea 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -67,8 +67,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
 
   @After
   public void clean() throws IOException {
-    cleanupDFS();
-    cleanupSparkContexts();
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 7fd02bc..fe99816 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -65,11 +65,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
 
   @After
   public void tearDown() throws Exception {
-    cleanupFileSystem();
-    cleanupTestDataGenerator();
-    cleanupSparkContexts();
-    cleanupClients();
-    cleanupFileSystem();
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
index cc78a64..f5baf37 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
@@ -42,7 +42,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
 
   @After
   public void tearDown() throws Exception {
-    cleanupFileSystem();
+    cleanupResources();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 99d9b2b..7f1e538 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload;
@@ -219,7 +219,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
       if (file.getName().endsWith(".parquet")) {
         if (FSUtils.getFileId(file.getName()).equals(FSUtils.getFileId(parquetFile.getName()))
             && HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(file.getName()),
-                FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) {
+            FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) {
           updatedParquetFile = file;
           break;
         }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index fdc8b27..9111391 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -94,10 +94,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
   @After
   public void clean() throws IOException {
-    cleanupDFS();
-    cleanupSparkContexts();
-    cleanupTestDataGenerator();
-    cleanupClients();
+    cleanupResources();
   }
 
   @Test
@@ -167,8 +164,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       client.compact(compactionCommitTime);
 
       allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());
 
       // verify that there is a commit
@@ -225,7 +223,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -236,13 +234,12 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());
 
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
 
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue("should list the parquet files we wrote in the delta commit",
           dataFilesToRead.findAny().isPresent());
 
@@ -278,11 +275,11 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());
 
       allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());
 
-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+      List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
       List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
       assertEquals("Must contain 0 records", 0, recordsRead.size());
@@ -311,7 +308,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       // verify there are no errors
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertTrue(commit.isPresent());
       assertEquals("commit should be 001", "001", commit.get().getTimestamp());
@@ -337,11 +334,10 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      HoodieTableFileSystemView roView =
-          new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
 
       final String absentCommit = newCommitTime;
-      assertFalse(roView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
+      assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
     }
   }
 
@@ -366,7 +362,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -377,13 +373,13 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());
 
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      tableView =
+          getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(!dataFilesToRead.findAny().isPresent());
 
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue("should list the parquet files we wrote in the delta commit",
           dataFilesToRead.findAny().isPresent());
 
@@ -399,7 +395,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
         copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
-        List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+        List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
         List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
         assertEquals(recordsRead.size(), 200);
 
@@ -413,7 +409,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         // After rollback, there should be no parquet file with the failed commit time
         Assert.assertEquals(Arrays.stream(allFiles)
             .filter(file -> file.getPath().getName().contains(commitTime1)).count(), 0);
-        dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+        dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
         recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
         assertEquals(recordsRead.size(), 200);
       }
@@ -429,7 +425,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
         copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
 
-        List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+        List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
         List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
         assertEquals(recordsRead.size(), 200);
 
@@ -449,8 +445,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
         metaClient = HoodieTableMetaClient.reload(metaClient);
         hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
-        roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-        dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+        tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+        dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
         recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
         // check that the number of records read is still correct after rollback operation
         assertEquals(recordsRead.size(), 200);
@@ -476,20 +472,20 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
         allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
         metaClient = HoodieTableMetaClient.reload(metaClient);
-        roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+        tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
         final String compactedCommitTime =
             metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
 
-        assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+        assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
 
         thirdClient.rollback(compactedCommitTime);
 
         allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
         metaClient = HoodieTableMetaClient.reload(metaClient);
-        roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+        tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
-        assertFalse(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+        assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
       }
     }
   }
@@ -513,7 +509,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -524,13 +520,12 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());
 
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
 
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue("Should list the parquet files we wrote in the delta commit",
           dataFilesToRead.findAny().isPresent());
 
@@ -546,7 +541,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
       copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
 
-      List<String> dataFiles = roView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+      List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
       List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       assertEquals(recordsRead.size(), 200);
 
@@ -604,12 +599,12 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
       final String compactedCommitTime =
           metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
 
-      assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+      assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
 
       /**
        * Write 5 (updates)
@@ -631,12 +626,10 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
-      SliceView rtView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       List<HoodieFileGroup> fileGroups =
           ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
       assertTrue(fileGroups.isEmpty());
@@ -678,7 +671,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
       assertNoWriteErrors(statuses);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -689,13 +682,13 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());
 
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView = new HoodieTableFileSystemView(metaClient,
+      BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
           metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
       Map<String, Long> parquetFileIdToSize =
           dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
 
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
       List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
       assertTrue("Should list the parquet files we wrote in the delta commit",
@@ -723,7 +716,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());
 
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      roView = new HoodieTableFileSystemView(metaClient,
+      roView = getHoodieTableFileSystemView(metaClient,
           hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
       List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
@@ -752,7 +745,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       writeClient.insert(recordsRDD, newCommitTime).collect();
 
       // Update all the 100 records
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
 
       newCommitTime = "101";
       writeClient.startCommitWithTime(newCommitTime);
@@ -887,7 +880,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
       // and calling rollback twice
       final String lastCommitTime = newCommitTime;
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
           .filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
       String fileName = last.getFileName();
@@ -980,7 +973,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      HoodieTableMetaClient metaClient = getHoodieMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
       // Create a commit without rolling stats in metadata to test backwards compatibility
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
@@ -1080,7 +1073,6 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       Map<String, Long> fileIdToInsertsMap = new HashMap<>();
       Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
index 1a366a4..2a19d2c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
@@ -542,7 +542,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
     FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
     HoodieTableFileSystemView view =
-        new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
     return view.getLatestBaseFiles().collect(Collectors.toList());
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 86a2e1f..3e36d43 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -75,10 +75,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 
   @After
   public void tearDown() throws Exception {
-    cleanupFileSystem();
-    cleanupTestDataGenerator();
-    cleanupSparkContexts();
-    cleanupClients();
+    cleanupResources();
   }
 
   private HoodieWriteConfig getConfig() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index 1f7165b..3ba021b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -90,6 +90,12 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
     super.init(metaClient, visibleActiveTimeline);
   }
 
+  public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
+      FileStatus[] fileStatuses) {
+    init(metaClient, visibleActiveTimeline);
+    addFilesToView(fileStatuses);
+  }
+
   @Override
   protected void resetViewState() {
     this.fgIdToPendingCompaction = null;
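This overload lets callers re-initialize an existing view with a fresh timeline and set of file statuses instead of building a new view each time. It is presumably what backs the getHoodieTableFileSystemView helper used in the TestAsyncCompaction change above; a sketch under that assumption (the tableView field and the reuse logic are guesses):

    // Hypothetical harness helper; tableView is an assumed field on the test harness.
    // Reusing one view and re-initializing it keeps tests from accumulating view instances.
    protected HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient,
        HoodieTimeline timeline, FileStatus[] fileStatuses) {
      if (tableView == null) {
        tableView = new HoodieTableFileSystemView(metaClient, timeline, fileStatuses);
      } else {
        tableView.init(metaClient, timeline, fileStatuses);
      }
      return tableView;
    }
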
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
index 5538d66..4410193 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
@@ -117,7 +117,7 @@ public class FileSystemViewHandler {
       synchronized (view) {
         if (isLocalViewBehind(ctx)) {
           HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline();
-          LOG.warn("Syncing view as client passed last known instant " + lastKnownInstantFromClient
+          LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient
               + " as last known instant but server has the folling timeline :"
               + localTimeline.getInstants().collect(Collectors.toList()));
           view.sync();



[hudi] 02/04: [HUDI-990] Timeline API : filterCompletedAndCompactionInstants needs to handle requested state correctly. Also ensure timeline gets reloaded after we revert committed transactions

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ae48ecbe232eb55267d1a138baeec13baa1fb249
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Wed Jun 3 00:35:14 2020 -0700

    [HUDI-990] Timeline API : filterCompletedAndCompactionInstants needs to handle requested state correctly. Also ensure timeline gets reloaded after we revert committed transactions
---
 .../client/embedded/EmbeddedTimelineService.java    |  4 +++-
 .../apache/hudi/table/HoodieCopyOnWriteTable.java   |  2 ++
 .../apache/hudi/table/HoodieMergeOnReadTable.java   |  2 ++
 .../org/apache/hudi/table/TestMergeOnReadTable.java |  3 +++
 .../table/timeline/HoodieDefaultTimeline.java       |  2 +-
 .../table/view/FileSystemViewStorageConfig.java     | 21 +++++++++++++++++++++
 6 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 5afee3f..c7c4f7b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -89,7 +89,9 @@ public class EmbeddedTimelineService {
    * Retrieves proper view storage configs for remote clients to access this service.
    */
   public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
-    return FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
+    FileSystemViewStorageType viewStorageType = config.shouldEnableBackupForRemoteFileSystemView()
+            ? FileSystemViewStorageType.REMOTE_FIRST : FileSystemViewStorageType.REMOTE_ONLY;
+    return FileSystemViewStorageConfig.newBuilder().withStorageType(viewStorageType)
         .withRemoteServerHost(hostAddr).withRemoteServerPort(serverPort).build();
   }
 
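With the backup flag disabled, a failure to reach the embedded timeline service now surfaces directly instead of being absorbed by the secondary (local) view, which is what the test flakiness fix relies on. A short usage sketch built only from the APIs added in this commit:

    // Sketch: disable the remote backup view, as the TestMergeOnReadTable config below does.
    FileSystemViewStorageConfig viewConfig = FileSystemViewStorageConfig.newBuilder()
        .withEnableBackupForRemoteFileSystemView(false)
        .build();

    // shouldEnableBackupForRemoteFileSystemView() returns false here, so
    // getRemoteFileSystemViewConfig() above selects REMOTE_ONLY rather than REMOTE_FIRST.
    boolean backupEnabled = viewConfig.shouldEnableBackupForRemoteFileSystemView();
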
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 4c91c77..c74af2d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -359,6 +359,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     if (instant.isCompleted()) {
       LOG.info("Unpublishing instant " + instant);
       instant = activeTimeline.revertToInflight(instant);
+      // reload meta-client to reflect latest timeline status
+      metaClient.reloadActiveTimeline();
     }
 
     // For Requested State (like failure during index lookup), there is nothing to do rollback other than
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 938a5fd..5f56369 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -179,6 +179,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     if (instant.isCompleted()) {
       LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants);
       instant = this.getActiveTimeline().revertToInflight(instant);
+      // reload meta-client to reflect latest timeline status
+      metaClient.reloadActiveTimeline();
     }
 
     List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
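The reload is needed because revertToInflight rewrites the instant file on storage while the meta client may still hold a timeline loaded before the revert. An illustrative sequence (completedInstant is a placeholder):

    // Illustrative only: why reloadActiveTimeline() follows a revert of a completed instant.
    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); // cached before the revert
    activeTimeline.revertToInflight(completedInstant);                    // rewrites the instant on storage
    // Without a reload, the meta client's cached timeline can still report the instant as
    // completed; reloading refreshes it so later rollback steps see the true state.
    metaClient.reloadActiveTimeline();
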
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index fdc968d..9f3eaea 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.table.TableFileSystemView.SliceView;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -1219,6 +1220,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
         .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build());
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 86431c9..6451749 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -107,7 +107,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
 
   @Override
   public HoodieTimeline filterCompletedAndCompactionInstants() {
-    return new HoodieDefaultTimeline(instants.stream().filter(s -> !s.isInflight()
+    return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted()
             || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details);
   }
 
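The predicate flips from a negative check to a positive one because an instant in REQUESTED state is neither inflight nor completed, so !isInflight() wrongly kept it as if it were completed. A small illustration, assuming the HoodieInstant(State, action, timestamp) constructor (the timestamp is a placeholder):

    // Illustrative only: how a REQUESTED commit fares under the old vs. new predicate.
    HoodieInstant requested =
        new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000");
    boolean keptByOldFilter = !requested.isInflight();  // true  -- wrongly treated as completed
    boolean keptByNewFilter = requested.isCompleted();  // false -- correctly filtered out
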
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
index 93c5507..d805dfb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
@@ -55,6 +55,15 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
   private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_COMPACTION = 0.01;
   private static final Long DEFAULT_MAX_MEMORY_FOR_VIEW = 100 * 1024 * 1024L; // 100 MB
 
+  /**
+   * Config to control whether a backup needs to be configured for clients that are not able to
+   * reach the timeline service.
+   */
+  public static final String REMOTE_BACKUP_VIEW_HANDLER_ENABLE =
+      "hoodie.filesystem.remote.backup.view.enable";
+  // Needs to be disabled only in tests.
+  public static final String DEFAULT_REMOTE_BACKUP_VIEW_HANDLER_ENABLE = "true";
+
   public static FileSystemViewStorageConfig.Builder newBuilder() {
     return new Builder();
   }
@@ -98,6 +107,10 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
     return FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE));
   }
 
+  public boolean shouldEnableBackupForRemoteFileSystemView() {
+    return Boolean.parseBoolean(props.getProperty(REMOTE_BACKUP_VIEW_HANDLER_ENABLE));
+  }
+
   public String getRocksdbBasePath() {
     return props.getProperty(ROCKSDB_BASE_PATH_PROP);
   }
@@ -166,6 +179,11 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withEnableBackupForRemoteFileSystemView(boolean enable) {
+      props.setProperty(REMOTE_BACKUP_VIEW_HANDLER_ENABLE, Boolean.toString(enable));
+      return this;
+    }
+
     public FileSystemViewStorageConfig build() {
       setDefaultOnCondition(props, !props.containsKey(FILESYSTEM_VIEW_STORAGE_TYPE), FILESYSTEM_VIEW_STORAGE_TYPE,
           DEFAULT_VIEW_STORAGE_TYPE.name());
@@ -188,6 +206,9 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(ROCKSDB_BASE_PATH_PROP), ROCKSDB_BASE_PATH_PROP,
           DEFAULT_ROCKSDB_BASE_PATH);
 
+      setDefaultOnCondition(props, !props.containsKey(REMOTE_BACKUP_VIEW_HANDLER_ENABLE),
+          REMOTE_BACKUP_VIEW_HANDLER_ENABLE, DEFAULT_REMOTE_BACKUP_VIEW_HANDLER_ENABLE);
+
       // Validations
       FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_VIEW_STORAGE_TYPE));
       FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE));