Posted to commits@hudi.apache.org by co...@apache.org on 2022/04/07 10:02:42 UTC

[hudi] branch master updated: [HUDI-3805] Delete existing corrupted requested rollback plan during rollback (#5245)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d744bb35c [HUDI-3805] Delete existing corrupted requested rollback plan during rollback (#5245)
9d744bb35c is described below

commit 9d744bb35ce54d347c5ef50adeba3f2a8840d043
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Thu Apr 7 03:02:34 2022 -0700

    [HUDI-3805] Delete existing corrupted requested rollback plan during rollback (#5245)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  41 +++++---
 .../org/apache/hudi/client/TestClientRollback.java | 106 +++++++++++++++++++++
 .../hudi/common/testutils/FileCreateUtils.java     |   4 +
 3 files changed, 140 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 70e3cebce4..32a8dee517 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.async.AsyncArchiveService;
 import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.avro.HoodieAvroUtils;
@@ -72,7 +71,6 @@ import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.Type;
 import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
@@ -82,6 +80,7 @@ import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metrics.HoodieMetrics;
@@ -95,8 +94,9 @@ import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 
 import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -105,11 +105,11 @@ import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.Set;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -1113,9 +1113,28 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
     List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
     Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
-    for (HoodieInstant instant : instants) {
+    for (HoodieInstant rollbackInstant : instants) {
+      HoodieRollbackPlan rollbackPlan;
+      try {
+        rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
+      } catch (IOException e) {
+        if (rollbackInstant.isRequested()) {
+          LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e);
+          try {
+            metaClient.getActiveTimeline().deletePending(rollbackInstant);
+          } catch (HoodieIOException he) {
+            LOG.warn("Cannot delete " + rollbackInstant, he);
+            continue;
+          }
+        } else {
+          // Here we assume that if the rollback is inflight, the rollback plan is intact
+          // in instant.rollback.requested.  The exception here can be due to other reasons.
+          LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e);
+        }
+        continue;
+      }
+
       try {
-        HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
         String action = rollbackPlan.getInstantToRollback().getAction();
         if (ignoreCompactionAndClusteringInstants) {
           if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
@@ -1124,14 +1143,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
                 rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
             if (!isClustering) {
               String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
-              infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
+              infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
             }
           }
         } else {
-          infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
+          infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
         }
-      } catch (IOException e) {
-        LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
+      } catch (Exception e) {
+        LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e);
       }
     }
     return infoMap;
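
Condensed, the new error-handling path in getPendingRollbackInfos reads as
follows. This is a sketch distilled from the hunk above, not a verbatim copy;
the plan-processing body is elided (see the hunk for the exact committed code):

    for (HoodieInstant rollbackInstant : instants) {
      HoodieRollbackPlan rollbackPlan;
      try {
        rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
      } catch (IOException e) {
        if (rollbackInstant.isRequested()) {
          // The requested plan file failed to deserialize: treat it as corrupted
          // and delete the pending instant so a fresh plan can be scheduled.
          try {
            metaClient.getActiveTimeline().deletePending(rollbackInstant);
          } catch (HoodieIOException he) {
            // Deletion failed; leave the instant in place.
          }
        }
        // For an inflight rollback the requested plan is assumed intact, so the
        // IOException must have some other cause; skip the plan either way.
        continue;
      }
      // ... process rollbackPlan and populate infoMap as in the hunk above ...
    }
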
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index d2dabc0792..f6315eec7d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -61,9 +62,11 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -481,4 +484,107 @@ public class TestClientRollback extends HoodieClientTestBase {
       assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
     }
   }
+
+  private static Stream<Arguments> testRollbackWithRequestedRollbackPlanParams() {
+    return Arrays.stream(new Boolean[][] {
+        {true, true}, {true, false}, {false, true}, {false, false},
+    }).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testRollbackWithRequestedRollbackPlanParams")
+  public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, boolean isRollbackPlanCorrupted) throws Exception {
+    // Let's create some commit files and base files
+    final String p1 = "2022/04/05";
+    final String p2 = "2022/04/06";
+    final String commitTime1 = "20220406010101002";
+    final String commitTime2 = "20220406020601002";
+    final String commitTime3 = "20220406030611002";
+    final String rollbackInstantTime = "20220406040611002";
+    Map<String, String> partitionAndFileId1 = new HashMap<String, String>() {
+      {
+        put(p1, "id11");
+        put(p2, "id12");
+      }
+    };
+    Map<String, String> partitionAndFileId2 = new HashMap<String, String>() {
+      {
+        put(p1, "id21");
+        put(p2, "id22");
+      }
+    };
+    Map<String, String> partitionAndFileId3 = new HashMap<String, String>() {
+      {
+        put(p1, "id31");
+        put(p2, "id32");
+      }
+    };
+
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withRollbackUsingMarkers(false)
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder()
+                // Column Stats Index is disabled, since these tests construct tables which are
+                // not valid (empty commit metadata, invalid parquet files)
+                .withMetadataIndexColumnStats(false)
+                .enable(enableMetadataTable)
+                .build()
+        )
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
+
+    HoodieTestTable testTable = enableMetadataTable
+        ? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
+        metaClient.getHadoopConf(), config, context))
+        : HoodieTestTable.of(metaClient);
+
+    testTable.withPartitionMetaFiles(p1, p2)
+        .addCommit(commitTime1)
+        .withBaseFilesInPartitions(partitionAndFileId1)
+        .addCommit(commitTime2)
+        .withBaseFilesInPartitions(partitionAndFileId2)
+        .addInflightCommit(commitTime3)
+        .withBaseFilesInPartitions(partitionAndFileId3);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+      if (isRollbackPlanCorrupted) {
+        // Add a corrupted requested rollback plan
+        FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, new byte[] {0, 1, 2});
+      } else {
+        // Add a valid requested rollback plan to roll back commitTime3
+        HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
+        List<HoodieRollbackRequest> rollbackRequestList = partitionAndFileId3.keySet().stream()
+            .map(partition -> new HoodieRollbackRequest(partition, EMPTY_STRING, EMPTY_STRING,
+                Collections.singletonList(metaClient.getBasePath() + "/" + partition + "/"
+                    + FileCreateUtils.baseFileName(commitTime3, partitionAndFileId3.get(p1))),
+                Collections.emptyMap()))
+            .collect(Collectors.toList());
+        rollbackPlan.setRollbackRequests(rollbackRequestList);
+        rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION));
+        FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, rollbackPlan);
+      }
+
+      // Rollback commit3
+      client.rollback(commitTime3);
+      assertFalse(testTable.inflightCommitExists(commitTime3));
+      assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
+      assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
+
+      metaClient.reloadActiveTimeline();
+      List<HoodieInstant> rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList());
+      // Corrupted requested rollback plan should be deleted before scheduling a new one
+      assertEquals(rollbackInstants.size(), 1);
+      HoodieInstant rollbackInstant = rollbackInstants.get(0);
+      assertTrue(rollbackInstant.isCompleted());
+
+      if (isRollbackPlanCorrupted) {
+        // Should create a new rollback instant
+        assertNotEquals(rollbackInstantTime, rollbackInstant.getTimestamp());
+      } else {
+        // Should reuse the rollback instant
+        assertEquals(rollbackInstantTime, rollbackInstant.getTimestamp());
+      }
+    }
+  }
 }
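
As an aside, the four-way boolean matrix supplied by
testRollbackWithRequestedRollbackPlanParams could equivalently be written with
JUnit 5's @CsvSource (org.junit.jupiter.params.provider.CsvSource). A
hypothetical rewrite, shown only to make explicit what the matrix enumerates
(metadata table on/off crossed with corrupted/valid plan):

    @ParameterizedTest
    @CsvSource({"true,true", "true,false", "false,true", "false,false"})
    public void testRollbackWithRequestedRollbackPlan(
        boolean enableMetadataTable, boolean isRollbackPlanCorrupted) throws Exception {
      // ... body unchanged from the hunk above ...
    }
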
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 06f0ac49b6..27dd9df5ed 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -244,6 +244,10 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get());
   }
 
+  public static void createRequestedRollbackFile(String basePath, String instantTime, byte[] content) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, content);
+  }
+
   public static void createInflightRollbackFile(String basePath, String instantTime) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
   }
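
The new byte[] overload gives tests a direct way to plant an unreadable plan on
the timeline. A minimal usage sketch, assuming the hudi-common test utilities
are on the classpath; the wrapper class, method name, instant time, and byte
content are illustrative placeholders, not part of the commit:

    import org.apache.hudi.common.testutils.FileCreateUtils;

    import java.io.IOException;

    class RollbackPlanCorrupter {
      // Writes bytes that do not deserialize as a HoodieRollbackPlan, so that
      // getPendingRollbackInfos hits the IOException path and deletes the instant.
      static void corruptRequestedRollbackPlan(String basePath, String instantTime)
          throws IOException {
        FileCreateUtils.createRequestedRollbackFile(basePath, instantTime, new byte[] {0, 1, 2});
      }
    }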