You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/20 17:12:26 UTC
[hudi] 04/07: [HUDI-3917] Flink write task hangs if last checkpoint has no data input (#5360)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 89a27a1ccf4bdc32c60158aad3ca6e8a7068c206
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Apr 20 12:48:24 2022 +0800
[HUDI-3917] Flink write task hangs if last checkpoint has no data input (#5360)
---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 31 ++++++++++++++++++++++
.../hudi/sink/append/AppendWriteFunction.java | 2 +-
.../sink/common/AbstractStreamWriteFunction.java | 9 ++++++-
3 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index b5ec08a583..023b1e6965 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.HiveSyncContext;
@@ -42,6 +43,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -429,6 +431,31 @@ public class StreamWriteOperatorCoordinator
addEventToBuffer(event);
}
+  /**
+   * Sends a {@link CommitAckEvent} for the given checkpoint through every non-null gateway.
+   *
+   * <p>The coordinator reuses the instant if there is no data for this round of checkpoint,
+   * and sends the commit ack events to unblock the flushing.
+   *
+   * <p>A send failure that is recognized as targeting tasks that are not running is tolerated;
+   * any other failure raises a {@link HoodieException} from the completion callback.
+   *
+   * @param checkpointId the id of the checkpoint carried by the ack events
+   */
+  private void sendCommitAckEvents(long checkpointId) {
+    CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
+        .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
+        .toArray(CompletableFuture<?>[]::new);
+    CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
+      // 'error' is null when every send succeeded: there is nothing to classify in that case,
+      // so only real failures are inspected.
+      if (error != null && !sendToFinishedTasks(error)) {
+        throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
+      }
+    });
+  }
+
+ /**
+ * Decides whether the given exception is caused by sending events to FINISHED tasks.
+ *
+ * <p>Ugly impl: the exception may change in the future.
+ */
+ private static boolean sendToFinishedTasks(Throwable throwable) {
+ return throwable.getCause() instanceof TaskNotRunningException
+ || throwable.getCause().getMessage().contains("running");
+ }
+
/**
* Commits the instant.
*/
@@ -456,6 +483,10 @@ public class StreamWriteOperatorCoordinator
if (writeResults.size() == 0) {
// No data has written, reset the buffer and returns early
reset();
+ // Send commit ack event to the write function to unblock the flushing
+ // If this checkpoint has no inputs while the next checkpoint has inputs,
+ // the 'isConfirming' flag should be switched with the ack event.
+ sendCommitAckEvents(checkpointId);
return false;
}
doCommit(instant, writeResults);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index a72b885a22..7b40718b35 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -123,9 +123,9 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
instant = this.writerHelper.getInstantTime();
} else {
- LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
writeStatus = Collections.emptyList();
instant = instantToWrite(false);
+ LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, instant);
}
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 4e8712b661..98085fa74f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -247,7 +247,7 @@ public abstract class AbstractStreamWriteFunction<I>
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
- if (instant == null || (instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant))) {
+ if (instant == null || invalidInstant(instant, hasData)) {
// sleep for a while
timeWait.waitFor();
// refresh the inflight instant
@@ -260,4 +260,11 @@ public abstract class AbstractStreamWriteFunction<I>
}
return instant;
}
+
+  /**
+   * Tells whether the given pending instant is invalid to write with.
+   *
+   * @param instant the pending instant to check, must not be {@code null}
+   * @param hasData whether the checkpoint has buffering data
+   * @return {@code true} only when the instant equals the current instant, there is data,
+   *         and the checkpoint metadata does not report the instant as aborted
+   */
+  private boolean invalidInstant(String instant, boolean hasData) {
+    // The instant has changed: it can be written with.
+    if (!instant.equals(this.currentInstant)) {
+      return false;
+    }
+    // Nothing is buffered: the unchanged instant is still fine.
+    if (!hasData) {
+      return false;
+    }
+    // Same instant with data: invalid unless the instant is reported as aborted.
+    return !this.ckpMetadata.isAborted(instant);
+  }
}