Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/09/21 15:36:59 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3695: [HUDI-2395] Metadata tests rewrite

nsivabalan commented on a change in pull request #3695:
URL: https://github.com/apache/hudi/pull/3695#discussion_r713091403



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
##########
@@ -418,4 +445,166 @@ public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaCli
     }
     return Pair.of(partitionPathStatMap, globalStat);
   }
+
+  /**
+   * Validate the metadata tables contents to ensure it matches what is on the file system.
+   */
+  public void validateMetadata(HoodieTestTable testTable, List<String> inflightCommits, HoodieWriteConfig writeConfig, String metadataTableBasePath) throws IOException {
+    HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
+    assertNotNull(tableMetadata, "MetadataReader should have been initialized");
+    if (!writeConfig.isMetadataTableEnabled() || !writeConfig.getMetadataConfig().validateFileListingMetadata()) {
+      return;
+    }
+
+    assertEquals(inflightCommits, testTable.inflightCommits());
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Partitions should match
+    List<java.nio.file.Path> fsPartitionPaths = testTable.getAllPartitionPaths();
+    List<String> fsPartitions = new ArrayList<>();
+    fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString()));
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+
+    Collections.sort(fsPartitions);
+    Collections.sort(metadataPartitions);
+
+    assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
+    assertEquals(fsPartitions, metadataPartitions, "Partitions should match");
+
+    // Files within each partition should match
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
+    TableFileSystemView tableView = table.getHoodieView();
+    List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
+    Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
+    assertEquals(fsPartitions.size(), partitionToFilesMap.size());
+
+    fsPartitions.forEach(partition -> {
+      try {
+        Path partitionPath;
+        if (partition.equals("")) {
+          // Should be the non-partitioned case
+          partitionPath = new Path(basePath);
+        } else {
+          partitionPath = new Path(basePath, partition);
+        }
+
+        FileStatus[] fsStatuses = testTable.listAllFilesInPartition(partition);
+        FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath);
+        List<String> fsFileNames = Arrays.stream(fsStatuses)
+            .map(s -> s.getPath().getName()).collect(Collectors.toList());
+        List<String> metadataFilenames = Arrays.stream(metaStatuses)
+            .map(s -> s.getPath().getName()).collect(Collectors.toList());
+        Collections.sort(fsFileNames);
+        Collections.sort(metadataFilenames);
+
+        assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);
+
+        if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
+          LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
+          LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
+
+          for (String fileName : fsFileNames) {
+            if (!metadataFilenames.contains(fileName)) {
+              LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
+            }
+          }
+          for (String fileName : metadataFilenames) {
+            if (!fsFileNames.contains(fileName)) {
+              LOG.error(partition + "Metadata file " + fileName + " not found in original FS");
+            }
+          }
+        }
+
+        // Block sizes should be valid
+        Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
+        List<Long> fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList());
+        List<Long> metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList());
+        assertEquals(fsBlockSizes, metadataBlockSizes);
+
+        assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
+        assertEquals(fsFileNames, metadataFilenames, "Files within partition " + partition + " should match");
+
+        // FileSystemView should expose the same data
+        List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
+        fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
+
+        fileGroups.forEach(g -> LogManager.getLogger(getClass()).info(g));
+        fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(getClass()).info(b)));
+        fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(getClass()).info(s)));
+
+        long numFiles = fileGroups.stream()
+            .mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
+            .sum();
+        assertEquals(metadataFilenames.size(), numFiles);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+        fail("Exception should not be raised: " + e);
+      }
+    });
+
+    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(writeConfig);

Review comment:
       You can probably add another argument to this method indicating whether to do detailed validation or not, and move lines 549 to 584 inside the true branch. For every test, we can do the detailed validation just once at the end; after every operation, verifying lines 453 to 545 would suffice.
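       A minimal sketch of that suggestion, assuming a boolean parameter (the name doFullValidation is illustrative, not from the patch):

           public void validateMetadata(HoodieTestTable testTable, List<String> inflightCommits, HoodieWriteConfig writeConfig,
                                        String metadataTableBasePath, boolean doFullValidation) throws IOException {
             // lightweight checks after every operation: partition listing and per-partition file listing
             // (the block the comment refers to as lines 453 to 545)
             if (doFullValidation) {
               // detailed checks, run once at the end of each test
               // (the block the comment refers to as lines 549 to 584: metadata writer config, table type, partition count, retention)
             }
           }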

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
##########
@@ -199,6 +268,31 @@ public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPla
     return this;
   }
 
+  public HoodieTestTable addClean(String instantTime) throws IOException {
+    HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING), EMPTY_STRING, new HashMap<>(),
+        CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+    HoodieCleanStat cleanStats = new HoodieCleanStat(
+        HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
+        HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],

Review comment:
       Declare a static Random variable and re-use it rather than instantiating a new Random() every time.
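       For instance (the doCluster hunk later in this diff already references a shared RANDOM constant in this class):

           private static final Random RANDOM = new Random();
           ...
           HoodieTestUtils.DEFAULT_PARTITION_PATHS[RANDOM.nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],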

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
##########
@@ -421,16 +593,329 @@ public String getBaseFileNameById(String fileId) {
   }
 
   public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).toArray(new FileStatus[0]);
+    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).stream()
+        .filter(entry -> {
+          boolean toReturn = true;
+          String fileName = entry.getPath().getName();
+          if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+            toReturn = false;
+          } else {
+            for (String inflight : inflightCommits) {
+              if (fileName.contains(inflight)) {
+                toReturn = false;
+                break;
+              }
+            }
+          }
+          return toReturn;
+        }).toArray(FileStatus[]::new);
   }
 
   public FileStatus[] listAllFilesInTempFolder() throws IOException {
     return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]);
   }
 
+  public void deleteFilesInPartition(String partitionPath, List<String> filesToDelete) throws IOException {
+    FileStatus[] allFiles = listAllFilesInPartition(partitionPath);
+    Arrays.stream(allFiles).filter(entry -> filesToDelete.contains(entry.getPath().getName())).forEach(entry -> {
+      try {
+        Files.delete(Paths.get(basePath, partitionPath, entry.getPath().getName()));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+  }
+
+  public HoodieTestTable doRollback(String commitTimeToRollback, String commitTime) throws Exception {
+    Option<HoodieCommitMetadata> commitMetadata = getMetadataForInstant(commitTimeToRollback);
+    if (!commitMetadata.isPresent()) {
+      throw new IllegalArgumentException("Instant to rollback not present in timeline: " + commitTimeToRollback);
+    }
+    Map<String, List<String>> partitionFiles = getPartitionFiles(commitMetadata.get());
+    HoodieRollbackMetadata rollbackMetadata = getRollbackMetadata(commitTimeToRollback, partitionFiles);
+    for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
+      deleteFilesInPartition(entry.getKey(), entry.getValue());
+    }
+    return addRollback(commitTime, rollbackMetadata);
+  }
+
+  public HoodieTestTable doCluster(String commitTime, Map<String, List<String>> partitionToReplaceFileIds) throws Exception {
+    Map<String, List<Pair<String, Integer>>> partitionToReplaceFileIdsWithLength = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : partitionToReplaceFileIds.entrySet()) {
+      String partition = entry.getKey();
+      partitionToReplaceFileIdsWithLength.put(entry.getKey(), new ArrayList<>());
+      for (String fileId : entry.getValue()) {
+        int length = 100 + RANDOM.nextInt(500);
+        partitionToReplaceFileIdsWithLength.get(partition).add(Pair.of(fileId, length));
+      }
+    }
+    List<HoodieWriteStat> writeStats = generateHoodieWriteStatForPartition(partitionToReplaceFileIdsWithLength, commitTime, false);
+    HoodieReplaceCommitMetadata replaceMetadata =
+        (HoodieReplaceCommitMetadata) buildMetadata(writeStats, partitionToReplaceFileIds, Option.empty(), CLUSTER, EMPTY_STRING, REPLACE_COMMIT_ACTION);
+    return addReplaceCommit(commitTime, Option.empty(), Option.empty(), replaceMetadata);
+  }
+
+  public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer> partitionFileCountsToDelete) throws IOException {
+    Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+    for (Map.Entry<String, Integer> entry : partitionFileCountsToDelete.entrySet()) {
+      partitionFilesToDelete.put(entry.getKey(), getEarliestFilesInPartition(entry.getKey(), entry.getValue()));
+    }
+    HoodieTestTableState testTableState = new HoodieTestTableState();
+    for (Map.Entry<String, List<String>> entry : partitionFilesToDelete.entrySet()) {
+      testTableState = testTableState.createTestTableStateForCleaner(commitTime, entry.getKey(), entry.getValue());
+      deleteFilesInPartition(entry.getKey(), entry.getValue());
+    }
+    Pair<HoodieCleanerPlan, HoodieCleanMetadata> cleanerMeta = getHoodieCleanMetadata(commitTime, testTableState);
+    addClean(commitTime, cleanerMeta.getKey(), cleanerMeta.getValue());
+    return cleanerMeta.getValue();
+  }
+
+  public HoodieCleanMetadata doClean(String cleanCommitTime, List<String> commitsToClean) throws IOException {

Review comment:
       Can we name this method doCleanBasedOnCommits to be explicit?
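       Only the name would change; the body stays the same as the existing overload quoted above:

           public HoodieCleanMetadata doCleanBasedOnCommits(String cleanCommitTime, List<String> commitsToClean) throws IOException {
             // same implementation as the current doClean(String, List<String>) overload
           }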

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -264,344 +156,109 @@ public void testOnlyValidPartitionsAdded() throws Exception {
     final String filteredDirectoryThree = ".backups";
 
     // Create some commits
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     testTable.withPartitionMetaFiles("p1", "p2", filteredDirectoryOne, filteredDirectoryTwo, filteredDirectoryThree)
         .addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
         .addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10);
 
-    final HoodieWriteConfig writeConfig =
-            getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
+    writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
-      client.startCommitWithTime("005");
-      client.insert(jsc.emptyRDD(), "005");
-
-      List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
-      assertFalse(partitions.contains(nonPartitionDirectory),
-          "Must not contain the non-partition " + nonPartitionDirectory);
-      assertTrue(partitions.contains("p1"), "Must contain partition p1");
-      assertTrue(partitions.contains("p2"), "Must contain partition p2");
-
-      assertFalse(partitions.contains(filteredDirectoryOne),
-          "Must not contain the filtered directory " + filteredDirectoryOne);
-      assertFalse(partitions.contains(filteredDirectoryTwo),
-          "Must not contain the filtered directory " + filteredDirectoryTwo);
-      assertFalse(partitions.contains(filteredDirectoryThree),
-          "Must not contain the filtered directory " + filteredDirectoryThree);
-
-      FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
-      assertEquals(2, statuses.length);
-      statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
-      assertEquals(5, statuses.length);
-      Map<String, FileStatus[]> partitionsToFilesMap = metadata(client).getAllFilesInPartitions(
-          Arrays.asList(basePath + "/p1", basePath + "/p2"));
-      assertEquals(2, partitionsToFilesMap.size());
-      assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length);
-      assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length);
-    }
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 1, true);
+    syncTableMetadata(writeConfig);
+
+    List<String> partitions = metadataWriter(writeConfig).metadata().getAllPartitionPaths();
+    assertFalse(partitions.contains(nonPartitionDirectory),
+        "Must not contain the non-partition " + nonPartitionDirectory);
+    assertTrue(partitions.contains("p1"), "Must contain partition p1");
+    assertTrue(partitions.contains("p2"), "Must contain partition p2");
+
+    assertFalse(partitions.contains(filteredDirectoryOne),
+        "Must not contain the filtered directory " + filteredDirectoryOne);
+    assertFalse(partitions.contains(filteredDirectoryTwo),
+        "Must not contain the filtered directory " + filteredDirectoryTwo);
+    assertFalse(partitions.contains(filteredDirectoryThree),
+        "Must not contain the filtered directory " + filteredDirectoryThree);
+
+    FileStatus[] statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p1"));
+    assertEquals(3, statuses.length);
+    statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p2"));
+    assertEquals(6, statuses.length);
+    Map<String, FileStatus[]> partitionsToFilesMap = metadata(writeConfig, context).getAllFilesInPartitions(
+        Arrays.asList(basePath + "/p1", basePath + "/p2"));
+    assertEquals(2, partitionsToFilesMap.size());
+    assertEquals(3, partitionsToFilesMap.get(basePath + "/p1").length);
+    assertEquals(6, partitionsToFilesMap.get(basePath + "/p2").length);
   }
 
   /**
    * Test various table operations sync to Metadata Table correctly.
    */
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testTableOperations(HoodieTableType tableType) throws Exception {
+  @MethodSource("bootstrapAndTableOperationTestArgs")
+  public void testTableOperations(HoodieTableType tableType, boolean doNotSyncFewCommits) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-
-      // Write 1 (Bulk insert)
-      String newCommitTime = "001";
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
+    // bootstrap w/ 2 commits
+    bootstrapMetadata(testTable);
 
-      // Write 2 (inserts)
-      newCommitTime = "002";
-      client.startCommitWithTime(newCommitTime);
-      validateMetadata(client);
-
-      records = dataGen.generateInserts(newCommitTime, 20);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 3 (updates)
-      newCommitTime = "003";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 4 (updates and inserts)
-      newCommitTime = "004";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
+    // trigger an upsert
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
 
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "005";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Write 5 (updates and inserts)
-      newCommitTime = "006";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "007";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Deletes
-      newCommitTime = "008";
-      records = dataGen.generateDeletes(newCommitTime, 10);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      validateMetadata(client);
-
-      // Clean
-      newCommitTime = "009";
-      client.clean(newCommitTime);
-      validateMetadata(client);
-
-      // Restore
-      client.restoreToInstant("006");
-      validateMetadata(client);
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("004", Arrays.asList("p1", "p2"));
+      syncAndValidate(testTable);
     }
-  }
-
-  /**
-   * Test rollback of various table operations sync to Metadata Table correctly.
-   */
-  @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testRollbackOperations(HoodieTableType tableType) throws Exception {
-    init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Write 1 (Bulk insert)
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 2 (inserts) + Rollback of inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 20);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Write 3 (updates) + Rollback of updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 20);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
 
-      // Rollback of updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Rollback of Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Rollback of Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 10);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.delete(deleteKeys, newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Rollback of Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
+    // trigger an upsert
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    if (doNotSyncFewCommits) {
+      syncAndValidate(testTable, Collections.emptyList(), true, false, true);
     }
 
-    // Rollback of partial commits
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) {
-      // Write updates and inserts
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
+    // trigger clean
+    testTable.doClean("006", Collections.singletonList("001"));
+    if (doNotSyncFewCommits) {
+      syncAndValidate(testTable, Collections.emptyList(), true, false, false);
     }
 
-    // Marker based rollback of partial commits
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) {
-      // Write updates and inserts
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-    }
+    // trigger delete
+    testTable.doWriteOperation("007", WriteOperationType.DELETE, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);

Review comment:
       In the existing test we also do a restore. Have we covered restore elsewhere after the rewrite?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
##########
@@ -418,4 +445,166 @@ public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaCli
     }
     return Pair.of(partitionPathStatMap, globalStat);
   }
+
+  /**
+   * Validate the metadata tables contents to ensure it matches what is on the file system.
+   */
+  public void validateMetadata(HoodieTestTable testTable, List<String> inflightCommits, HoodieWriteConfig writeConfig, String metadataTableBasePath) throws IOException {
+    HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
+    assertNotNull(tableMetadata, "MetadataReader should have been initialized");
+    if (!writeConfig.isMetadataTableEnabled() || !writeConfig.getMetadataConfig().validateFileListingMetadata()) {
+      return;
+    }
+
+    assertEquals(inflightCommits, testTable.inflightCommits());
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Partitions should match
+    List<java.nio.file.Path> fsPartitionPaths = testTable.getAllPartitionPaths();
+    List<String> fsPartitions = new ArrayList<>();
+    fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString()));
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+
+    Collections.sort(fsPartitions);
+    Collections.sort(metadataPartitions);
+
+    assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
+    assertEquals(fsPartitions, metadataPartitions, "Partitions should match");
+
+    // Files within each partition should match
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
+    TableFileSystemView tableView = table.getHoodieView();
+    List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
+    Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
+    assertEquals(fsPartitions.size(), partitionToFilesMap.size());
+
+    fsPartitions.forEach(partition -> {
+      try {
+        Path partitionPath;
+        if (partition.equals("")) {
+          // Should be the non-partitioned case
+          partitionPath = new Path(basePath);
+        } else {
+          partitionPath = new Path(basePath, partition);
+        }
+
+        FileStatus[] fsStatuses = testTable.listAllFilesInPartition(partition);

Review comment:
       You can probably move this to a private method, something like validateFilesPerPartition(testTable, testTable, partitionPath); validateMetadata will then be more concise.
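       A rough sketch of that extraction (the helper name and parameter list are illustrative; the two listing calls are the ones already in this hunk):

           private void validateFilesPerPartition(HoodieTestTable testTable, HoodieTableMetadata tableMetadata,
                                                  TableFileSystemView tableView, Map<String, FileStatus[]> partitionToFilesMap,
                                                  String partition, Path partitionPath) throws IOException {
             FileStatus[] fsStatuses = testTable.listAllFilesInPartition(partition);
             FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath);
             // per-partition assertions currently inlined in validateMetadata:
             // file-name comparison, block sizes, and file-group/file-slice counts from tableView
           }

       validateMetadata's fsPartitions.forEach(...) would then just resolve partitionPath and call this helper.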

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
##########
@@ -421,16 +593,329 @@ public String getBaseFileNameById(String fileId) {
   }
 
   public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).toArray(new FileStatus[0]);
+    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).stream()
+        .filter(entry -> {
+          boolean toReturn = true;
+          String fileName = entry.getPath().getName();
+          if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+            toReturn = false;
+          } else {
+            for (String inflight : inflightCommits) {
+              if (fileName.contains(inflight)) {
+                toReturn = false;
+                break;
+              }
+            }
+          }
+          return toReturn;
+        }).toArray(FileStatus[]::new);
   }
 
   public FileStatus[] listAllFilesInTempFolder() throws IOException {
     return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]);
   }
 
+  public void deleteFilesInPartition(String partitionPath, List<String> filesToDelete) throws IOException {
+    FileStatus[] allFiles = listAllFilesInPartition(partitionPath);
+    Arrays.stream(allFiles).filter(entry -> filesToDelete.contains(entry.getPath().getName())).forEach(entry -> {
+      try {
+        Files.delete(Paths.get(basePath, partitionPath, entry.getPath().getName()));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+  }
+
+  public HoodieTestTable doRollback(String commitTimeToRollback, String commitTime) throws Exception {
+    Option<HoodieCommitMetadata> commitMetadata = getMetadataForInstant(commitTimeToRollback);
+    if (!commitMetadata.isPresent()) {
+      throw new IllegalArgumentException("Instant to rollback not present in timeline: " + commitTimeToRollback);
+    }
+    Map<String, List<String>> partitionFiles = getPartitionFiles(commitMetadata.get());
+    HoodieRollbackMetadata rollbackMetadata = getRollbackMetadata(commitTimeToRollback, partitionFiles);
+    for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
+      deleteFilesInPartition(entry.getKey(), entry.getValue());
+    }
+    return addRollback(commitTime, rollbackMetadata);
+  }
+
+  public HoodieTestTable doCluster(String commitTime, Map<String, List<String>> partitionToReplaceFileIds) throws Exception {
+    Map<String, List<Pair<String, Integer>>> partitionToReplaceFileIdsWithLength = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : partitionToReplaceFileIds.entrySet()) {
+      String partition = entry.getKey();
+      partitionToReplaceFileIdsWithLength.put(entry.getKey(), new ArrayList<>());
+      for (String fileId : entry.getValue()) {
+        int length = 100 + RANDOM.nextInt(500);
+        partitionToReplaceFileIdsWithLength.get(partition).add(Pair.of(fileId, length));
+      }
+    }
+    List<HoodieWriteStat> writeStats = generateHoodieWriteStatForPartition(partitionToReplaceFileIdsWithLength, commitTime, false);
+    HoodieReplaceCommitMetadata replaceMetadata =
+        (HoodieReplaceCommitMetadata) buildMetadata(writeStats, partitionToReplaceFileIds, Option.empty(), CLUSTER, EMPTY_STRING, REPLACE_COMMIT_ACTION);
+    return addReplaceCommit(commitTime, Option.empty(), Option.empty(), replaceMetadata);
+  }
+
+  public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer> partitionFileCountsToDelete) throws IOException {
+    Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+    for (Map.Entry<String, Integer> entry : partitionFileCountsToDelete.entrySet()) {
+      partitionFilesToDelete.put(entry.getKey(), getEarliestFilesInPartition(entry.getKey(), entry.getValue()));
+    }
+    HoodieTestTableState testTableState = new HoodieTestTableState();
+    for (Map.Entry<String, List<String>> entry : partitionFilesToDelete.entrySet()) {
+      testTableState = testTableState.createTestTableStateForCleaner(commitTime, entry.getKey(), entry.getValue());
+      deleteFilesInPartition(entry.getKey(), entry.getValue());
+    }
+    Pair<HoodieCleanerPlan, HoodieCleanMetadata> cleanerMeta = getHoodieCleanMetadata(commitTime, testTableState);
+    addClean(commitTime, cleanerMeta.getKey(), cleanerMeta.getValue());
+    return cleanerMeta.getValue();
+  }
+
+  public HoodieCleanMetadata doClean(String cleanCommitTime, List<String> commitsToClean) throws IOException {
+    Map<String, Integer> partitionFileCountsToDelete = new HashMap<>();
+    for (String commitTime : commitsToClean) {
+      Option<HoodieCommitMetadata> commitMetadata = getMetadataForInstant(commitTime);
+      if (commitMetadata.isPresent()) {
+        Map<String, List<String>> partitionFiles = getPartitionFiles(commitMetadata.get());
+        for (String partition : partitionFiles.keySet()) {
+          partitionFileCountsToDelete.put(partition, partitionFiles.get(partition).size() + partitionFileCountsToDelete.getOrDefault(partition, 0));
+        }
+      }
+    }
+    return doClean(cleanCommitTime, partitionFileCountsToDelete);
+  }
+
+  public HoodieSavepointMetadata doSavepoint(String commitTime) throws IOException {
+    Option<HoodieCommitMetadata> commitMetadata = getMetadataForInstant(commitTime);
+    if (!commitMetadata.isPresent()) {
+      throw new IllegalArgumentException("Instant to rollback not present in timeline: " + commitTime);
+    }
+    Map<String, List<String>> partitionFiles = getPartitionFiles(commitMetadata.get());
+    HoodieSavepointMetadata savepointMetadata = getSavepointMetadata(commitTime, partitionFiles);
+    for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
+      deleteFilesInPartition(entry.getKey(), entry.getValue());
+    }
+    return savepointMetadata;
+  }
+
+  public HoodieTestTable doCompaction(String commitTime, List<String> partitions) throws Exception {
+    this.currentInstantTime = commitTime;
+    if (partitions.isEmpty()) {
+      partitions = Collections.singletonList(EMPTY_STRING);
+    }
+    HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(metaClient.getTableType(), commitTime, partitions, 1);
+    HoodieCommitMetadata commitMetadata = createCommitMetadata(COMPACT, commitTime, testTableState);
+    for (String partition : partitions) {
+      this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
+      if (MERGE_ON_READ.equals(metaClient.getTableType())) {
+        this.withLogFilesInPartition(partition, testTableState.getPartitionToLogFileInfoMap(commitTime).get(partition));

Review comment:
       Ideally, compaction will not produce any log files, just base files. FYI.
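       In other words, for a COMPACT operation the loop would ideally add only base files (a sketch, assuming the same testTableState plumbing as above):

           for (String partition : partitions) {
             // compaction rewrites log files into base files, so only base files are produced
             this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
             // no withLogFilesInPartition(...) here, even for MERGE_ON_READ tables
           }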

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
##########
@@ -421,16 +593,329 @@ public String getBaseFileNameById(String fileId) {
   }
 
   public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).toArray(new FileStatus[0]);
+    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).stream()
+        .filter(entry -> {
+          boolean toReturn = true;
+          String fileName = entry.getPath().getName();
+          if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+            toReturn = false;
+          } else {
+            for (String inflight : inflightCommits) {
+              if (fileName.contains(inflight)) {
+                toReturn = false;
+                break;
+              }
+            }
+          }
+          return toReturn;
+        }).toArray(FileStatus[]::new);
   }
 
   public FileStatus[] listAllFilesInTempFolder() throws IOException {
     return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]);
   }
 
+  public void deleteFilesInPartition(String partitionPath, List<String> filesToDelete) throws IOException {
+    FileStatus[] allFiles = listAllFilesInPartition(partitionPath);
+    Arrays.stream(allFiles).filter(entry -> filesToDelete.contains(entry.getPath().getName())).forEach(entry -> {
+      try {
+        Files.delete(Paths.get(basePath, partitionPath, entry.getPath().getName()));
+      } catch (IOException e) {
+        e.printStackTrace();

Review comment:
       Can we throw here?
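       For example, since the lambda inside forEach cannot throw the checked IOException, it could be wrapped in the JDK's java.io.UncheckedIOException (or the stream could be replaced with a plain for-loop so the method's declared IOException propagates):

           } catch (IOException e) {
             // surface the failure instead of swallowing it
             throw new UncheckedIOException(e);
           }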

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
##########
@@ -214,6 +308,40 @@ public HoodieTestTable addRollback(String instantTime, HoodieRollbackMetadata ro
     return this;
   }
 
+  public HoodieRollbackMetadata getRollbackMetadata(String instantTimeToDelete, Map<String, List<String>> partitionToFilesMeta) throws Exception {
+    HoodieRollbackMetadata rollbackMetadata = new HoodieRollbackMetadata();
+    rollbackMetadata.setCommitsRollback(Collections.singletonList(instantTimeToDelete));
+    rollbackMetadata.setStartRollbackTime(instantTimeToDelete);
+    Map<String, HoodieRollbackPartitionMetadata> partitionMetadataMap = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : partitionToFilesMeta.entrySet()) {
+      HoodieRollbackPartitionMetadata rollbackPartitionMetadata = new HoodieRollbackPartitionMetadata();
+      rollbackPartitionMetadata.setPartitionPath(entry.getKey());
+      rollbackPartitionMetadata.setSuccessDeleteFiles(entry.getValue());
+      rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>());
+      rollbackPartitionMetadata.setWrittenLogFiles(new HashMap<>());

Review comment:
       We need to fix the written log files here as well, based on info from the commit metadata, and rollbackLogFiles has to be set by generating new log file info on the fly.
       Let's see if we can fix this in this patch itself, or maybe create a follow-up task.
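       A very rough sketch, assuming the generated HoodieRollbackPartitionMetadata exposes a setRollbackLogFiles setter alongside setWrittenLogFiles (file names and sizes would come from the commit metadata's write stats):

           Map<String, Long> writtenLogFiles = new HashMap<>();   // log file name -> size, derived from the commit metadata
           Map<String, Long> rollbackLogFiles = new HashMap<>();  // rollback log file info generated on the fly
           rollbackPartitionMetadata.setWrittenLogFiles(writtenLogFiles);
           rollbackPartitionMetadata.setRollbackLogFiles(rollbackLogFiles);  // assumed setter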

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -1088,196 +565,72 @@ public void testMetdataTableCommitFailure() throws Exception {
     assertTrue(timeline.containsInstant(new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "002")));
 
     // In this commit deltacommit "002" will be rolled back and attempted again.
-    String latestCommitTime = HoodieActiveTimeline.createNewInstantTime();
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      String newCommitTime = "003";
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-
-      records = dataGen.generateInserts(latestCommitTime, 20);
-      client.startCommitWithTime(latestCommitTime);
-      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), latestCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-    }
+    testTable.doWriteOperation("003", WriteOperationType.BULK_INSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 2);
+    syncTableMetadata(writeConfig);
 
     timeline = metadataMetaClient.reloadActiveTimeline();
     assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001")));
     assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002")));
-    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, latestCommitTime)));
-    assertTrue(timeline.getRollbackTimeline().countInstants() == 1);
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "003")));
+    assertEquals(1, timeline.getRollbackTimeline().countInstants());
   }
 
   /**
-   * Validate the metadata tables contents to ensure it matches what is on the file system.
+   * Tests that if timeline has an inflight commit midway, metadata syncs only completed commits (including later to inflight commit).
    */
-  private void validateMetadata(SparkRDDWriteClient testClient) throws IOException {
-    HoodieWriteConfig config = testClient.getConfig();
-
-    SparkRDDWriteClient client;
-    if (config.isEmbeddedTimelineServerEnabled()) {
-      testClient.close();
-      client = new SparkRDDWriteClient(testClient.getEngineContext(), testClient.getConfig());
-    } else {
-      client = testClient;
-    }
-
-    HoodieTableMetadata tableMetadata = metadata(client);
-    assertNotNull(tableMetadata, "MetadataReader should have been initialized");
-    if (!config.isMetadataTableEnabled()) {
-      return;
-    }
-
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    // Partitions should match
-    FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext,
-        new SerializableConfiguration(hadoopConf), config.getBasePath(), config.shouldAssumeDatePartitioning());
-    List<String> fsPartitions = fsBackedTableMetadata.getAllPartitionPaths();
-    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
-
-    Collections.sort(fsPartitions);
-    Collections.sort(metadataPartitions);
-
-    assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
-    assertTrue(fsPartitions.equals(metadataPartitions), "Partitions should match");
-
-    // Files within each partition should match
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieSparkTable.create(config, engineContext);
-    TableFileSystemView tableView = table.getHoodieView();
-    List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
-    Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
-    assertEquals(fsPartitions.size(), partitionToFilesMap.size());
-
-    fsPartitions.forEach(partition -> {
-      try {
-        Path partitionPath;
-        if (partition.equals("")) {
-          // Should be the non-partitioned case
-          partitionPath = new Path(basePath);
-        } else {
-          partitionPath = new Path(basePath, partition);
-        }
-        FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath);
-        FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath);
-        List<String> fsFileNames = Arrays.stream(fsStatuses)
-            .map(s -> s.getPath().getName()).collect(Collectors.toList());
-        List<String> metadataFilenames = Arrays.stream(metaStatuses)
-            .map(s -> s.getPath().getName()).collect(Collectors.toList());
-        Collections.sort(fsFileNames);
-        Collections.sort(metadataFilenames);
-
-        assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);
-
-        // File sizes should be valid
-        Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
-
-        if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
-          LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
-          LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
-
-          for (String fileName : fsFileNames) {
-            if (!metadataFilenames.contains(fileName)) {
-              LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
-            }
-          }
-          for (String fileName : metadataFilenames) {
-            if (!fsFileNames.contains(fileName)) {
-              LOG.error(partition + "Metadata file " + fileName + " not found in original FS");
-            }
-          }
-        }
-
-        // Block sizes should be valid
-        Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
-        List<Long> fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
-        Collections.sort(fsBlockSizes);
-        List<Long> metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
-        Collections.sort(metadataBlockSizes);
-        assertEquals(fsBlockSizes, metadataBlockSizes);
-
-        assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
-        assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match");
-
-        // FileSystemView should expose the same data
-        List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
-        fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
-
-        fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
-        fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));
-        fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(s)));
-
-        long numFiles = fileGroups.stream()
-            .mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
-            .sum();
-        assertEquals(metadataFilenames.size(), numFiles);
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-        assertTrue(false, "Exception should not be raised: " + e);
-      }
-    });
-
-    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
-    assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
-
-    // Validate write config for metadata table
-    HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
-    assertFalse(metadataWriteConfig.isMetadataTableEnabled(), "No metadata table for metadata table");
-    assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");
-
-    // Metadata table should be in sync with the dataset
-    assertTrue(metadata(client).isInSync());
-    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
-
-    // Metadata table is MOR
-    assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
-
-    // Metadata table is HFile format
-    assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(), HoodieFileFormat.HFILE,
-        "Metadata Table base file format should be HFile");
-
-    // Metadata table has a fixed number of partitions
-    // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
-    // in the .hoodie folder.
-    List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, HoodieTableMetadata.getMetadataTableBasePath(basePath),
-        false, false, false);
-    Assertions.assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
-
-    // Metadata table should automatically compact and clean
-    // versions are +1 as autoclean / compaction happens end of commits
-    int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
-    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
-    metadataTablePartitions.forEach(partition -> {
-      List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
-      assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() <= 1, "Should have a single latest base file");
-      assertTrue(latestSlices.size() <= 1, "Should have a single latest file slice");
-      assertTrue(latestSlices.size() <= numFileVersions, "Should limit file slice to "
-          + numFileVersions + " but was " + latestSlices.size());
-    });
-
-    LOG.info("Validation time=" + timer.endTimer());
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testInFlightCommit(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    // bootstrap w/ 2 commits
+    bootstrapMetadata(testTable);
+
+    // trigger an upsert
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
+
+    // trigger an upsert
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable);
+
+    // create an inflight commit.
+    HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("006", WriteOperationType.UPSERT, Collections.emptyList(),
+        Arrays.asList("p1", "p2", "p3"), 2, false, true);
+
+    // trigger upsert
+    testTable.doWriteOperation("007", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    // testTable validation will fetch only files pertaining to completed commits. So, validateMetadata() will skip files for 006
+    // while validating against actual metadata table.
+    syncAndValidate(testTable, Collections.singletonList("006"), writeConfig.isMetadataTableEnabled(), writeConfig.getMetadataConfig().enableSync(), false);
+
+    // Remove the inflight instance holding back table sync
+    testTable.moveInflightCommitToComplete("006", inflightCommitMeta);
+    syncTableMetadata(writeConfig);
+
+    // A regular commit should get synced
+    testTable.doWriteOperation("008", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable);
   }
 
-  private HoodieBackedTableMetadataWriter metadataWriter(SparkRDDWriteClient client) {
-    return (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter
-        .create(hadoopConf, client.getConfig(), new HoodieSparkEngineContext(jsc));
+  private void bootstrapMetadata(HoodieTestTable testTable) throws Exception {

Review comment:
       Let's call this doWriteOperationsAndBootstrapMetadata.

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -886,194 +445,112 @@ public void testCleaningArchivingAndCompaction() throws Exception {
   /**
    * Test various error scenarios.
    */
-  @Test
-  public void testErrorCases() throws Exception {
-    init(HoodieTableType.COPY_ON_WRITE);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testErrorCases(HoodieTableType tableType) throws Exception {
+    init(tableType);
     // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
     // should be rolled back to last valid commit.
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
-      // instant so that only the inflight is left over.
-      String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime);
-      assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
-          commitInstantFileName), false));
-    }
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
-      String newCommitTime = client.startCommit();
-      // Next insert
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      // Post rollback commit and metadata should be valid
-      validateMetadata(client);
-    }
+    testTable.doWriteOperation("001", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"),
+        Arrays.asList("p1", "p2"), 1);
+    syncAndValidate(testTable);
+    testTable.doWriteOperation("002", WriteOperationType.BULK_INSERT, Collections.emptyList(),
+        Arrays.asList("p1", "p2"), 1);
+    syncAndValidate(testTable);
+    // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
+    // instant so that only the inflight is left over.
+    String commitInstantFileName = HoodieTimeline.makeCommitFileName("002");
+    assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
+        commitInstantFileName), false));
+    // Next upsert
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.emptyList(),
+        Arrays.asList("p1", "p2"), 1);
+    // Post rollback commit and metadata should be valid
+    syncTableMetadata(writeConfig);
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    HoodieActiveTimeline timeline = metadataMetaClient.getActiveTimeline();
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001")));
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002")));
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "003")));
   }
 
   /**
    * Test non-partitioned datasets.
    */
-  //@Test
-  public void testNonPartitioned() throws Exception {
-    init(HoodieTableType.COPY_ON_WRITE);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[] {""});
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Write 1 (Bulk insert)
-      String newCommitTime = "001";
-      List<HoodieRecord> records = nonPartitionedGenerator.generateInserts(newCommitTime, 10);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      validateMetadata(client);
-
-      List<String> metadataPartitions = metadata(client).getAllPartitionPaths();
-      assertTrue(metadataPartitions.contains(""), "Must contain empty partition");
-    }
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testNonPartitioned(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    // Non-partitioned bulk insert
+    testTable.doWriteOperation("001", WriteOperationType.BULK_INSERT, Collections.emptyList(), 1);
+    syncTableMetadata(writeConfig);
+    List<String> metadataPartitions = metadata(writeConfig, context).getAllPartitionPaths();
+    assertTrue(metadataPartitions.isEmpty(), "Must contain empty partition");
   }
 
   /**
    * Test various metrics published by metadata table.
    */
-  @Test
-  public void testMetadataMetrics() throws Exception {
-    init(HoodieTableType.COPY_ON_WRITE);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true).build())) {
-      // Write
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
-      assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count"));
-      assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration"));
-      assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L);
-      assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
-      assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
-      assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
-      assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count"));
-    }
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testMetadataMetrics(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    writeConfig = getWriteConfigBuilder(true, true, true).build();
+    testTable.doWriteOperation(HoodieActiveTimeline.createNewInstantTime(), WriteOperationType.INSERT, Arrays.asList("p1", "p2"),
+        Arrays.asList("p1", "p2"), 2, true);
+    syncTableMetadata(writeConfig);
+    Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
+    assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count"));
+    assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration"));
+    assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L);
+    assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
+    assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
+    assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
+    assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count"));
   }
 
   /**
    * Test when reading from metadata table which is out of sync with dataset that results are still consistent.
    */
-  @Test
-  public void testMetadataOutOfSync() throws Exception {
-    init(HoodieTableType.COPY_ON_WRITE);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true));
-
-    // Enable metadata so table is initialized
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Perform Bulk Insert
-      String newCommitTime = "001";
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-    }
-
-    // Perform commit operations with metadata disabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      // Perform Insert
-      String newCommitTime = "002";
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-
-      // Perform Upsert
-      newCommitTime = "003";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 20);
-      client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "004";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-      }
-    }
-
-    assertFalse(metadata(unsyncedClient).isInSync());
-    validateMetadata(unsyncedClient);
-
-    // Perform clean operation with metadata disabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      // One more commit needed to trigger clean so upsert and compact
-      String newCommitTime = "005";
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 20);
-      client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "006";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-      }
-
-      // Clean
-      newCommitTime = "007";
-      client.clean(newCommitTime);
-    }
-
-    assertFalse(metadata(unsyncedClient).isInSync());
-    validateMetadata(unsyncedClient);
-
-    // Perform restore with metadata disabled
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testMetadataOutOfSync(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    testTable.doWriteOperation("001", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 1);
+    // Enable metadata so table is initialized but do not sync
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);
+    // Perform an insert and upsert
+    testTable.doWriteOperation("002", WriteOperationType.INSERT, Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 1);
+    // Run compaction for MOR table
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("004", Arrays.asList("p1", "p2"));
+    }
+    assertFalse(metadata(writeConfig, context).isInSync());
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 1);
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("006", Arrays.asList("p1", "p2"));
+    }
+    testTable.doClean("007", Collections.singletonList("001"));
+    /* TODO: Perform restore with metadata disabled

Review comment:
       restore is yet to be fixed, is it?
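
   For reference, once HUDI-1502 is resolved, the step this TODO stands in for could look roughly like the removed client-based code (sketch only, not tested; engineContext, getWriteConfig and validateMetadata(client) are the helpers from the old harness, and the restore instant is purely illustrative):

       // restore with metadata disabled, then confirm the out-of-sync reader still
       // returns results consistent with the file system
       try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
         client.restoreToInstant("004");
         validateMetadata(client);
       }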

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -681,145 +323,68 @@ public void testManualRollbacks(HoodieTableType tableType) throws Exception {
   @Disabled
   public void testSync(HoodieTableType tableType) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    String newCommitTime;
-    List<HoodieRecord> records;
-    List<WriteStatus> writeStatuses;
-
     // Initial commits without metadata table enabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-    }
-
+    writeConfig = getWriteConfigBuilder(true, false, false).build();
+    testTable.doWriteOperation("001", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("002", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), 1);
     // Enable metadata table so it initialized by listing from file system
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      validateMetadata(client);
-      assertTrue(metadata(client).isInSync());
-    }
-
+    testTable.doWriteOperation("003", WriteOperationType.INSERT, Arrays.asList("p1", "p2"), 1);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, true);
     // Various table operations without metadata table enabled
-    String restoreToInstant;
-    String inflightActionTimestamp;
-    String beforeInflightActionTimestamp;
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Savepoint
-      restoreToInstant = newCommitTime;
-      if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        client.savepoint("hoodie", "metadata test");
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Record a timestamp for creating an inflight instance for sync testing
-      inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
-      beforeInflightActionTimestamp = newCommitTime;
-
-      // Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 5);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    testTable.doWriteOperation("004", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
 
-      // insert overwrite to test replacecommit
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
-      writeStatuses = replaceResult.getWriteStatuses().collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("006", Arrays.asList("p1", "p2"));
+      syncAndValidate(testTable);
     }
 
-    // If there is an incomplete operation, the Metadata Table is not updated beyond that operations but the
-    // in-memory merge should consider all the completed operations.
-    Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
-    fs.create(inflightCleanPath).close();
+    // trigger an upsert
+    testTable.doWriteOperation("007", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, false, true);
 
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
-      client.syncTableMetadata();
-
-      // Table should sync only before the inflightActionTimestamp
-      HoodieBackedTableMetadataWriter writer =
-          (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp);
+    // savepoint
+    if (COPY_ON_WRITE.equals(tableType)) {
+      testTable.doSavepoint("007");
+      syncTableMetadata(writeConfig);
+      assertTrue(metadata(writeConfig, context).isInSync());
+    }
 
-      // Reader should sync to all the completed instants
-      HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime);
+    // trigger delete
+    testTable.doWriteOperation("008", WriteOperationType.DELETE, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
 
-      // Remove the inflight instance holding back table sync
-      fs.delete(inflightCleanPath, false);
-      client.syncTableMetadata();
+    // trigger clean
+    testTable.doClean("009", Arrays.asList("001", "002"));
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);
 
-      writer =
-          (HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
+    // trigger another upsert
+    testTable.doWriteOperation("010", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);
 
-      // Reader should sync to all the completed instants
-      metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
-    }
+    // trigger clustering
+    testTable.doCluster("011", new HashMap<>());
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
 
-    // Enable metadata table and ensure it is synced
+    // If there is an inflight operation, the Metadata Table is not updated beyond that operations but the
+    // in-memory merge should consider all the completed operations.
+    HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("012", WriteOperationType.UPSERT, Collections.emptyList(),

Review comment:
       IIUC, we should place the inflight commit somewhere in between the other commits, e.g. at "009" or so.
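
   Something along these lines, perhaps (instants are illustrative; it reuses the same doWriteOperation/moveInflightCommitToComplete overloads as the code below):

       // create the inflight in the middle of the timeline, so that clean/clustering and
       // later upserts complete beyond the instant that holds metadata sync back
       HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("009", WriteOperationType.UPSERT,
           Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2, false, true);
       // ... continue with clean, clustering and upserts at "010" onwards, then later:
       testTable.moveInflightCommitToComplete("009", inflightCommitMeta);
       syncTableMetadata(writeConfig);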

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -681,145 +323,68 @@ public void testManualRollbacks(HoodieTableType tableType) throws Exception {
   @Disabled
   public void testSync(HoodieTableType tableType) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    String newCommitTime;
-    List<HoodieRecord> records;
-    List<WriteStatus> writeStatuses;
-
     // Initial commits without metadata table enabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-    }
-
+    writeConfig = getWriteConfigBuilder(true, false, false).build();
+    testTable.doWriteOperation("001", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("002", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), 1);
     // Enable metadata table so it initialized by listing from file system
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      validateMetadata(client);
-      assertTrue(metadata(client).isInSync());
-    }
-
+    testTable.doWriteOperation("003", WriteOperationType.INSERT, Arrays.asList("p1", "p2"), 1);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, true);
     // Various table operations without metadata table enabled
-    String restoreToInstant;
-    String inflightActionTimestamp;
-    String beforeInflightActionTimestamp;
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Savepoint
-      restoreToInstant = newCommitTime;
-      if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        client.savepoint("hoodie", "metadata test");
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Record a timestamp for creating an inflight instance for sync testing
-      inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
-      beforeInflightActionTimestamp = newCommitTime;
-
-      // Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 5);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    testTable.doWriteOperation("004", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
 
-      // insert overwrite to test replacecommit
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
-      writeStatuses = replaceResult.getWriteStatuses().collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("006", Arrays.asList("p1", "p2"));
+      syncAndValidate(testTable);
     }
 
-    // If there is an incomplete operation, the Metadata Table is not updated beyond that operations but the
-    // in-memory merge should consider all the completed operations.
-    Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
-    fs.create(inflightCleanPath).close();
+    // trigger an upsert
+    testTable.doWriteOperation("007", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, false, true);
 
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
-      client.syncTableMetadata();
-
-      // Table should sync only before the inflightActionTimestamp
-      HoodieBackedTableMetadataWriter writer =
-          (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp);
+    // savepoint
+    if (COPY_ON_WRITE.equals(tableType)) {
+      testTable.doSavepoint("007");
+      syncTableMetadata(writeConfig);
+      assertTrue(metadata(writeConfig, context).isInSync());
+    }
 
-      // Reader should sync to all the completed instants
-      HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime);
+    // trigger delete
+    testTable.doWriteOperation("008", WriteOperationType.DELETE, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
 
-      // Remove the inflight instance holding back table sync
-      fs.delete(inflightCleanPath, false);
-      client.syncTableMetadata();
+    // trigger clean
+    testTable.doClean("009", Arrays.asList("001", "002"));
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);
 
-      writer =
-          (HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
+    // trigger another upsert
+    testTable.doWriteOperation("010", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);
 
-      // Reader should sync to all the completed instants
-      metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
-    }
+    // trigger clustering
+    testTable.doCluster("011", new HashMap<>());
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
 
-    // Enable metadata table and ensure it is synced
+    // If there is an inflight operation, the Metadata Table is not updated beyond that operations but the
+    // in-memory merge should consider all the completed operations.
+    HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("012", WriteOperationType.UPSERT, Collections.emptyList(),
+        Arrays.asList("p1", "p2", "p3"), 2, false, true);
+    // trigger upsert
+    testTable.doWriteOperation("013", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    // testTable validation will fetch only files pertaining to completed commits. So, validateMetadata() will skip files for 006
+    // while validating against actual metadata table.
+    syncAndValidate(testTable, Collections.singletonList("012"), writeConfig.isMetadataTableEnabled(), writeConfig.getMetadataConfig().enableSync(), false);
+    // Remove the inflight instance holding back table sync
+    testTable.moveInflightCommitToComplete("012", inflightCommitMeta);
+    syncTableMetadata(writeConfig);
+    // A regular commit should get synced
+    testTable.doWriteOperation("014", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
+

Review comment:
       why are these commented out?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -264,344 +156,109 @@ public void testOnlyValidPartitionsAdded() throws Exception {
     final String filteredDirectoryThree = ".backups";
 
     // Create some commits
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     testTable.withPartitionMetaFiles("p1", "p2", filteredDirectoryOne, filteredDirectoryTwo, filteredDirectoryThree)
         .addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
         .addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10);
 
-    final HoodieWriteConfig writeConfig =
-            getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
+    writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
-      client.startCommitWithTime("005");
-      client.insert(jsc.emptyRDD(), "005");
-
-      List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
-      assertFalse(partitions.contains(nonPartitionDirectory),
-          "Must not contain the non-partition " + nonPartitionDirectory);
-      assertTrue(partitions.contains("p1"), "Must contain partition p1");
-      assertTrue(partitions.contains("p2"), "Must contain partition p2");
-
-      assertFalse(partitions.contains(filteredDirectoryOne),
-          "Must not contain the filtered directory " + filteredDirectoryOne);
-      assertFalse(partitions.contains(filteredDirectoryTwo),
-          "Must not contain the filtered directory " + filteredDirectoryTwo);
-      assertFalse(partitions.contains(filteredDirectoryThree),
-          "Must not contain the filtered directory " + filteredDirectoryThree);
-
-      FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
-      assertEquals(2, statuses.length);
-      statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
-      assertEquals(5, statuses.length);
-      Map<String, FileStatus[]> partitionsToFilesMap = metadata(client).getAllFilesInPartitions(
-          Arrays.asList(basePath + "/p1", basePath + "/p2"));
-      assertEquals(2, partitionsToFilesMap.size());
-      assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length);
-      assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length);
-    }
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 1, true);
+    syncTableMetadata(writeConfig);
+
+    List<String> partitions = metadataWriter(writeConfig).metadata().getAllPartitionPaths();
+    assertFalse(partitions.contains(nonPartitionDirectory),
+        "Must not contain the non-partition " + nonPartitionDirectory);
+    assertTrue(partitions.contains("p1"), "Must contain partition p1");
+    assertTrue(partitions.contains("p2"), "Must contain partition p2");
+
+    assertFalse(partitions.contains(filteredDirectoryOne),
+        "Must not contain the filtered directory " + filteredDirectoryOne);
+    assertFalse(partitions.contains(filteredDirectoryTwo),
+        "Must not contain the filtered directory " + filteredDirectoryTwo);
+    assertFalse(partitions.contains(filteredDirectoryThree),
+        "Must not contain the filtered directory " + filteredDirectoryThree);
+
+    FileStatus[] statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p1"));
+    assertEquals(3, statuses.length);
+    statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p2"));
+    assertEquals(6, statuses.length);
+    Map<String, FileStatus[]> partitionsToFilesMap = metadata(writeConfig, context).getAllFilesInPartitions(
+        Arrays.asList(basePath + "/p1", basePath + "/p2"));
+    assertEquals(2, partitionsToFilesMap.size());
+    assertEquals(3, partitionsToFilesMap.get(basePath + "/p1").length);
+    assertEquals(6, partitionsToFilesMap.get(basePath + "/p2").length);
   }
 
   /**
    * Test various table operations sync to Metadata Table correctly.
    */
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testTableOperations(HoodieTableType tableType) throws Exception {
+  @MethodSource("bootstrapAndTableOperationTestArgs")
+  public void testTableOperations(HoodieTableType tableType, boolean doNotSyncFewCommits) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-
-      // Write 1 (Bulk insert)
-      String newCommitTime = "001";
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
+    // bootstrap w/ 2 commits
+    bootstrapMetadata(testTable);
 
-      // Write 2 (inserts)
-      newCommitTime = "002";
-      client.startCommitWithTime(newCommitTime);
-      validateMetadata(client);
-
-      records = dataGen.generateInserts(newCommitTime, 20);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 3 (updates)
-      newCommitTime = "003";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 4 (updates and inserts)
-      newCommitTime = "004";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
+    // trigger an upsert
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
 
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "005";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Write 5 (updates and inserts)
-      newCommitTime = "006";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "007";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Deletes
-      newCommitTime = "008";
-      records = dataGen.generateDeletes(newCommitTime, 10);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      validateMetadata(client);
-
-      // Clean
-      newCommitTime = "009";
-      client.clean(newCommitTime);
-      validateMetadata(client);
-
-      // Restore
-      client.restoreToInstant("006");
-      validateMetadata(client);
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("004", Arrays.asList("p1", "p2"));
+      syncAndValidate(testTable);
     }
-  }
-
-  /**
-   * Test rollback of various table operations sync to Metadata Table correctly.
-   */
-  @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testRollbackOperations(HoodieTableType tableType) throws Exception {
-    init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Write 1 (Bulk insert)
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 2 (inserts) + Rollback of inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 20);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Write 3 (updates) + Rollback of updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 20);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
 
-      // Rollback of updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Rollback of Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Rollback of Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 10);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.delete(deleteKeys, newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Rollback of Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
+    // trigger an upsert
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    if (doNotSyncFewCommits) {
+      syncAndValidate(testTable, Collections.emptyList(), true, false, true);
     }
 
-    // Rollback of partial commits
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) {
-      // Write updates and inserts
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
+    // trigger clean
+    testTable.doClean("006", Collections.singletonList("001"));
+    if (doNotSyncFewCommits) {
+      syncAndValidate(testTable, Collections.emptyList(), true, false, false);
     }
 
-    // Marker based rollback of partial commits
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) {
-      // Write updates and inserts
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-    }
+    // trigger delete
+    testTable.doWriteOperation("007", WriteOperationType.DELETE, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
   }
 
   /**
-   * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op occurs to metadata.
-   * Once explicit sync is called, metadata should match.
+   * Tests rollback of a commit with metadata enabled.
    */
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
-  public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception {
+  public void testRollbackOperations(HoodieTableType tableType) throws Exception {

Review comment:
       there are some gaps here when compared to the original test that is being replaced:
   rollback of compaction,
   rollback of partial writes (listing-based and marker-based)
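
   If HoodieTestTable cannot yet simulate partial/failed writes, one option (rough sketch, essentially the client-based block being removed above) is to keep those cases on the write client; engineContext, dataGen and validateMetadata(client) are the old-harness helpers:

       // marker-based rollback of a partial write; flip withRollbackUsingMarkers(false)
       // to exercise the listing-based path as well
       try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
           getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) {
         String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
         client.startCommitWithTime(newCommitTime);
         List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
         List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
         assertNoWriteErrors(writeStatuses);
         // roll the partial commit back and verify the metadata table catches up after an explicit sync
         client.rollback(newCommitTime);
         client.syncTableMetadata();
         validateMetadata(client);
       }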
   

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -264,344 +156,109 @@ public void testOnlyValidPartitionsAdded() throws Exception {
     final String filteredDirectoryThree = ".backups";
 
     // Create some commits
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     testTable.withPartitionMetaFiles("p1", "p2", filteredDirectoryOne, filteredDirectoryTwo, filteredDirectoryThree)
         .addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
         .addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10);
 
-    final HoodieWriteConfig writeConfig =
-            getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
+    writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
-      client.startCommitWithTime("005");
-      client.insert(jsc.emptyRDD(), "005");
-
-      List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
-      assertFalse(partitions.contains(nonPartitionDirectory),
-          "Must not contain the non-partition " + nonPartitionDirectory);
-      assertTrue(partitions.contains("p1"), "Must contain partition p1");
-      assertTrue(partitions.contains("p2"), "Must contain partition p2");
-
-      assertFalse(partitions.contains(filteredDirectoryOne),
-          "Must not contain the filtered directory " + filteredDirectoryOne);
-      assertFalse(partitions.contains(filteredDirectoryTwo),
-          "Must not contain the filtered directory " + filteredDirectoryTwo);
-      assertFalse(partitions.contains(filteredDirectoryThree),
-          "Must not contain the filtered directory " + filteredDirectoryThree);
-
-      FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
-      assertEquals(2, statuses.length);
-      statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
-      assertEquals(5, statuses.length);
-      Map<String, FileStatus[]> partitionsToFilesMap = metadata(client).getAllFilesInPartitions(
-          Arrays.asList(basePath + "/p1", basePath + "/p2"));
-      assertEquals(2, partitionsToFilesMap.size());
-      assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length);
-      assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length);
-    }
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 1, true);
+    syncTableMetadata(writeConfig);
+
+    List<String> partitions = metadataWriter(writeConfig).metadata().getAllPartitionPaths();
+    assertFalse(partitions.contains(nonPartitionDirectory),
+        "Must not contain the non-partition " + nonPartitionDirectory);
+    assertTrue(partitions.contains("p1"), "Must contain partition p1");
+    assertTrue(partitions.contains("p2"), "Must contain partition p2");
+
+    assertFalse(partitions.contains(filteredDirectoryOne),
+        "Must not contain the filtered directory " + filteredDirectoryOne);
+    assertFalse(partitions.contains(filteredDirectoryTwo),
+        "Must not contain the filtered directory " + filteredDirectoryTwo);
+    assertFalse(partitions.contains(filteredDirectoryThree),
+        "Must not contain the filtered directory " + filteredDirectoryThree);
+
+    FileStatus[] statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p1"));
+    assertEquals(3, statuses.length);
+    statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p2"));
+    assertEquals(6, statuses.length);
+    Map<String, FileStatus[]> partitionsToFilesMap = metadata(writeConfig, context).getAllFilesInPartitions(
+        Arrays.asList(basePath + "/p1", basePath + "/p2"));
+    assertEquals(2, partitionsToFilesMap.size());
+    assertEquals(3, partitionsToFilesMap.get(basePath + "/p1").length);
+    assertEquals(6, partitionsToFilesMap.get(basePath + "/p2").length);
   }
 
   /**
    * Test various table operations sync to Metadata Table correctly.
    */
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testTableOperations(HoodieTableType tableType) throws Exception {
+  @MethodSource("bootstrapAndTableOperationTestArgs")
+  public void testTableOperations(HoodieTableType tableType, boolean doNotSyncFewCommits) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-
-      // Write 1 (Bulk insert)
-      String newCommitTime = "001";
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
+    // bootstrap w/ 2 commits
+    bootstrapMetadata(testTable);
 
-      // Write 2 (inserts)
-      newCommitTime = "002";
-      client.startCommitWithTime(newCommitTime);
-      validateMetadata(client);
-
-      records = dataGen.generateInserts(newCommitTime, 20);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 3 (updates)
-      newCommitTime = "003";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 4 (updates and inserts)
-      newCommitTime = "004";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
+    // trigger an upsert
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
 
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "005";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Write 5 (updates and inserts)
-      newCommitTime = "006";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "007";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Deletes
-      newCommitTime = "008";
-      records = dataGen.generateDeletes(newCommitTime, 10);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      validateMetadata(client);
-
-      // Clean
-      newCommitTime = "009";
-      client.clean(newCommitTime);
-      validateMetadata(client);
-
-      // Restore
-      client.restoreToInstant("006");
-      validateMetadata(client);
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("004", Arrays.asList("p1", "p2"));
+      syncAndValidate(testTable);
     }
-  }
-
-  /**
-   * Test rollback of various table operations sync to Metadata Table correctly.
-   */
-  @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testRollbackOperations(HoodieTableType tableType) throws Exception {
-    init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Write 1 (Bulk insert)
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 2 (inserts) + Rollback of inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 20);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Write 3 (updates) + Rollback of updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 20);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
 
-      // Rollback of updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Rollback of Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Rollback of Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 10);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.delete(deleteKeys, newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Rollback of Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
+    // trigger an upsert
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    if (doNotSyncFewCommits) {
+      syncAndValidate(testTable, Collections.emptyList(), true, false, true);
     }
 
-    // Rollback of partial commits
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) {
-      // Write updates and inserts
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
+    // trigger clean
+    testTable.doClean("006", Collections.singletonList("001"));
+    if (doNotSyncFewCommits) {
+      syncAndValidate(testTable, Collections.emptyList(), true, false, false);
     }
 
-    // Marker based rollback of partial commits
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) {
-      // Write updates and inserts
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-    }
+    // trigger delete
+    testTable.doWriteOperation("007", WriteOperationType.DELETE, Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
   }
 
   /**
-   * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op occurs to metadata.
-   * Once explicit sync is called, metadata should match.
+   * Tests rollback of a commit with metadata enabled.
    */
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
-  public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception {
+  public void testRollbackOperations(HoodieTableType tableType) throws Exception {

Review comment:
       Also, may I know where we have covered testRollbackUnsyncedCommit?
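
   If it is not covered elsewhere, a minimal way to retain it might be the following (sketch only; it reuses the client-based helpers from the removed test, and the exact config needed to leave the commit unsynced would have to match the old testRollbackUnsyncedCommit):

       // write a commit, roll it back before it has been synced to the metadata table,
       // then sync explicitly and verify metadata matches the file system again
       try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
         String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
         client.startCommitWithTime(newCommitTime);
         List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
         List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
         assertNoWriteErrors(writeStatuses);
         client.rollback(newCommitTime);
         client.syncTableMetadata();
         validateMetadata(client);
       }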



