Posted to commits@hudi.apache.org by si...@apache.org on 2020/06/08 12:52:56 UTC

[hudi] 03/04: Making a few fixes after cherry-picking

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d3afcbac3a6e5362d57570a2a5807abbf65c69d8
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Sun Jun 7 16:23:40 2020 -0400

    Making a few fixes after cherry-picking
---
 .../apache/hudi/client/TestHoodieClientBase.java   | 917 +++++++++++----------
 .../hudi/common/HoodieClientTestHarness.java       | 426 +++++-----
 .../apache/hudi/table/TestMergeOnReadTable.java    |   2 +
 .../hudi/table/compact/TestHoodieCompactor.java    |   6 +-
 .../table/string/TestHoodieActiveTimeline.java     |   2 +-
 5 files changed, 678 insertions(+), 675 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 6e6458b..6856489 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -72,477 +72,478 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHoodieClientBase extends HoodieClientTestHarness {
 
-    private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
-
-    @Before
-    public void setUp() throws Exception {
-        initResources();
+  private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
+
+  @Before
+  public void setUp() throws Exception {
+    initResources();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
+    return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+  }
+
+  /**
+   * Get Default HoodieWriteConfig for tests.
+   *
+   * @return Default Hoodie Write Config for tests
+   */
+  protected HoodieWriteConfig getConfig() {
+    return getConfigBuilder().build();
+  }
+
+  protected HoodieWriteConfig getConfig(IndexType indexType) {
+    return getConfigBuilder(indexType).build();
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  protected HoodieWriteConfig.Builder getConfigBuilder() {
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
+  }
+
+  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    return getConfigBuilder(schemaStr, IndexType.BLOOM);
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .withWriteStatusClass(MetadataMergeWriteStatus.class)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .forTable("test-trip-table")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+  }
+
+  protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    ((SyncableFileSystemView) (table.getSliceView())).reset();
+    return table;
+  }
+
+  /**
+   * Assert no failures in writing hoodie files.
+   *
+   * @param statuses List of Write Status
+   */
+  public static void assertNoWriteErrors(List<WriteStatus> statuses) {
+    // Verify there are no errors
+    for (WriteStatus status : statuses) {
+      assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
     }
-
-    @After
-    public void tearDown() throws Exception {
-        cleanupResources();
+  }
+
+  void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
+    Set<String> partitionPathSet = inputRecords.stream()
+        .map(HoodieRecord::getPartitionPath)
+        .collect(Collectors.toSet());
+    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+  }
+
+  void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
+    Set<String> partitionPathSet = inputKeys.stream()
+        .map(HoodieKey::getPartitionPath)
+        .collect(Collectors.toSet());
+    assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+  }
+
+  /**
+   * Ensure presence of partition meta-data at known depth.
+   *
+   * @param partitionPaths Partition paths to check
+   * @param fs File System
+   * @throws IOException in case of error
+   */
+  void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
+    for (String partitionPath : partitionPaths) {
+      assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
+      HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
+      pmeta.readFromFS();
+      Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
     }
-
-    protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
-        return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+  }
+
+  /**
+   * Ensure records have location field set.
+   *
+   * @param taggedRecords Tagged Records
+   * @param commitTime Commit Timestamp
+   */
+  protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
+    for (HoodieRecord rec : taggedRecords) {
+      assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
+      assertEquals("All records should have commit time " + commitTime + ", since updates were made",
+          rec.getCurrentLocation().getInstantTime(), commitTime);
     }
-
-    /**
-     * Get Default HoodieWriteConfig for tests.
-     *
-     * @return Default Hoodie Write Config for tests
-     */
-    protected HoodieWriteConfig getConfig() {
-        return getConfigBuilder().build();
+  }
+
+  /**
+   * Assert that there is no duplicate key at the partition level.
+   *
+   * @param records List of Hoodie records
+   */
+  void assertNodupesWithinPartition(List<HoodieRecord> records) {
+    Map<String, Set<String>> partitionToKeys = new HashMap<>();
+    for (HoodieRecord r : records) {
+      String key = r.getRecordKey();
+      String partitionPath = r.getPartitionPath();
+      if (!partitionToKeys.containsKey(partitionPath)) {
+        partitionToKeys.put(partitionPath, new HashSet<>());
+      }
+      assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
+      partitionToKeys.get(partitionPath).add(key);
     }
-
-    protected HoodieWriteConfig getConfig(IndexType indexType) {
-        return getConfigBuilder(indexType).build();
+  }
+
+  /**
+   * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of
+   * record-location setting. Uniqueness is guaranteed by record-generation function itself.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param recordGenFunction Records Generation function
+   * @return Wrapped function
+   */
+  private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
+      final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+    return (commit, numRecords) -> {
+      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
+      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
+      return taggedRecords.collect();
+    };
+  }
+
+  /**
+   * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of
+   * record-location setting. Uniqueness is guaranteed by key-generation function itself.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param keyGenFunction Keys Generation function
+   * @return Wrapped function
+   */
+  private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
+      final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
+    return (numRecords) -> {
+      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+      List<HoodieKey> records = keyGenFunction.apply(numRecords);
+      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+      JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+          .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
+      return taggedRecords.map(record -> record.getKey()).collect();
+    };
+  }
+
+  /**
+   * Generate wrapper for record generation function for testing Prepped APIs.
+   *
+   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+   * @param writeConfig Hoodie Write Config
+   * @param wrapped Actual Records Generation function
+   * @return Wrapped Function
+   */
+  protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
+      HoodieWriteConfig writeConfig,
+      Function2<List<HoodieRecord>, String, Integer> wrapped) {
+    if (isPreppedAPI) {
+      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+    } else {
+      return wrapped;
     }
-
-    /**
-     * Get Config builder with default configs set.
-     *
-     * @return Config Builder
-     */
-    protected HoodieWriteConfig.Builder getConfigBuilder() {
-        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+  }
+
+  /**
+   * Generate wrapper for delete key generation function for testing Prepped APIs.
+   *
+   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+   * @param writeConfig Hoodie Write Config
+   * @param wrapped Actual Records Generation function
+   * @return Wrapped Function
+   */
+  Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
+      HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
+    if (isPreppedAPI) {
+      return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+    } else {
+      return wrapped;
     }
-
-    /**
-     * Get Config builder with default configs set.
-     *
-     * @return Config Builder
-     */
-    HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
-        return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
+  }
+
+  /**
+   * Helper to insert first batch of records and do regular assertions on the state after successful completion.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param writeFn Write Function to be used for insertion
+   * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String initCommitTime, int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
+
+    return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
+        recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
+  }
+
+  /**
+   * Helper to upsert batch of records and do regular assertions on the state after successful completion.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param writeFn Write Function to be used for upsert
+   * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @param expTotalCommits Expected number of commits (including this commit)
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+      int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
+
+    return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+        numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
+        expTotalCommits);
+  }
+
+  /**
+   * Helper to delete batch of keys and do regular assertions on the state after successful completion.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param deleteFn Delete Function to be used for deletes
+   * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String prevCommitTime, String initCommitTime,
+      int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+    final Function<Integer, List<HoodieKey>> keyGenFunction =
+        generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
+
+    return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
+        keyGenFunction,
+        deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
+  }
+
+  /**
+   * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
+   *
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param recordGenFunction Records Generation Function
+   * @param writeFn Write Function to be used for upsert
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @param expTotalCommits Expected number of commits (including this commit)
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+      Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+      Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+
+    // Write 1 (only inserts)
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // check the partition metadata is written out
+    assertPartitionMetadataForRecords(records, fs);
+
+    // verify that there is a commit
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+    if (assertForCommit) {
+      assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
+          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+          timeline.lastInstant().get().getTimestamp());
+      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+      // Check the entire dataset has all records still
+      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+      for (int i = 0; i < fullPartitionPaths.length; i++) {
+        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+      }
+      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+      // Check that the incremental consumption from prevCommitTime
+      assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+      if (commitTimesBetweenPrevAndNew.isPresent()) {
+        commitTimesBetweenPrevAndNew.get().forEach(ct -> {
+          assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
+              HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+              HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
+        });
+      }
     }
-
-    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-        return getConfigBuilder(schemaStr, IndexType.BLOOM);
+    return result;
+  }
+
+  /**
+   * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
+   *
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param keyGenFunction Key Generation function
+   * @param deleteFn Write Function to be used for delete
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+      String initCommitTime, int numRecordsInThisCommit,
+      Function<Integer, List<HoodieKey>> keyGenFunction,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+
+    // Delete 1 (only deletes)
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
+    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
+
+    JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // check the partition metadata is written out
+    assertPartitionMetadataForKeys(keysToDelete, fs);
+
+    // verify that there is a commit
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+    if (assertForCommit) {
+      assertEquals("Expecting 3 commits.", 3,
+          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+          timeline.lastInstant().get().getTimestamp());
+      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+      // Check the entire dataset has all records still
+      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+      for (int i = 0; i < fullPartitionPaths.length; i++) {
+        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+      }
+      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+      // Check that the incremental consumption from prevCommitTime
+      assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+              + " since it is a delete operation",
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
     }
+    return result;
+  }
 
-    /**
-     * Get Config builder with default configs set.
-     *
-     * @return Config Builder
-     */
-    HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
-        return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
-            .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
-            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
-            .withWriteStatusClass(MetadataMergeWriteStatus.class)
-            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
-            .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
-            .forTable("test-trip-table")
-            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
-            .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-                .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-    }
+  /**
+   * Get Cleaner state corresponding to a partition path.
+   *
+   * @param hoodieCleanStatsTwo List of Clean Stats
+   * @param partitionPath Partition path for filtering
+   * @return Cleaner state corresponding to partition path
+   */
+  protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
+    return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
+  }
 
-    protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-        ((SyncableFileSystemView) (table.getSliceView())).reset();
-        return table;
-    }
+  // Functional Interfaces for passing lambda and Hoodie Write API contexts
 
-    /**
-     * Assert no failures in writing hoodie files.
-     *
-     * @param statuses List of Write Status
-     */
-    public static void assertNoWriteErrors(List<WriteStatus> statuses) {
-        // Verify there are no errors
-        for (WriteStatus status : statuses) {
-            assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
-        }
-    }
+  @FunctionalInterface
+  public interface Function2<R, T1, T2> {
 
-    void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
-        Set<String> partitionPathSet = inputRecords.stream()
-            .map(HoodieRecord::getPartitionPath)
-            .collect(Collectors.toSet());
-        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-    }
+    R apply(T1 v1, T2 v2) throws IOException;
+  }
 
-    void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
-        Set<String> partitionPathSet = inputKeys.stream()
-            .map(HoodieKey::getPartitionPath)
-            .collect(Collectors.toSet());
-        assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
-    }
-
-    /**
-     * Ensure presence of partition meta-data at known depth.
-     *
-     * @param partitionPaths Partition paths to check
-     * @param fs File System
-     * @throws IOException in case of error
-     */
-    void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
-        for (String partitionPath : partitionPaths) {
-            assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
-            HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
-            pmeta.readFromFS();
-            Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
-        }
-    }
-
-    /**
-     * Ensure records have location field set.
-     *
-     * @param taggedRecords Tagged Records
-     * @param commitTime Commit Timestamp
-     */
-    protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
-        for (HoodieRecord rec : taggedRecords) {
-            assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
-            assertEquals("All records should have commit time " + commitTime + ", since updates were made",
-                rec.getCurrentLocation().getInstantTime(), commitTime);
-        }
-    }
-
-    /**
-     * Assert that there is no duplicate key at the partition level.
-     *
-     * @param records List of Hoodie records
-     */
-    void assertNodupesWithinPartition(List<HoodieRecord> records) {
-        Map<String, Set<String>> partitionToKeys = new HashMap<>();
-        for (HoodieRecord r : records) {
-            String key = r.getRecordKey();
-            String partitionPath = r.getPartitionPath();
-            if (!partitionToKeys.containsKey(partitionPath)) {
-                partitionToKeys.put(partitionPath, new HashSet<>());
-            }
-            assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
-            partitionToKeys.get(partitionPath).add(key);
-        }
-    }
-
-    /**
-     * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of
-     * record-location setting. Uniqueness is guaranteed by record-generation function itself.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param recordGenFunction Records Generation function
-     * @return Wrapped function
-     */
-    private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
-        final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
-        return (commit, numRecords) -> {
-            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-            List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
-            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
-            return taggedRecords.collect();
-        };
-    }
-
-    /**
-     * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of
-     * record-location setting. Uniqueness is guaranteed by key-generation function itself.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param keyGenFunction Keys Generation function
-     * @return Wrapped function
-     */
-    private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
-        final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
-        return (numRecords) -> {
-            final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
-            List<HoodieKey> records = keyGenFunction.apply(numRecords);
-            final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-            HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
-            JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
-                .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
-            JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
-            return taggedRecords.map(record -> record.getKey()).collect();
-        };
-    }
+  @FunctionalInterface
+  public interface Function3<R, T1, T2, T3> {
 
-    /**
-     * Generate wrapper for record generation function for testing Prepped APIs.
-     *
-     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-     * @param writeConfig Hoodie Write Config
-     * @param wrapped Actual Records Generation function
-     * @return Wrapped Function
-     */
-    protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
-        HoodieWriteConfig writeConfig,
-        Function2<List<HoodieRecord>, String, Integer> wrapped) {
-        if (isPreppedAPI) {
-            return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
-        } else {
-            return wrapped;
-        }
-    }
-
-    /**
-     * Generate wrapper for delete key generation function for testing Prepped APIs.
-     *
-     * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
-     * @param writeConfig Hoodie Write Config
-     * @param wrapped Actual Records Generation function
-     * @return Wrapped Function
-     */
-    Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
-        HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
-        if (isPreppedAPI) {
-            return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
-        } else {
-            return wrapped;
-        }
-    }
-
-    /**
-     * Helper to insert first batch of records and do regular assertions on the state after successful completion.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param writeFn Write Function to be used for insertion
-     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @return RDD of write-status
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-        String initCommitTime, int numRecordsInThisCommit,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-        boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
-        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
-
-        return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
-            recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
-    }
-
-    /**
-     * Helper to upsert batch of records and do regular assertions on the state after successful completion.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param writeFn Write Function to be used for upsert
-     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @param expTotalCommits Expected number of commits (including this commit)
-     * @return RDD of write-status
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-        String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
-        int numRecordsInThisCommit,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-        final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-            generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
-
-        return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
-            numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-            expTotalCommits);
-    }
-
-    /**
-     * Helper to delete batch of keys and do regular assertions on the state after successful completion.
-     *
-     * @param writeConfig Hoodie Write Config
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param deleteFn Delete Function to be used for deletes
-     * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @return RDD of write-status
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
-        String prevCommitTime, String initCommitTime,
-        int numRecordsInThisCommit,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-        final Function<Integer, List<HoodieKey>> keyGenFunction =
-            generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
-
-        return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
-            keyGenFunction,
-            deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
-    }
-
-    /**
-     * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
-     *
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param numRecordsInThisCommit Number of records to be added in the new commit
-     * @param recordGenFunction Records Generation Function
-     * @param writeFn Write Function to be used for upsert
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @param expTotalCommits Expected number of commits (including this commit)
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-        Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
-        Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-
-        // Write 1 (only inserts)
-        client.startCommitWithTime(newCommitTime);
-
-        List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
-        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-        JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
-        List<WriteStatus> statuses = result.collect();
-        assertNoWriteErrors(statuses);
-
-        // check the partition metadata is written out
-        assertPartitionMetadataForRecords(records, fs);
-
-        // verify that there is a commit
-        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-        if (assertForCommit) {
-            assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
-                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-                timeline.lastInstant().get().getTimestamp());
-            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-            // Check the entire dataset has all records still
-            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-            for (int i = 0; i < fullPartitionPaths.length; i++) {
-                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-            }
-            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-            // Check that the incremental consumption from prevCommitTime
-            assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
-            if (commitTimesBetweenPrevAndNew.isPresent()) {
-                commitTimesBetweenPrevAndNew.get().forEach(ct -> {
-                    assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
-                        HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-                        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
-                });
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
-     *
-     * @param client Hoodie Write Client
-     * @param newCommitTime New Commit Timestamp to be used
-     * @param prevCommitTime Commit Timestamp used in previous commit
-     * @param initCommitTime Begin Timestamp (usually "000")
-     * @param keyGenFunction Key Generation function
-     * @param deleteFn Write Function to be used for delete
-     * @param assertForCommit Enable Assertion of Writes
-     * @param expRecordsInThisCommit Expected number of records in this commit
-     * @param expTotalRecords Expected number of records when scanned
-     * @throws Exception in case of error
-     */
-    JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
-        String initCommitTime, int numRecordsInThisCommit,
-        Function<Integer, List<HoodieKey>> keyGenFunction,
-        Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
-        boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-
-        // Delete 1 (only deletes)
-        client.startCommitWithTime(newCommitTime);
-
-        List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
-        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
-
-        JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
-        List<WriteStatus> statuses = result.collect();
-        assertNoWriteErrors(statuses);
-
-        // check the partition metadata is written out
-        assertPartitionMetadataForKeys(keysToDelete, fs);
-
-        // verify that there is a commit
-        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-        HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
-        if (assertForCommit) {
-            assertEquals("Expecting 3 commits.", 3,
-                timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
-            Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
-                timeline.lastInstant().get().getTimestamp());
-            assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
-            // Check the entire dataset has all records still
-            String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-            for (int i = 0; i < fullPartitionPaths.length; i++) {
-                fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
-            }
-            assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
-                HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
-            // Check that the incremental consumption from prevCommitTime
-            assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
-                    + " since it is a delete operation",
-                HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
-                HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
-        }
-        return result;
-    }
-
-    /**
-     * Get Cleaner state corresponding to a partition path.
-     *
-     * @param hoodieCleanStatsTwo List of Clean Stats
-     * @param partitionPath Partition path for filtering
-     * @return Cleaner state corresponding to partition path
-     */
-    protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
-        return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
-    }
-
-    // Functional Interfaces for passing lambda and Hoodie Write API contexts
-
-    @FunctionalInterface
-    public interface Function2<R, T1, T2> {
-
-        R apply(T1 v1, T2 v2) throws IOException;
-    }
-
-    @FunctionalInterface
-    public interface Function3<R, T1, T2, T3> {
-
-        R apply(T1 v1, T2 v2, T3 v3) throws IOException;
-    }
+    R apply(T1 v1, T2 v2, T3 v3) throws IOException;
+  }
 
 }
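
For readers who want to see how the re-indented helpers above are meant to be consumed, here is a minimal sketch of a concrete test subclass. It is not part of this commit: the class name, test name, record counts, and commit-time handling are illustrative assumptions, imports are assumed to mirror those already used in TestHoodieClientBase, and the HoodieWriteClient(jsc, cfg) constructor plus its insert/upsert(JavaRDD, String) overloads are assumed to be available in this release line.

public class TestHoodieClientUpsertFlow extends TestHoodieClientBase {   // hypothetical subclass for illustration

  @Test
  public void testInsertThenUpsert() throws Exception {
    HoodieWriteConfig cfg = getConfig();                         // BLOOM-index defaults from getConfigBuilder()
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);  // assumed constructor in this release line
    try {
      String initCommitTime = "000";

      // First batch: 100 inserts; the helper asserts partition metadata and a single commit on the timeline.
      String firstCommit = getNextInstant();
      insertFirstBatch(cfg, client, firstCommit, initCommitTime, 100,
          HoodieWriteClient::insert, false, true, 100);

      // Second batch: 50 unique updates; still 100 records in total, now across 2 commits.
      String secondCommit = getNextInstant();
      updateBatch(cfg, client, secondCommit, firstCommit, Option.empty(), initCommitTime, 50,
          HoodieWriteClient::upsert, false, true, 50, 100, 2);
    } finally {
      client.close();
    }
  }
}

Passing a different writeFn together with isPreppedAPI = true is how the same batch helpers exercise the prepped versions of the write APIs, as described in the Javadoc above.
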
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index e4202f0..4c7b890 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -49,239 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
-    protected transient JavaSparkContext jsc = null;
-    protected transient SQLContext sqlContext;
-    protected transient FileSystem fs;
-    protected transient HoodieTestDataGenerator dataGen = null;
-    protected transient ExecutorService executorService;
-    protected transient HoodieTableMetaClient metaClient;
-    private static AtomicInteger instantGen = new AtomicInteger(1);
-    protected transient HoodieWriteClient client;
-
-    public String getNextInstant() {
-        return String.format("%09d", instantGen.getAndIncrement());
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
+
+  protected transient JavaSparkContext jsc = null;
+  protected transient SQLContext sqlContext;
+  protected transient FileSystem fs;
+  protected transient HoodieTestDataGenerator dataGen = null;
+  protected transient ExecutorService executorService;
+  protected transient HoodieTableMetaClient metaClient;
+  private static AtomicInteger instantGen = new AtomicInteger(1);
+  protected transient HoodieWriteClient client;
+
+  public String getNextInstant() {
+    return String.format("%09d", instantGen.getAndIncrement());
+  }
+
+  // dfs
+  protected String dfsBasePath;
+  protected transient HdfsTestService hdfsTestService;
+  protected transient MiniDFSCluster dfsCluster;
+  protected transient DistributedFileSystem dfs;
+
+  /**
+   * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
+   */
+  public void initResources() throws IOException {
+    initPath();
+    initSparkContexts();
+    initTestDataGenerator();
+    initFileSystem();
+    initMetaClient();
+  }
+
+  /**
+   * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
+   */
+  public void cleanupResources() throws IOException {
+    cleanupClients();
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
+    cleanupFileSystem();
+  }
+
+  /**
+   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
+   *
+   * @param appName The specified application name.
+   */
+  protected void initSparkContexts(String appName) {
+    // Initialize a local spark env
+    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+    jsc.setLogLevel("ERROR");
+
+    // SQLContext stuff
+    sqlContext = new SQLContext(jsc);
+  }
+
+  /**
+   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
+   * <b>TestHoodieClient</b>.
+   */
+  protected void initSparkContexts() {
+    initSparkContexts("TestHoodieClient");
+  }
+
+  /**
+   * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
+   */
+  protected void cleanupSparkContexts() {
+    if (sqlContext != null) {
+      LOG.info("Clearing sql context cache of spark-session used in previous test-case");
+      sqlContext.clearCache();
+      sqlContext = null;
     }
 
-    // dfs
-    protected String dfsBasePath;
-    protected transient HdfsTestService hdfsTestService;
-    protected transient MiniDFSCluster dfsCluster;
-    protected transient DistributedFileSystem dfs;
-
-    /**
-     * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
-     */
-    public void initResources() throws IOException {
-        initPath();
-        initSparkContexts();
-        initTestDataGenerator();
-        initFileSystem();
-        initMetaClient();
+    if (jsc != null) {
+      LOG.info("Closing spark context used in previous test-case");
+      jsc.close();
+      jsc.stop();
+      jsc = null;
     }
-
-    /**
-     * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
-     */
-    public void cleanupResources() throws IOException {
-        cleanupClients();
-        cleanupSparkContexts();
-        cleanupTestDataGenerator();
-        cleanupFileSystem();
-    }
-
-    /**
-     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
-     *
-     * @param appName The specified application name.
-     */
-    protected void initSparkContexts(String appName) {
-        // Initialize a local spark env
-        jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
-        jsc.setLogLevel("ERROR");
-
-        // SQLContext stuff
-        sqlContext = new SQLContext(jsc);
-    }
-
-    /**
-     * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
-     * <b>TestHoodieClient</b>.
-     */
-    protected void initSparkContexts() {
-        initSparkContexts("TestHoodieClient");
-    }
-
-    /**
-     * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
-     */
-    protected void cleanupSparkContexts() {
-        if (sqlContext != null) {
-            LOG.info("Clearing sql context cache of spark-session used in previous test-case");
-            sqlContext.clearCache();
-            sqlContext = null;
-        }
-
-        if (jsc != null) {
-            LOG.info("Closing spark context used in previous test-case");
-            jsc.close();
-            jsc.stop();
-            jsc = null;
-        }
-    }
-
-    /**
-     * Initializes a file system with the hadoop configuration of Spark context.
-     */
-    protected void initFileSystem() {
-        if (jsc == null) {
-            throw new IllegalStateException("The Spark context has not been initialized.");
-        }
-
-        initFileSystemWithConfiguration(jsc.hadoopConfiguration());
-    }
-
-    /**
-     * Initializes file system with a default empty configuration.
-     */
-    protected void initFileSystemWithDefaultConfiguration() {
-        initFileSystemWithConfiguration(new Configuration());
+  }
+
+  /**
+   * Initializes a file system with the hadoop configuration of Spark context.
+   */
+  protected void initFileSystem() {
+    if (jsc == null) {
+      throw new IllegalStateException("The Spark context has not been initialized.");
     }
 
-    /**
-     * Cleanups file system.
-     */
-    protected void cleanupFileSystem() throws IOException {
-        if (fs != null) {
-            LOG.warn("Closing file-system instance used in previous test-run");
-            fs.close();
-        }
+    initFileSystemWithConfiguration(jsc.hadoopConfiguration());
+  }
+
+  /**
+   * Initializes file system with a default empty configuration.
+   */
+  protected void initFileSystemWithDefaultConfiguration() {
+    initFileSystemWithConfiguration(new Configuration());
+  }
+
+  /**
+   * Cleanups file system.
+   */
+  protected void cleanupFileSystem() throws IOException {
+    if (fs != null) {
+      LOG.warn("Closing file-system instance used in previous test-run");
+      fs.close();
     }
-
-    /**
-     * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
-     */
-    protected void initMetaClient() throws IOException {
-        if (basePath == null) {
-            throw new IllegalStateException("The base path has not been initialized.");
-        }
-
-        if (jsc == null) {
-            throw new IllegalStateException("The Spark context has not been initialized.");
-        }
-
-        metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
-    }
-
-    /**
-     * Cleanups the meta client and the write client.
-     */
-    protected void cleanupClients() {
-        metaClient = null;
-        if (null != client) {
-            client.close();
-            client = null;
-        }
-    }
-
-    /**
-     * Initializes a test data generator used to generate test data.
-     */
-    protected void initTestDataGenerator() {
-        dataGen = new HoodieTestDataGenerator();
+  }
+
+  /**
+   * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
+   */
+  protected void initMetaClient() throws IOException {
+    if (basePath == null) {
+      throw new IllegalStateException("The base path has not been initialized.");
     }
 
-    /**
-     * Cleanups test data generator.
-     */
-    protected void cleanupTestDataGenerator() {
-        dataGen = null;
+    if (jsc == null) {
+      throw new IllegalStateException("The Spark context has not been initialized.");
     }
 
-    /**
-     * Initializes a distributed file system and base directory.
-     */
-    protected void initDFS() throws IOException {
-        FileSystem.closeAll();
-        hdfsTestService = new HdfsTestService();
-        dfsCluster = hdfsTestService.start(true);
-
-        // Create a temp folder as the base path
-        dfs = dfsCluster.getFileSystem();
-        dfsBasePath = dfs.getWorkingDirectory().toString();
-        dfs.mkdirs(new Path(dfsBasePath));
+    metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
+  }
+
+  /**
+   * Cleanups the meta client and the write client.
+   */
+  protected void cleanupClients() {
+    metaClient = null;
+    if (null != client) {
+      client.close();
+      client = null;
     }
-
-    /**
-     * Cleanups the distributed file system.
-     */
-    protected void cleanupDFS() throws IOException {
-        if (hdfsTestService != null) {
-            hdfsTestService.stop();
-            dfsCluster.shutdown();
-            hdfsTestService = null;
-            dfsCluster = null;
-            dfs = null;
-        }
-        // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-        // same JVM
-        FileSystem.closeAll();
+  }
+
+  /**
+   * Initializes a test data generator used to generate test data.
+   */
+  protected void initTestDataGenerator() {
+    dataGen = new HoodieTestDataGenerator();
+  }
+
+  /**
+   * Cleanups test data generator.
+   */
+  protected void cleanupTestDataGenerator() {
+    dataGen = null;
+  }
+
+  /**
+   * Initializes a distributed file system and base directory.
+   */
+  protected void initDFS() throws IOException {
+    FileSystem.closeAll();
+    hdfsTestService = new HdfsTestService();
+    dfsCluster = hdfsTestService.start(true);
+
+    // Create a temp folder as the base path
+    dfs = dfsCluster.getFileSystem();
+    dfsBasePath = dfs.getWorkingDirectory().toString();
+    dfs.mkdirs(new Path(dfsBasePath));
+  }
+
+  /**
+   * Cleanups the distributed file system.
+   */
+  protected void cleanupDFS() throws IOException {
+    if (hdfsTestService != null) {
+      hdfsTestService.stop();
+      dfsCluster.shutdown();
+      hdfsTestService = null;
+      dfsCluster = null;
+      dfs = null;
     }
-
-    /**
-     * Initializes executor service with a fixed thread pool.
-     *
-     * @param threadNum specifies the capacity of the fixed thread pool
-     */
-    protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
-        executorService = Executors.newFixedThreadPool(threadNum);
+    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+    // same JVM
+    FileSystem.closeAll();
+  }
+
+  /**
+   * Initializes executor service with a fixed thread pool.
+   *
+   * @param threadNum specifies the capacity of the fixed thread pool
+   */
+  protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
+    executorService = Executors.newFixedThreadPool(threadNum);
+  }
+
+  /**
+   * Cleanups the executor service.
+   */
+  protected void cleanupExecutorService() {
+    if (this.executorService != null) {
+      this.executorService.shutdownNow();
+      this.executorService = null;
     }
+  }
 
-    /**
-     * Cleanups the executor service.
-     */
-    protected void cleanupExecutorService() {
-        if (this.executorService != null) {
-            this.executorService.shutdownNow();
-            this.executorService = null;
-        }
+  private void initFileSystemWithConfiguration(Configuration configuration) {
+    if (basePath == null) {
+      throw new IllegalStateException("The base path has not been initialized.");
     }
 
-    private void initFileSystemWithConfiguration(Configuration configuration) {
-        if (basePath == null) {
-            throw new IllegalStateException("The base path has not been initialized.");
-        }
-
-        fs = FSUtils.getFs(basePath, configuration);
-        if (fs instanceof LocalFileSystem) {
-            LocalFileSystem lfs = (LocalFileSystem) fs;
-            // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
-            // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
-            // So, for the tests, we enforce checksum verification to circumvent the problem
-            lfs.setVerifyChecksum(true);
-        }
+    fs = FSUtils.getFs(basePath, configuration);
+    if (fs instanceof LocalFileSystem) {
+      LocalFileSystem lfs = (LocalFileSystem) fs;
+      // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+      // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+      // So, for the tests, we enforce checksum verification to circumvent the problem
+      lfs.setVerifyChecksum(true);
     }
+  }
 
-    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
-        return getHoodieWriteClient(cfg, false);
-    }
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+    return getHoodieWriteClient(cfg, false);
+  }
 
-    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
-        return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
-    }
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
+    return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
+  }
 
-    public HoodieReadClient getHoodieReadClient(String basePath) {
-        return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
-    }
+  public HoodieReadClient getHoodieReadClient(String basePath) {
+    return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+  }
 
-    public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
-        HoodieIndex index) {
-        if (null != client) {
-            client.close();
-            client = null;
-        }
-        client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-        return client;
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
+      HoodieIndex index) {
+    if (null != client) {
+      client.close();
+      client = null;
     }
+    client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+    return client;
+  }
 }
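
The lifecycle helpers above are designed to be composed from a test's setup and teardown. The following is a minimal usage sketch only, not part of this commit: the class name is hypothetical, it assumes JUnit 4 and that these helpers end up inherited from HoodieClientTestHarness after this change, and it calls only methods visible in the diff (initResources/cleanupResources, initDFS/cleanupDFS, initExecutorServiceWithFixedThreadPool/cleanupExecutorService).

import org.apache.hudi.common.HoodieClientTestHarness;
import org.junit.After;
import org.junit.Before;

// Hypothetical usage sketch; only the helper calls are taken from the harness above.
public class ExampleDfsHarnessTest extends HoodieClientTestHarness {

  @Before
  public void setUp() throws Exception {
    initResources();                            // path, Spark contexts, data generator, file system, meta client
    initDFS();                                  // MiniDFSCluster; sets dfs and dfsBasePath
    initExecutorServiceWithFixedThreadPool(2);  // small pool for async work in the test
  }

  @After
  public void tearDown() throws Exception {
    cleanupExecutorService();
    cleanupDFS();                               // stops the mini cluster and clears FileSystem.Cache
    cleanupResources();                         // clients, Spark contexts, data generator, file system
  }
}
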
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 9f3eaea..fdc8b27 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -655,6 +655,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withEmbeddedTimelineServerEnabled(true)
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024).build()).forTable("test-trip-table")
         .build();
   }
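
The two added lines above configure the file-system view for this test; withEnableBackupForRemoteFileSystemView(false) is understood to turn off the backup view that would otherwise be used when the remote (timeline-server backed) view cannot be reached. Below is a standalone sketch of the same builder call; the import path, wrapper class, and the comment on the flag's behavior are assumptions, only the builder calls themselves come from the diff.

import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;

// Hypothetical standalone sketch, not part of this commit.
public class ViewConfigExample {

  // Builds the same view config the test above now plugs into its write config.
  public static FileSystemViewStorageConfig viewConfigWithoutRemoteBackup() {
    return new FileSystemViewStorageConfig.Builder()
        .withEnableBackupForRemoteFileSystemView(false) // assumption: no fallback view if the remote view is unreachable
        .build();
  }
}
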
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 09d62a7..86a2e1f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -18,9 +18,9 @@
 
 package org.apache.hudi.table.compact;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -29,9 +29,9 @@ import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -129,7 +129,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
     try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
-     String newCommitTime = "100";
+      String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
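
Only the indentation of the commit-time line changes in the hunk above; the surrounding code follows the usual start-commit / generate / write sequence. A condensed, hypothetical sketch of that flow is below; the insert(...) call and the parallelism value are assumptions not shown in this hunk.

// Hypothetical condensed sketch, not part of this commit; assumes the same imports and
// harness fields (jsc, dataGen) as TestHoodieCompactor above.
private void writeFirstHundredRecords() throws Exception {
  HoodieWriteConfig config = getConfig();
  try (HoodieWriteClient writeClient = getHoodieWriteClient(config)) {
    String newCommitTime = "100";
    writeClient.startCommitWithTime(newCommitTime);

    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
    // Assumption: the standard insert API taking an RDD of records and the commit time.
    writeClient.insert(jsc.parallelize(records, 1), newCommitTime);
  }
}
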
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
index d77392f..b2a7f83 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
@@ -385,7 +385,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     // filterCompletedAndCompactionInstants
     // This cannot be done using checkFilter as it involves both states and actions
     final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
-    final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED);
+    final Set<State> states = Sets.newHashSet(State.COMPLETED);
     final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION);
     sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
         .forEach(i -> assertTrue(t1.containsInstant(i)));
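
The last hunk drops REQUESTED from the expected-state set, so the assertion now only requires that every COMPLETED instant, plus every compaction-action instant regardless of state, appears in the filtered timeline t1. A hypothetical restatement of that expectation follows; the types are the same ones used in the test, while the wrapper class and field name are illustrative only.

import java.util.function.Predicate;

import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;

// Hypothetical restatement of the updated expectation, not part of this commit.
class FilteredTimelineExpectation {
  // Every instant matching this predicate is expected to appear in the timeline
  // returned by filterCompletedAndCompactionInstants(): COMPLETED instants, plus
  // compaction-action instants in any state (REQUESTED alone is no longer expected).
  static final Predicate<HoodieInstant> EXPECTED =
      i -> i.getState() == State.COMPLETED
          || HoodieTimeline.COMPACTION_ACTION.equals(i.getAction());
}
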