Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/02 09:23:50 UTC

[hudi] 08/08: [HUDI-5647] Automate savepoint and restore tests (#7796)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 40d534e878bb8ceb4bc7fa20539cca3debd5727e
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Feb 2 14:49:53 2023 +0800

    [HUDI-5647] Automate savepoint and restore tests (#7796)
---
 .../TestSavepointRestoreCopyOnWrite.java           | 173 ++++++++++++++
 .../TestSavepointRestoreMergeOnRead.java           | 248 +++++++++++++++++++++
 .../hudi/testutils/HoodieClientTestBase.java       |  62 +++++-
 3 files changed, 482 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
new file mode 100644
index 00000000000..8a71a01fda9
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for COPY_ON_WRITE table savepoint restore.
+ */
+@Tag("functional")
+public class TestSavepointRestoreCopyOnWrite extends HoodieClientTestBase {
+
+  /**
+   * Actions: C1, C2, savepoint C2, C3, C4, restore.
+   * Should go back to C2;
+   * C3 and C4 should be cleaned up.
+   */
+  @Test
+  void testBasicRollback() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 4; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 4 insert batches, savepointing the 2nd commit
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty());
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(40);
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  /**
+   * The restore should roll back all the pending instants that are beyond the savepoint.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 inflight, restore.
+   * Should go back to C2;
+   * C3, C4 should be cleaned up.
+   */
+  @Test
+  void testCleaningPendingInstants() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 3 insert batches, savepointing the 2nd commit
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty());
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(30);
+      // write another pending instant
+      insertBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), numRecords);
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  /**
+   * The rollbacks (either inflight or complete) beyond the savepoint should be cleaned up.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3), C5, restore.
+   * Should go back to C2.
+   * C3, C4 (RB_C3) and C5 should be cleaned up.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3) inflight, restore.
+   * Should go back to C2.
+   * C3, C4 (RB_C3) should be cleaned up.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCleaningRollbackInstants(boolean commitRollback) throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
+        // eager cleaning
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 2; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 2 insert batches, savepointing the 2nd commit
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty());
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(20);
+      // write another pending instant
+      insertBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), numRecords);
+      // rollback the pending instant
+      if (commitRollback) {
+        client.rollbackFailedWrites();
+      } else {
+        HoodieInstant pendingInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction()
+            .lastInstant().orElseThrow(() -> new HoodieException("Pending instant does not exist"));
+        HoodieSparkTable.create(client.getConfig(), context)
+            .scheduleRollback(context, HoodieActiveTimeline.createNewInstantTime(), pendingInstant, false, true);
+      }
+      Option<String> rollbackInstant = metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().map(HoodieInstant::getTimestamp);
+      assertTrue(rollbackInstant.isPresent(), "The latest instant should be a rollback");
+      // write another batch
+      insertBatch(hoodieWriteConfig, client, HoodieActiveTimeline.createNewInstantTime(), rollbackInstant.get(), numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords * 3, 1, Option.empty());
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+}
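
For readers who want the savepoint/restore flow these COPY_ON_WRITE tests exercise without the test-harness plumbing, it reduces to two calls on the write client: savepoint a completed commit, then restore to it later. Below is a minimal sketch, assuming an existing Hudi table and an already-built HoodieWriteConfig; the class and parameter names are illustrative and not part of this patch.

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class SavepointRestoreSketch {

      // Sketch only: engineContext and writeConfig are assumed to point at an
      // existing COPY_ON_WRITE table.
      static void savepointThenRestore(HoodieSparkEngineContext engineContext,
                                       HoodieWriteConfig writeConfig,
                                       String commitToProtect) {
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
          // Mark the completed commit as a savepoint so cleaning retains its files.
          client.savepoint(commitToProtect, "user1", "savepoint before risky writes");
          // ... later commits happen here ...
          // Restore removes every commit (completed or pending) made after the savepoint.
          client.restoreToSavepoint(commitToProtect);
        }
      }
    }

The tests above drive the same two client calls through the HoodieClientTestBase helpers.
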
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
new file mode 100644
index 00000000000..6c1dfe5d734
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for MERGE_ON_READ table savepoint restore.
+ */
+@Tag("functional")
+public class TestSavepointRestoreMergeOnRead extends HoodieClientTestBase {
+
+  /**
+   * Actions: DC1, DC2, DC3, savepoint DC3, (snapshot query) DC4, C5, DC6, DC7, restore to DC3.
+   * Should roll back DC4, C5, DC6 and DC7.
+   * The latest file slice should be fully cleaned up, with a rollback log appended for DC4 in the first file slice.
+   *
+   * <p>For example file layout,
+   * FG1:
+   *   BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4)
+   *   BF5(C5), LF1(DC6), LF2(DC7)
+   * After restore, it becomes
+   *   BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4), LF4(RB DC4)
+   *
+   * <p>Expected behaviors:
+   *   snapshot query: the total record count matches;
+   *   the row count is verified after updating columns in (val4, val5, val6, val7).
+   */
+  @Test
+  void testCleaningDeltaCommits() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit triggers compaction
+            .withInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      final int numRecords = 10;
+      List<HoodieRecord> baseRecordsToUpdate = null;
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 3 insert batches, savepointing the 3rd commit
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+        if (i == 3) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          baseRecordsToUpdate = records;
+          client.savepoint("user1", "Savepoint for 3rd commit");
+        }
+      }
+
+      assertRowNumberEqualsTo(30);
+
+      // write another 3 delta commits
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.upsert(writeRecords, newCommitTime);
+        if (i == 1) {
+          Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+          assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+          client.compact(compactionInstant.get());
+        }
+      }
+
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(30);
+    }
+  }
+
+  /**
+   * <p>Actions: DC1, DC2, DC3, savepoint DC3, DC4, C5.pending, DC6, DC7, restore.
+   * Should roll back everything after DC3.
+   *
+   * <p>Expected behaviors: the pending compaction after the savepoint should also be cleaned up,
+   * the latest file slice should be fully deleted, and a rollback log should be appended for DC4.
+   */
+  @Test
+  void testCleaningPendingCompaction() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit triggers compaction
+            .withInlineCompaction(false)
+            .withScheduleInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      final int numRecords = 10;
+      List<HoodieRecord> baseRecordsToUpdate = null;
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 3 insert batches, savepointing the 3rd commit
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+        if (i == 3) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          baseRecordsToUpdate = records;
+          client.savepoint("user1", "Savepoint for 3rd commit");
+        }
+      }
+
+      assertRowNumberEqualsTo(30);
+
+      // write another 3 delta commits
+      for (int i = 1; i <= 3; i++) {
+        upsertBatch(writeClient, baseRecordsToUpdate);
+        if (i == 1) {
+          Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+          assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+          compactWithoutCommit(compactionInstant.get());
+        }
+      }
+
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(30);
+    }
+  }
+
+  /**
+   * Actions: DC1, DC2, DC3, C4, savepoint C4, DC5, C6(RB_DC5), DC7, restore.
+   *
+   * <p>Expected behaviors: should roll back DC5, C6 (RB_DC5) and DC7.
+   * No files will be cleaned up. Only rollback log appends.
+   */
+  @Test
+  void testCleaningCompletedRollback() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(3) // the 3rd delta_commit triggers compaction
+            .withInlineCompaction(false)
+            .withScheduleInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      final int numRecords = 10;
+      List<HoodieRecord> baseRecordsToUpdate = null;
+      for (int i = 1; i <= 2; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 2 insert batches, keeping the 2nd batch for later updates
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+        if (i == 2) {
+          baseRecordsToUpdate = records;
+        }
+      }
+
+      // update to generate log files, then a valid compaction plan can be scheduled
+      upsertBatch(client, baseRecordsToUpdate);
+      Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+      assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+      client.compact(compactionInstant.get());
+      savepointCommit = compactionInstant.get();
+      client.savepoint("user1", "Savepoint for the compaction commit");
+
+      assertRowNumberEqualsTo(20);
+      // write a delta_commit but do not commit it
+      updateBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+      // rollback the delta_commit
+      assertTrue(writeClient.rollbackFailedWrites(), "The last delta_commit should be rolled back");
+
+      // another update
+      upsertBatch(writeClient, baseRecordsToUpdate);
+
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    client.upsert(writeRecords, newCommitTime);
+  }
+
+  private void compactWithoutCommit(String compactionInstantTime) {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withAutoCommit(false) // disable auto commit
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      JavaRDD<WriteStatus> statuses = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime).getWriteStatuses();
+      assertNoWriteErrors(statuses.collect());
+    }
+  }
+
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+}
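
The MERGE_ON_READ cases additionally interleave compaction with the delta commits before restoring. Stripped of the scaffolding, the compaction round trip they rely on is a schedule step followed by an execute step; the sketch below uses only client calls already shown in this diff and assumes a client writing to a MOR table with inline compaction disabled.

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.common.util.Option;

    public class CompactionSketch {

      // Sketch only: client is assumed to target a MERGE_ON_READ table configured
      // with withInlineCompaction(false), as in the tests above.
      static void compactOnce(SparkRDDWriteClient<?> client) {
        // Ask the client to build a compaction plan over the current log files, if any.
        Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
        if (compactionInstant.isPresent()) {
          // Execute the plan; a later restore to a savepoint taken before this instant
          // removes the compaction commit along with the trailing delta commits.
          client.compact(compactionInstant.get());
        }
      }
    }
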
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 2b4172781c6..43f843a4f33 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -597,7 +597,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
 
     // verify that there is a commit
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
-    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+    HoodieTimeline timeline = metaClient.getCommitsTimeline();
 
     if (assertForCommit) {
       assertEquals(expTotalCommits, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(),
@@ -700,6 +700,66 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     return result;
   }
 
+  /**
+   * Insert a batch of records without committing (so that the instant is left in-flight).
+   *
+   * @param newCommitTime The commit time
+   * @param numRecords    The number of records to insert
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  protected void insertBatchWithoutCommit(String newCommitTime, int numRecords) {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withAutoCommit(false) // disable auto commit
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.insert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+    }
+  }
+
+  /**
+   * Update a batch of records without committing (so that the instant is left in-flight).
+   *
+   * @param newCommitTime The commit time
+   * @param baseRecordsToUpdate The base records to update
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  protected void updateBatchWithoutCommit(String newCommitTime, List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withAutoCommit(false) // disable auto commit
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, baseRecordsToUpdate);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      client.upsert(writeRecords, newCommitTime);
+    }
+  }
+
+  /**
+   * Asserts that the number of rows in the dataset equals {@code numRows}.
+   *
+   * @param numRows The expected row number
+   */
+  protected void assertRowNumberEqualsTo(int numRows) {
+    // Check the entire dataset has all records still
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals(numRows, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
+        "Must contain " + numRows + " records");
+  }
+
   /**
    * Get Cleaner state corresponding to a partition path.
    *