Posted to commits@hudi.apache.org by sa...@apache.org on 2021/01/29 18:29:03 UTC

[hudi] branch master updated: [HUDI-1266] Add unit test for validating replacecommit rollback (#2418)

This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cb6cb8  [HUDI-1266] Add unit test for validating replacecommit rollback (#2418)
9cb6cb8 is described below

commit 9cb6cb818968d3ef10b3264714e30b8df957ec9d
Author: satishkotha <sa...@uber.com>
AuthorDate: Fri Jan 29 10:28:08 2021 -0800

    [HUDI-1266] Add unit test for validating replacecommit rollback (#2418)
---
 .../rollback/HoodieClientRollbackTestBase.java     | 62 +++++++++++++++++++++-
 .../TestCopyOnWriteRollbackActionExecutor.java     | 22 +++++++-
 2 files changed, 81 insertions(+), 3 deletions(-)
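
For context before the diff: the new helper stages an upsert commit followed by an
insert_overwrite, which Hudi records as a replacecommit on the timeline, and the new
test then rolls that replacecommit back. A condensed sketch of the scenario, reusing
identifiers from the patch below (record generation is elided, and the final rollback
is shown through the public client API for illustration; the test itself drives the
lower-level rollback action executor directly):

    // Commit "001": a plain upsert, creating one file group per partition.
    client.startCommitWithTime("001");
    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, "001");
    client.commit("001", statuses);

    // Commit "002": insert_overwrite produces a replacecommit that logically
    // replaces the commit-1 file groups rather than rewriting them in place.
    client.startCommitWithTime("002", HoodieTimeline.REPLACE_COMMIT_ACTION);
    HoodieWriteResult result = client.insertOverwrite(writeRecords, "002");
    client.commit("002", result.getWriteStatuses(), Option.empty(),
        HoodieTimeline.REPLACE_COMMIT_ACTION, result.getPartitionToReplaceFileIds());

    // Rolling back "002" must delete the newly written base files and leave
    // the commit-1 file slices visible again.
    client.rollback("002");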

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
index eb0e871..3b0829b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
@@ -18,23 +18,26 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.Assertions;
 import org.apache.hudi.testutils.HoodieClientTestBase;
-
 import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
@@ -96,4 +99,61 @@ public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
       assertEquals(1, secondPartitionCommit2FileSlices.size());
     }
   }
+
+  protected void insertOverwriteCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices,
+                                                            List<FileSlice> secondPartitionCommit2FileSlices,
+                                                            HoodieWriteConfig cfg,
+                                                            boolean commitSecondInsertOverwrite) throws IOException {
+    // generate data for just two partitions
+    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    /**
+     * Write 1 (upsert)
+     */
+    String newCommitTime = "001";
+    List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    client.startCommitWithTime(newCommitTime);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+    Assertions.assertNoWriteErrors(statuses.collect());
+    client.commit(newCommitTime, statuses);
+
+    // get fileIds written
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
+    List<HoodieFileGroup> firstPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, firstPartitionCommit1FileGroups.size());
+    Set<String> partition1Commit1FileIds = firstPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
+    List<HoodieFileGroup> secondPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, secondPartitionCommit1FileGroups.size());
+    Set<String> partition2Commit1FileIds = secondPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
+
+    /**
+     * Write 2 (insert_overwrite)
+     */
+    String commitActionType = HoodieTimeline.REPLACE_COMMIT_ACTION;
+    newCommitTime = "002";
+    records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
+    writeRecords = jsc.parallelize(records, 1);
+    client.startCommitWithTime(newCommitTime, commitActionType);
+    HoodieWriteResult result = client.insertOverwrite(writeRecords, newCommitTime);
+    statuses = result.getWriteStatuses();
+    Assertions.assertNoWriteErrors(statuses.collect());
+    if (commitSecondInsertOverwrite) {
+      client.commit(newCommitTime, statuses, Option.empty(), commitActionType, result.getPartitionToReplaceFileIds());
+    }
+    metaClient.reloadActiveTimeline();
+    // get new fileIds written as part of insert_overwrite
+    fsView = getFileSystemViewWithUnCommittedSlices(metaClient);
+    List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH)
+        .filter(fg -> !partition1Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
+    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+    List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH)
+        .filter(fg -> !partition2Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
+    secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+
+    assertEquals(1, firstPartitionCommit2FileSlices.size());
+    assertEquals(1, secondPartitionCommit2FileSlices.size());
+  }
 }
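
A note on the commitSecondInsertOverwrite flag above: when false, the replacecommit is
left inflight, so its marker files are still present for marker-based rollback; when
true, the replacecommit is completed and rollback has to locate files by listing. The
test below passes !isUsingMarkers so that each rollback strategy sees the state it
expects.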
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index e14dbf9..030cc3e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -125,6 +124,19 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     assertFalse(testTable.baseFileExists(p2, "002", "id22"));
   }
 
+  // Verify that rollback works with replacecommit
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCopyOnWriteRollbackWithReplaceCommits(boolean isUsingMarkers) throws IOException {
+    // 1. prepare data and assert the resulting file slices
+    List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+    List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+    HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
+    this.insertOverwriteCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
@@ -133,8 +145,14 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
     HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
     this.twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+    metaClient.reloadActiveTimeline();
     HoodieTable table = this.getHoodieTable(metaClient, cfg);
-
+    performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
+  }
+  
+  private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfig cfg, HoodieTable table,
+                                          List<FileSlice> firstPartitionCommit2FileSlices,
+                                          List<FileSlice> secondPartitionCommit2FileSlices) throws IOException {
     //2. rollback
     HoodieInstant commitInstant;
     if (isUsingMarkers) {