You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/20 17:12:26 UTC

[hudi] 04/07: [HUDI-3917] Flink write task hangs if last checkpoint has no data input (#5360)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 89a27a1ccf4bdc32c60158aad3ca6e8a7068c206
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Apr 20 12:48:24 2022 +0800

    [HUDI-3917] Flink write task hangs if last checkpoint has no data input (#5360)
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 31 ++++++++++++++++++++++
 .../hudi/sink/append/AppendWriteFunction.java      |  2 +-
 .../sink/common/AbstractStreamWriteFunction.java   |  9 ++++++-
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index b5ec08a583..023b1e6965 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.HiveSyncContext;
@@ -42,6 +43,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -429,6 +431,31 @@ public class StreamWriteOperatorCoordinator
     addEventToBuffer(event);
   }
 
+  /**
+   * The coordinator reuses the instant if there is no data for this round of checkpoint,
+   * sends the commit ack events to unblock the flushing.
+   */
+  private void sendCommitAckEvents(long checkpointId) {
+    CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
+        .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
+        .toArray(CompletableFuture<?>[]::new);
+    CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
+      if (!sendToFinishedTasks(error)) {
+        throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
+      }
+    });
+  }
+
+  /**
+   * Decides whether the given exception is caused by sending events to FINISHED tasks.
+   *
+   * <p>Ugly impl: the exception may change in the future.
+   */
+  private static boolean sendToFinishedTasks(Throwable throwable) {
+    return throwable.getCause() instanceof TaskNotRunningException
+        || throwable.getCause().getMessage().contains("running");
+  }
+
   /**
    * Commits the instant.
    */
@@ -456,6 +483,10 @@ public class StreamWriteOperatorCoordinator
     if (writeResults.size() == 0) {
       // No data has written, reset the buffer and returns early
       reset();
+      // Send commit ack event to the write function to unblock the flushing
+      // If this checkpoint has no inputs while the next checkpoint has inputs,
+      // the 'isConfirming' flag should be switched with the ack event.
+      sendCommitAckEvents(checkpointId);
       return false;
     }
     doCommit(instant, writeResults);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index a72b885a22..7b40718b35 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -123,9 +123,9 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
       writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
       instant = this.writerHelper.getInstantTime();
     } else {
-      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
       writeStatus = Collections.emptyList();
       instant = instantToWrite(false);
+      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, instant);
     }
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 4e8712b661..98085fa74f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -247,7 +247,7 @@ public abstract class AbstractStreamWriteFunction<I>
       // wait condition:
       // 1. there is no inflight instant
       // 2. the inflight instant does not change and the checkpoint has buffering data
-      if (instant == null || (instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant))) {
+      if (instant == null || invalidInstant(instant, hasData)) {
         // sleep for a while
         timeWait.waitFor();
         // refresh the inflight instant
@@ -260,4 +260,11 @@ public abstract class AbstractStreamWriteFunction<I>
     }
     return instant;
   }
+
+  /**
+   * Returns whether the pending instant is invalid to write with.
+   */
+  private boolean invalidInstant(String instant, boolean hasData) {
+    return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant);
+  }
 }