Posted to commits@hudi.apache.org by da...@apache.org on 2023/06/29 08:59:16 UTC

[hudi] branch master updated: [HUDI-6151] Rollback previously applied commits to MDT when operations are retried (#8604)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b95248e0119 [HUDI-6151] Rollback previously applied commits to MDT when operations are retried (#8604)
b95248e0119 is described below

commit b95248e011931f4748a7a9fbb8298cbbb71bda88
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Thu Jun 29 01:59:08 2023 -0700

    [HUDI-6151] Rollback previously applied commits to MDT when operations are retried (#8604)
    
    Operations like Clean and Compaction are retried after a failure with the same instant time. If the previous run of the operation successfully committed to the MDT but failed to commit to the dataset, the retry reuses the same instantTime, causing duplicate updates to be applied to the MDT.
    
    Currently, we simply delete the completed deltacommit instant file without rolling back the changes that deltacommit applied.
    
    To handle this, we detect the replay of an operation and roll back any changes that operation previously applied to the MDT.
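    
    A minimal sketch of the resulting flow in both metadata writers, condensed from the patch below (writeClient, metadataMetaClient, LOG, and HoodieMetadataException are the names used in the diff; this is an illustration, not the exact method body):
    
        if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
          // First application of this instant to the MDT: nothing to undo.
          LOG.info("New commit at " + instantTime + " being applied to MDT.");
        } else {
          // The instant already exists on the MDT timeline, so this is a replay of a
          // retried operation (e.g. a clean or compaction). Roll back the earlier
          // deltacommit instead of just deleting its instant file.
          if (!writeClient.rollback(instantTime)) {
            throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
          }
          metadataMetaClient.reloadActiveTimeline();
        }
        // Start a fresh deltacommit at the same instant time and re-apply the records.
        writeClient.startCommitWithTime(instantTime);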
    
    ---------
    
    Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
 .../FlinkHoodieBackedTableMetadataWriter.java      | 50 ++++++++--------
 .../SparkHoodieBackedTableMetadataWriter.java      | 38 ++++++------
 .../functional/TestHoodieBackedMetadata.java       | 68 +++++++++++++++++++++-
 3 files changed, 113 insertions(+), 43 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 7dd32e2916e..6edeac05a74 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -32,9 +32,13 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -46,7 +50,7 @@ import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGE
  * Flink hoodie backed table metadata writer.
  */
 public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
-
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
   private transient BaseHoodieWriteClient writeClient;
 
   public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
@@ -118,33 +122,31 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
 
       if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
         // if this is a new commit being applied to metadata for the first time
-        writeClient.startCommitWithTime(instantTime);
-        metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+        LOG.info("New commit at " + instantTime + " being applied to MDT.");
       } else {
-        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
-        if (alreadyCompletedInstant.isPresent()) {
-          // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
-          // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-          // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-          // are upserts to metadata table and so only a new delta commit will be created.
-          // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
-          // already part of completed commit. So, we have to manually remove the completed instant and proceed.
-          // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
-          HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
-          metadataMetaClient.reloadActiveTimeline();
+        // this code path refers to a re-attempted commit that either:
+        //   1. got committed to the metadata table but failed in the data table, or
+        //   2. failed while committing to the metadata table.
+        // e.g., say compaction c1 succeeded in the metadata table on its 1st attempt but failed before committing to the data table.
+        // when retried, the data table will first roll back the pending compaction. that rollback is applied to the metadata table, but all changes
+        // to the metadata table are upserts, so only a new delta commit will be created.
+        // once the rollback is complete in the data table, the compaction will be retried, eventually hitting this code block where the respective commit is
+        // already part of a completed commit. So, we have to manually roll back the completed instant and proceed.
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
+            .lastInstant();
+        LOG.info(String.format("%s completed commit at %s being applied to MDT.",
+            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
+
+        // Rollback the previous commit
+        if (!writeClient.rollback(instantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
         }
-        // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
-        // instant with the same instant time.  This happens for data table clean action which
-        // reuses the same instant time without rollback first.  It is a no-op here as the
-        // clean plan is the same, so we don't need to delete the requested and inflight instant
-        // files in the active timeline.
-
-        // The metadata writer uses LAZY cleaning strategy without auto commit,
-        // write client then checks the heartbeat expiration when committing the instant,
-        // sets up the heartbeat explicitly to make the check pass.
-        writeClient.getHeartbeatClient().start(instantTime);
+        metadataMetaClient.reloadActiveTimeline();
       }
 
+      writeClient.startCommitWithTime(instantTime);
+      metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+
       List<WriteStatus> statuses = isInitializing
           ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, Option.empty())
           : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index eab0c436248..9a254409b8d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -29,13 +29,13 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -144,27 +144,29 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
 
       if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
         // if this is a new commit being applied to metadata for the first time
-        writeClient.startCommitWithTime(instantTime);
+        LOG.info("New commit at " + instantTime + " being applied to MDT.");
       } else {
-        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
-        if (alreadyCompletedInstant.isPresent()) {
-          // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
-          // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-          // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-          // are upserts to metadata table and so only a new delta commit will be created.
-          // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
-          // already part of completed commit. So, we have to manually remove the completed instant and proceed.
-          // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
-          HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
-          metadataMetaClient.reloadActiveTimeline();
+        // this code path refers to a re-attempted commit that either:
+        //   1. got committed to the metadata table but failed in the data table, or
+        //   2. failed while committing to the metadata table.
+        // e.g., say compaction c1 succeeded in the metadata table on its 1st attempt but failed before committing to the data table.
+        // when retried, the data table will first roll back the pending compaction. that rollback is applied to the metadata table, but all changes
+        // to the metadata table are upserts, so only a new delta commit will be created.
+        // once the rollback is complete in the data table, the compaction will be retried, eventually hitting this code block where the respective commit is
+        // already part of a completed commit. So, we have to manually roll back the completed instant and proceed.
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
+            .lastInstant();
+        LOG.info(String.format("%s completed commit at %s being applied to MDT.",
+            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
+
+        // Rollback the previous commit
+        if (!writeClient.rollback(instantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
         }
-        // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
-        // instant with the same instant time.  This happens for data table clean action which
-        // reuses the same instant time without rollback first.  It is a no-op here as the
-        // clean plan is the same, so we don't need to delete the requested and inflight instant
-        // files in the active timeline.
+        metadataMetaClient.reloadActiveTimeline();
       }
 
+      writeClient.startCommitWithTime(instantTime);
       if (bulkInsertPartitioner.isPresent()) {
         writeClient.bulkInsertPreppedRecords(preppedRecordRDD, instantTime, bulkInsertPartitioner).collect();
       } else {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index b50c002468f..84417fce958 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.functional;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -2029,7 +2030,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
     HoodieWriteConfig newWriteConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
         .withAutoCommit(false)
-        .withClusteringConfig(clusteringConfig).build();
+        .withClusteringConfig(clusteringConfig)
+        .withRollbackUsingMarkers(false)
+        .build();
 
     // trigger clustering
     SparkRDDWriteClient newClient = getHoodieWriteClient(newWriteConfig);
@@ -2821,6 +2824,69 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
   }
 
+  /**
+   * Test a duplicate operation with the same instant timestamp.
+   *
+   * This can happen if the commit on the MDT succeeds but fails on the dataset. For some table services like clean,
+   * compaction, and replace commit, the operation is retried with the same timestamp (taken from the inflight instant).
+   * Hence, the metadata table will see an additional commit with the same timestamp as a previously completed deltacommit.
+   */
+  @Test
+  public void testRepeatedActionWithSameInstantTime() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    Properties props = new Properties();
+    props.put(HoodieCleanConfig.ALLOW_MULTIPLE_CLEANS.key(), "false");
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false).withProps(props).build();
+
+    // Perform three writes so we can initiate a clean
+    int index = 0;
+    final String partition = "2015/03/16";
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      for (; index < 3; ++index) {
+        String newCommitTime = "00" + index;
+        List<HoodieRecord> records = index == 0 ? dataGen.generateInsertsForPartition(newCommitTime, 10, partition)
+            : dataGen.generateUniqueUpdates(newCommitTime, 5);
+        client.startCommitWithTime(newCommitTime);
+        client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      }
+    }
+    assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // Perform a clean
+      String cleanInstantTime = "00" + index++;
+      HoodieCleanMetadata cleanMetadata = client.clean(cleanInstantTime);
+      // 1 partition should be cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().size(), 1);
+      // 1 file cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);
+
+      // To simulate failed clean on the main dataset, we will delete the completed clean instant
+      String cleanInstantFileName = HoodieTimeline.makeCleanerFileName(cleanInstantTime);
+      assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
+          cleanInstantFileName), false));
+      assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflights().countInstants(), 1);
+      assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants().countInstants(), 0);
+
+      // Initiate another clean. The previous leftover clean will be attempted and no other clean will be scheduled.
+      String newCleanInstantTime = "00" + index++;
+      cleanMetadata = client.clean(newCleanInstantTime);
+
+      // 1 partition should be cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().size(), 1);
+      // 1 file cleaned but was already deleted so will be a failed delete
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 0);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 1);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);
+
+      validateMetadata(client);
+    }
+  }
+
   private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception {
     doPreBootstrapOperations(testTable, "0000001", "0000002");
   }