You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by sa...@apache.org on 2021/01/29 18:29:03 UTC
[hudi] branch master updated: [HUDI-1266] Add unit test for validating replacecommit rollback (#2418)
This is an automated email from the ASF dual-hosted git repository.
satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9cb6cb8 [HUDI-1266] Add unit test for validating replacecommit rollback (#2418)
9cb6cb8 is described below
commit 9cb6cb818968d3ef10b3264714e30b8df957ec9d
Author: satishkotha <sa...@uber.com>
AuthorDate: Fri Jan 29 10:28:08 2021 -0800
[HUDI-1266] Add unit test for validating replacecommit rollback (#2418)
---
.../rollback/HoodieClientRollbackTestBase.java | 62 +++++++++++++++++++++-
.../TestCopyOnWriteRollbackActionExecutor.java | 22 +++++++-
2 files changed, 81 insertions(+), 3 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
index eb0e871..3b0829b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
@@ -18,23 +18,26 @@
package org.apache.hudi.table.action.rollback;
+import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
-
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
@@ -96,4 +99,61 @@ public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
assertEquals(1, secondPartitionCommit2FileSlices.size());
}
}
+
+  /**
+   * Writes two instants into the first and second default partitions: an upsert at instant "001"
+   * that is always committed, followed by an insert_overwrite (replacecommit action) at instant "002".
+   * The file slices of the file groups created by the second write are appended to the supplied lists.
+   *
+   * @param firstPartitionCommit2FileSlices  output list; receives the file slices of the file group that
+   *                                         the insert_overwrite created in the first partition
+   * @param secondPartitionCommit2FileSlices output list; receives the file slices of the file group that
+   *                                         the insert_overwrite created in the second partition
+   * @param cfg                              write config used to build the write client and the table
+   * @param commitSecondInsertOverwrite      when {@code true} the insert_overwrite is committed; when
+   *                                         {@code false} it is written but never committed
+   * @throws IOException declared for the helpers called here (presumably file-system access - confirm)
+   */
+  protected void insertOverwriteCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices,
+                                                            List<FileSlice> secondPartitionCommit2FileSlices,
+                                                            HoodieWriteConfig cfg,
+                                                            boolean commitSecondInsertOverwrite) throws IOException {
+    // Restrict the data generator and the partition metadata to exactly two partitions.
+    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    // Write 1 (upsert)
+    String newCommitTime = "001";
+    List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    client.startCommitWithTime(newCommitTime);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+    Assertions.assertNoWriteErrors(statuses.collect());
+    client.commit(newCommitTime, statuses);
+
+    // Remember the fileIds written by the first commit so the second write's file groups can be told apart.
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
+    List<HoodieFileGroup> firstPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, firstPartitionCommit1FileGroups.size());
+    Set<String> partition1Commit1FileIds = firstPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
+    List<HoodieFileGroup> secondPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, secondPartitionCommit1FileGroups.size());
+    Set<String> partition2Commit1FileIds = secondPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
+
+    // Write 2 (one insert_overwrite)
+    String commitActionType = HoodieTimeline.REPLACE_COMMIT_ACTION;
+    newCommitTime = "002";
+    records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
+    writeRecords = jsc.parallelize(records, 1);
+    client.startCommitWithTime(newCommitTime, commitActionType);
+    HoodieWriteResult result = client.insertOverwrite(writeRecords, newCommitTime);
+    statuses = result.getWriteStatuses();
+    Assertions.assertNoWriteErrors(statuses.collect());
+    if (commitSecondInsertOverwrite) {
+      client.commit(newCommitTime, statuses, Option.empty(), commitActionType, result.getPartitionToReplaceFileIds());
+    }
+    metaClient.reloadActiveTimeline();
+
+    // Collect the file groups whose fileId was not written by the first commit,
+    // i.e. the ones created by the insert_overwrite.
+    fsView = getFileSystemViewWithUnCommittedSlices(metaClient);
+    List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH)
+        .filter(fg -> !partition1Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
+    // Assert before indexing so an unexpected group count fails clearly instead of with IndexOutOfBoundsException.
+    assertEquals(1, firstPartitionCommit2FileGroups.size());
+    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+    List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH)
+        .filter(fg -> !partition2Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
+    assertEquals(1, secondPartitionCommit2FileGroups.size());
+    secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+
+    assertEquals(1, firstPartitionCommit2FileSlices.size());
+    assertEquals(1, secondPartitionCommit2FileSlices.size());
+  }
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index e14dbf9..030cc3e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -125,6 +124,19 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
assertFalse(testTable.baseFileExists(p2, "002", "id22"));
}
+  /**
+   * Verifies that rollback works with a replacecommit, once with marker-based rollback
+   * enabled and once with it disabled.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCopyOnWriteRollbackWithReplaceCommits(boolean isUsingMarkers) throws IOException {
+    // 1. Prepare data; the helper asserts the expected file slices while writing.
+    final HoodieWriteConfig cfg = getConfigBuilder()
+        .withRollbackUsingMarkers(isUsingMarkers)
+        .withAutoCommit(false)
+        .build();
+    final List<FileSlice> firstPartitionSlices = new ArrayList<>();
+    final List<FileSlice> secondPartitionSlices = new ArrayList<>();
+    // The insert_overwrite is committed only when marker-based rollback is disabled.
+    final boolean commitInsertOverwrite = !isUsingMarkers;
+    insertOverwriteCommitDataWithTwoPartitions(firstPartitionSlices, secondPartitionSlices, cfg, commitInsertOverwrite);
+
+    // 2. Roll back and validate through the shared helper.
+    final HoodieTable table = getHoodieTable(metaClient, cfg);
+    performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionSlices, secondPartitionSlices);
+  }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
@@ -133,8 +145,14 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
this.twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+ metaClient.reloadActiveTimeline();
HoodieTable table = this.getHoodieTable(metaClient, cfg);
-
+ performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
+ }
+
+ private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfig cfg, HoodieTable table,
+ List<FileSlice> firstPartitionCommit2FileSlices,
+ List<FileSlice> secondPartitionCommit2FileSlices) throws IOException {
//2. rollback
HoodieInstant commitInstant;
if (isUsingMarkers) {