Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/02 09:23:50 UTC
[hudi] 08/08: [HUDI-5647] Automate savepoint and restore tests (#7796)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 40d534e878bb8ceb4bc7fa20539cca3debd5727e
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Feb 2 14:49:53 2023 +0800
[HUDI-5647] Automate savepoint and restore tests (#7796)
---
.../TestSavepointRestoreCopyOnWrite.java | 173 ++++++++++++++
.../TestSavepointRestoreMergeOnRead.java | 248 +++++++++++++++++++++
.../hudi/testutils/HoodieClientTestBase.java | 62 +++++-
3 files changed, 482 insertions(+), 1 deletion(-)
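For context, these tests drive the write client's savepoint and restore API directly. A minimal sketch of the flow under test (illustrative only: the client comes from the test-harness helper getHoodieWriteClient, and the write config and batch writes are elided):

    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
      String c2 = HoodieActiveTimeline.createNewInstantTime();
      client.startCommitWithTime(c2);
      // ... insert a batch for commit C2 ...
      client.savepoint("user1", "savepoint on C2"); // savepoints the latest completed commit
      // ... further commits C3, C4 ...
      client.restoreToSavepoint(c2);                // rolls the table back to the C2 savepoint
    }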
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
new file mode 100644
index 00000000000..8a71a01fda9
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for COPY_ON_WRITE table savepoint restore.
+ */
+@Tag("functional")
+public class TestSavepointRestoreCopyOnWrite extends HoodieClientTestBase {
+
+ /**
+ * Actions: C1, C2, savepoint C2, C3, C4, restore.
+ * Should go back to C2;
+ * C3 and C4 should be cleaned up.
+ */
+ @Test
+ void testBasicRollback() throws Exception {
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withRollbackUsingMarkers(true)
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+ String savepointCommit = null;
+ String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+ final int numRecords = 10;
+ for (int i = 1; i <= 4; i++) {
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ // Write 4 insert batches, with a savepoint triggered on the 2nd commit
+ insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+ false, true, numRecords, numRecords * i, 1, Option.empty());
+ prevInstant = newCommitTime;
+ if (i == 2) {
+ // trigger savepoint
+ savepointCommit = newCommitTime;
+ client.savepoint("user1", "Savepoint for 2nd commit");
+ }
+ }
+ assertRowNumberEqualsTo(40);
+ // restore
+ client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+ assertRowNumberEqualsTo(20);
+ }
+ }
+
+ /**
+ * The restore should roll back all the pending instants that are beyond the savepoint.
+ *
+ * <p>Actions: C1, C2, savepoint C2, C3, C4 inflight, restore.
+ * Should go back to C2;
+ * C3 and C4 should be cleaned up.
+ */
+ @Test
+ void testCleaningPendingInstants() throws Exception {
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withRollbackUsingMarkers(true)
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+ String savepointCommit = null;
+ String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+ final int numRecords = 10;
+ for (int i = 1; i <= 3; i++) {
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ // Write 3 insert batches, with a savepoint triggered on the 2nd commit
+ insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+ false, true, numRecords, numRecords * i, 1, Option.empty());
+ prevInstant = newCommitTime;
+ if (i == 2) {
+ // trigger savepoint
+ savepointCommit = newCommitTime;
+ client.savepoint("user1", "Savepoint for 2nd commit");
+ }
+ }
+ assertRowNumberEqualsTo(30);
+ // write another pending instant
+ insertBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), numRecords);
+ // restore
+ client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+ assertRowNumberEqualsTo(20);
+ }
+ }
+
+ /**
+ * The rollbacks (either inflight or complete) beyond the savepoint should be cleaned up.
+ *
+ * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3), C5, restore.
+ * Should go back to C2.
+ * C3, C4 (RB_C3), C5 should be cleaned up.
+ *
+ * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3) inflight, restore.
+ * Should go back to C2.
+ * C3, C4 (RB_C3) should be cleaned up.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCleaningRollbackInstants(boolean commitRollback) throws Exception {
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withRollbackUsingMarkers(true)
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+ String savepointCommit = null;
+ String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+ final int numRecords = 10;
+ for (int i = 1; i <= 2; i++) {
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ // Write 2 insert batches, with a savepoint triggered on the 2nd commit
+ insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+ false, true, numRecords, numRecords * i, 1, Option.empty());
+ prevInstant = newCommitTime;
+ if (i == 2) {
+ // trigger savepoint
+ savepointCommit = newCommitTime;
+ client.savepoint("user1", "Savepoint for 2nd commit");
+ }
+ }
+ assertRowNumberEqualsTo(20);
+ // write another pending instant
+ insertBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), numRecords);
+ // rollback the pending instant
+ if (commitRollback) {
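+ // completed rollback: roll back the failed write and complete the rollback instant on the timeline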
+ client.rollbackFailedWrites();
+ } else {
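+ // inflight rollback: only schedule the rollback plan, so the rollback instant stays pending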
+ HoodieInstant pendingInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction()
+ .lastInstant().orElseThrow(() -> new HoodieException("Pending instant does not exist"));
+ HoodieSparkTable.create(client.getConfig(), context)
+ .scheduleRollback(context, HoodieActiveTimeline.createNewInstantTime(), pendingInstant, false, true);
+ }
+ Option<String> rollbackInstant = metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().map(HoodieInstant::getTimestamp);
+ assertTrue(rollbackInstant.isPresent(), "The latest instant should be a rollback");
+ // write another batch
+ insertBatch(hoodieWriteConfig, client, HoodieActiveTimeline.createNewInstantTime(), rollbackInstant.get(), numRecords, SparkRDDWriteClient::insert,
+ false, true, numRecords, numRecords * 3, 1, Option.empty());
+ // restore
+ client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+ assertRowNumberEqualsTo(20);
+ }
+ }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
new file mode 100644
index 00000000000..6c1dfe5d734
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for MERGE_ON_READ table savepoint restore.
+ */
+@Tag("functional")
+public class TestSavepointRestoreMergeOnRead extends HoodieClientTestBase {
+
+ /**
+ * Actions: DC1, DC2, DC3, savepoint DC3, (snapshot query) DC4, C5, DC6, DC7, restore to DC3.
+ * Should roll back DC4, C5, DC6 and DC7:
+ * the latest file slice should be fully cleaned up, and a rollback log block appended for DC4 in the first file slice.
+ *
+ * <p>For example file layout,
+ * FG1:
+ * BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4)
+ * BF5(C5), LF1(DC6), LF2(DC7)
+ * After restore, it becomes
+ * BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4), LF4(RB DC4)
+ *
+ * <p>Expected behaviors:
+ * snapshot query: the total record count matches;
+ * the row count is unchanged because the commits after the savepoint only update columns (val4, val5, val6, val7).
+ */
+ @Test
+ void testCleaningDeltaCommits() throws Exception {
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit triggers compaction
+ .withInlineCompaction(true)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+ String savepointCommit = null;
+ final int numRecords = 10;
+ List<HoodieRecord> baseRecordsToUpdate = null;
+ for (int i = 1; i <= 3; i++) {
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ // Write 3 insert batches, with a savepoint triggered on the 3rd commit
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.insert(writeRecords, newCommitTime);
+ if (i == 3) {
+ // trigger savepoint
+ savepointCommit = newCommitTime;
+ baseRecordsToUpdate = records;
+ client.savepoint("user1", "Savepoint for 3rd commit");
+ }
+ }
+
+ assertRowNumberEqualsTo(30);
+
+ // write another 3 delta commits
+ for (int i = 1; i <= 3; i++) {
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.upsert(writeRecords, newCommitTime);
+ if (i == 1) {
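+ // after the first update (DC4), schedule and run compaction, producing commit C5 between the delta commits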
+ Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+ assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+ client.compact(compactionInstant.get());
+ }
+ }
+
+ // restore
+ client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+ assertRowNumberEqualsTo(30);
+ }
+ }
+
+ /**
+ * <p>Actions: DC1, DC2, DC3, savepoint DC3, DC4, C5 (pending), DC6, DC7, restore.
+ * Should roll back everything after DC3.
+ *
+ * <p>Expected behaviors: the pending compaction after the savepoint should also be cleaned up,
+ * the latest file slice should be fully deleted, and a rollback log block should be appended for DC4.
+ */
+ @Test
+ void testCleaningPendingCompaction() throws Exception {
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit triggers compaction
+ .withInlineCompaction(false)
+ .withScheduleInlineCompaction(true)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+ String savepointCommit = null;
+ final int numRecords = 10;
+ List<HoodieRecord> baseRecordsToUpdate = null;
+ for (int i = 1; i <= 3; i++) {
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ // Write 3 insert batches, with a savepoint triggered on the 3rd commit
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.insert(writeRecords, newCommitTime);
+ if (i == 3) {
+ // trigger savepoint
+ savepointCommit = newCommitTime;
+ baseRecordsToUpdate = records;
+ client.savepoint("user1", "Savepoint for 3rd commit");
+ }
+ }
+
+ assertRowNumberEqualsTo(30);
+
+ // write another 3 delta commits
+ for (int i = 1; i <= 3; i++) {
+ upsertBatch(client, baseRecordsToUpdate);
+ if (i == 1) {
+ Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+ assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+ compactWithoutCommit(compactionInstant.get());
+ }
+ }
+
+ // restore
+ client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+ assertRowNumberEqualsTo(30);
+ }
+ }
+
+ /**
+ * Actions: DC1, DC2, DC3, C4, savepoint C4, DC5, C6 (RB_DC5), DC7, restore.
+ *
+ * <p>Expected behaviors: should roll back DC5, C6 and DC7.
+ * No files will be cleaned up; only rollback log blocks are appended.
+ */
+ @Test
+ void testCleaningCompletedRollback() throws Exception {
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(3) // the 3rd delta_commit triggers compaction
+ .withInlineCompaction(false)
+ .withScheduleInlineCompaction(true)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+ String savepointCommit = null;
+ final int numRecords = 10;
+ List<HoodieRecord> baseRecordsToUpdate = null;
+ for (int i = 1; i <= 2; i++) {
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ // Write 2 insert batches; keep the records of the 2nd batch for the later updates
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.insert(writeRecords, newCommitTime);
+ if (i == 2) {
+ baseRecordsToUpdate = records;
+ }
+ }
+
+ // update to generate log files, then a valid compaction plan can be scheduled
+ upsertBatch(client, baseRecordsToUpdate);
+ Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+ assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+ client.compact(compactionInstant.get());
+ savepointCommit = compactionInstant.get();
+ client.savepoint("user1", "Savepoint for 3td commit");
+
+ assertRowNumberEqualsTo(20);
+ // write a delta_commit but do not commit it, leaving the instant inflight
+ updateBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+ // rollback the delta_commit
+ assertTrue(client.rollbackFailedWrites(), "The last delta_commit should be rolled back");
+
+ // another update
+ upsertBatch(client, baseRecordsToUpdate);
+
+ // restore
+ client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+ assertRowNumberEqualsTo(20);
+ }
+ }
+
+ private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.upsert(writeRecords, newCommitTime);
+ }
+
+ private void compactWithoutCommit(String compactionInstantTime) {
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoCommit(false) // disable auto commit
+ .withRollbackUsingMarkers(true)
+ .build();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
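+ // run the compaction but skip the commit, leaving the compaction instant inflight on the timeline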
+ JavaRDD<WriteStatus> statuses = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime).getWriteStatuses();
+ assertNoWriteErrors(statuses.collect());
+ }
+ }
+
+ @Override
+ protected HoodieTableType getTableType() {
+ return HoodieTableType.MERGE_ON_READ;
+ }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 2b4172781c6..43f843a4f33 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -597,7 +597,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
// verify that there is a commit
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
- HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+ HoodieTimeline timeline = metaClient.getCommitsTimeline();
if (assertForCommit) {
assertEquals(expTotalCommits, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(),
@@ -700,6 +700,66 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
return result;
}
+ /**
+ * Insert a batch of records without committing, so that the instant is left inflight.
+ *
+ * @param newCommitTime The commit time
+ * @param numRecords The number of records to insert
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ protected void insertBatchWithoutCommit(String newCommitTime, int numRecords) {
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoCommit(false) // disable auto commit
+ .withRollbackUsingMarkers(true)
+ .build();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+ List<WriteStatus> statuses = client.insert(writeRecords, newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+ }
+ }
+
+ /**
+ * Update a batch of records without committing, so that the instant is left inflight.
+ *
+ * @param newCommitTime The commit time
+ * @param baseRecordsToUpdate The base records to update
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ protected void updateBatchWithoutCommit(String newCommitTime, List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoCommit(false) // disable auto commit
+ .withRollbackUsingMarkers(true)
+ .build();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, baseRecordsToUpdate);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.upsert(writeRecords, newCommitTime);
+ }
+ }
+
+ /**
+ * Asserts that the number of rows in the dataset equals {@code numRows}.
+ *
+ * @param numRows The expected row number
+ */
+ protected void assertRowNumberEqualsTo(int numRows) {
+ // Read the entire dataset across all partition paths and count the rows
+ String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+ for (int i = 0; i < fullPartitionPaths.length; i++) {
+ fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+ }
+ assertEquals(numRows, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
+ "Must contain " + numRows + " records");
+ }
+
/**
* Get Cleaner state corresponding to a partition path.
*