You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/30 00:22:28 UTC

[hudi] branch master updated: [HUDI-4741] hotfix to avoid partial failover cause restored subtask timeout (#6796)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e222693d87 [HUDI-4741] hotfix to avoid partial failover cause restored subtask timeout (#6796)
e222693d87 is described below

commit e222693d87d48416670ca14c6f7fd69307432786
Author: 冯健 <fe...@gmail.com>
AuthorDate: Sun Oct 30 08:22:18 2022 +0800

    [HUDI-4741] hotfix to avoid partial failover cause restored subtask timeout (#6796)
    
    Co-authored-by: jian.feng <ji...@shopee.com>
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 36 ++++++++++++++
 .../org/apache/hudi/sink/meta/CkpMetadata.java     | 57 ++++++++++++++++------
 .../sink/TestStreamWriteOperatorCoordinator.java   |  9 ++++
 3 files changed, 88 insertions(+), 14 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index c87d5b2443..a7b3994357 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -63,6 +63,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
@@ -152,6 +153,12 @@ public class StreamWriteOperatorCoordinator
    */
   private CkpMetadata ckpMetadata;
 
+  /**
+   * Counter for the failed tasks, a number within the range (0, task_num) means
+   * a partial failover.
+   */
+  private transient AtomicInteger failedCnt;
+
   /**
    * Constructs a StreamingSinkOperatorCoordinator.
    *
@@ -294,6 +301,17 @@ public class StreamWriteOperatorCoordinator
     // reset the event
     this.eventBuffer[i] = null;
     LOG.warn("Reset the event for task [" + i + "]", throwable);
+
+    // based on the fact: the #subtaskFailed in invoked before all the failed tasks scheduling,
+    // when a sub-task event is received, we can decide whether it recovers from a partial or complete failover,
+    // then to reuse the current instant(PARTIAL) or start a new one(COMPLETE).
+
+    // reset the ckp metadata for either partial or complete failover
+    if (this.failedCnt.get() == 0) {
+      this.ckpMetadata.reset();
+    }
+    // inc the failed tasks counter
+    this.failedCnt.incrementAndGet();
   }
 
   @Override
@@ -347,6 +365,14 @@ public class StreamWriteOperatorCoordinator
 
   private void reset() {
     this.eventBuffer = new WriteMetadataEvent[this.parallelism];
+    this.failedCnt = new AtomicInteger(0);
+  }
+
+  /**
+   * Checks whether it is a PARTIAL failover.
+   */
+  private boolean isPartialFailover() {
+    return this.failedCnt.get() > 0 && this.failedCnt.get() < this.parallelism;
   }
 
   /**
@@ -410,6 +436,16 @@ public class StreamWriteOperatorCoordinator
     if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
       // start to initialize the instant.
       initInstant(event.getInstantTime());
+    } else if (isPartialFailover()) {
+      // if the bootstrap event comes from a partial failover,
+      // decrement the failed tasks by one.
+
+      // if all the failed task bootstrap events are received, send a start instant
+      // to the ckp metadata and unblock the data flushing.
+      if (this.failedCnt.decrementAndGet() <= 0) {
+        this.ckpMetadata.startInstant(this.instant);
+        this.failedCnt.set(0);
+      }
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 6895b2a0c6..d0f26740d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -37,6 +37,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -61,7 +62,7 @@ public class CkpMetadata implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);
 
-  protected static final int MAX_RETAIN_CKP_NUM = 3;
+  private static final int MAX_RETAIN_CKP_NUM = 3;
 
   // the ckp metadata directory
   private static final String CKP_META = "ckp_meta";
@@ -99,6 +100,19 @@ public class CkpMetadata implements Serializable {
     fs.mkdirs(path);
   }
 
+  /**
+   * Resets the message bus, would clean all the messages.
+   *
+   * <p>This expects to be called by the driver.
+   */
+  public void reset() {
+    Iterator<String> itr = this.instantCache.iterator();
+    while (itr.hasNext()) {
+      cleanInstant(itr.next(), true);
+      itr.remove();
+    }
+  }
+
   public void startInstant(String instant) {
     Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT));
     try {
@@ -106,32 +120,47 @@ public class CkpMetadata implements Serializable {
     } catch (IOException e) {
       throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
     }
+    // cache the instant
+    cache(instant);
     // cleaning
-    clean(instant);
+    clean();
   }
 
-  private void clean(String newInstant) {
+  private void cache(String newInstant) {
     if (this.instantCache == null) {
       this.instantCache = new ArrayList<>();
     }
     this.instantCache.add(newInstant);
+  }
+
+  private void clean() {
     if (instantCache.size() > MAX_RETAIN_CKP_NUM) {
-      final String instant = instantCache.get(0);
-      boolean[] error = new boolean[1];
-      CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> {
-        try {
-          fs.delete(path, false);
-        } catch (IOException e) {
-          error[0] = true;
-          LOG.warn("Exception while cleaning the checkpoint meta file: " + path);
-        }
-      });
-      if (!error[0]) {
+      boolean success = cleanInstant(instantCache.get(0), false);
+      if (success) {
         instantCache.remove(0);
       }
     }
   }
 
+  private boolean cleanInstant(String instant, boolean throwsT) {
+    boolean success = true;
+    for (String fileName : CkpMessage.getAllFileNames(instant)) {
+      Path path = fullPath(fileName);
+      try {
+        fs.delete(path, false);
+      } catch (IOException ex) {
+        success = false;
+        final String errMsg = "Exception while cleaning the checkpoint meta file: " + path;
+        if (throwsT) {
+          throw new HoodieException(errMsg, ex);
+        } else {
+          LOG.warn(errMsg, ex);
+        }
+      }
+    }
+    return success;
+  }
+
   /**
    * Add a checkpoint commit message.
    *
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index d5d35f7494..64bf8e2788 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.StreamerUtil;
@@ -164,6 +165,14 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
   }
 
+  @Test
+  public void testSubTaskFailed() {
+    coordinator.subtaskFailed(0, null);
+    assertNull(coordinator.getEventBuffer()[0], "The write meta event should be cleaned");
+    CkpMetadata ckpMetadata = CkpMetadata.getInstance(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()));
+    assertNull(ckpMetadata.lastPendingInstant(), "The pending instant should be cleaned");
+  }
+
   @Test
   public void testHiveSyncInvoked() throws Exception {
     // reset