You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/11/10 14:13:55 UTC

[flink] branch release-1.14 updated: [FLINK-24667][runtime] Fix error handling in ChannelStateCheckpointWriter

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 2ba57e2  [FLINK-24667][runtime] Fix error handling in ChannelStateCheckpointWriter
2ba57e2 is described below

commit 2ba57e29b8c7cc7d48f5313f0aeb96960c0796f6
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Oct 29 09:31:10 2021 +0200

    [FLINK-24667][runtime] Fix error handling in ChannelStateCheckpointWriter
    
    - Don't propagate a single checkpoint's failure, so that the
      ChannelStateWriteRequestExecutorImpl loop doesn't exit
    - Handle errors that occur while closing the stream
    - Swap the primary and suppressed exceptions to prevent suppressed
      exceptions from piling up in the stack trace
---
 .../channel/ChannelStateCheckpointWriter.java      | 34 ++++++++++++++--------
 .../channel/ChannelStateWriteRequest.java          |  5 ++--
 .../ChannelStateWriteRequestExecutorImpl.java      |  7 +++--
 3 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
index 368bb04..f84cf50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
@@ -51,6 +51,8 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static java.util.UUID.randomUUID;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -99,8 +101,7 @@ class ChannelStateCheckpointWriter {
             ChannelStateWriteResult result,
             CheckpointStateOutputStream stream,
             ChannelStateSerializer serializer,
-            RunnableWithException onComplete)
-            throws Exception {
+            RunnableWithException onComplete) {
         this(
                 taskName,
                 subtaskIndex,
@@ -121,8 +122,7 @@ class ChannelStateCheckpointWriter {
             ChannelStateSerializer serializer,
             RunnableWithException onComplete,
             CheckpointStateOutputStream checkpointStateOutputStream,
-            DataOutputStream dataStream)
-            throws Exception {
+            DataOutputStream dataStream) {
         this.taskName = taskName;
         this.subtaskIndex = subtaskIndex;
         this.checkpointId = checkpointId;
@@ -134,7 +134,7 @@ class ChannelStateCheckpointWriter {
         runWithChecks(() -> serializer.writeHeader(dataStream));
     }
 
-    void writeInput(InputChannelInfo info, Buffer buffer) throws Exception {
+    void writeInput(InputChannelInfo info, Buffer buffer) {
         write(
                 inputChannelOffsets,
                 info,
@@ -143,7 +143,7 @@ class ChannelStateCheckpointWriter {
                 "ChannelStateCheckpointWriter#writeInput");
     }
 
-    void writeOutput(ResultSubpartitionInfo info, Buffer buffer) throws Exception {
+    void writeOutput(ResultSubpartitionInfo info, Buffer buffer) {
         write(
                 resultSubpartitionOffsets,
                 info,
@@ -157,8 +157,7 @@ class ChannelStateCheckpointWriter {
             K key,
             Buffer buffer,
             boolean precondition,
-            String action)
-            throws Exception {
+            String action) {
         try {
             if (result.isDone()) {
                 return;
@@ -290,19 +289,30 @@ class ChannelStateCheckpointWriter {
         }
     }
 
-    private void runWithChecks(RunnableWithException r) throws Exception {
+    private void runWithChecks(RunnableWithException r) {
         try {
             checkState(!result.isDone(), "result is already completed", result);
             r.run();
         } catch (Exception e) {
             fail(e);
-            throw e;
+            if (!findThrowable(e, IOException.class).isPresent()) {
+                rethrow(e);
+            }
         }
     }
 
-    public void fail(Throwable e) throws Exception {
+    public void fail(Throwable e) {
         result.fail(e);
-        checkpointStream.close();
+        try {
+            checkpointStream.close();
+        } catch (Exception closeException) {
+            String message = "Unable to close checkpointStream after a failure";
+            if (findThrowable(closeException, IOException.class).isPresent()) {
+                LOG.warn(message, closeException);
+            } else {
+                throw new RuntimeException(message, closeException);
+            }
+        }
     }
 
     private interface HandleFactory<I, H extends AbstractChannelStateHandle<I>> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
index 17be93b..d8d1923 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
@@ -22,10 +22,10 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 
 import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.CANCELLED;
 import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.COMPLETED;
@@ -76,8 +76,7 @@ interface ChannelStateWriteRequest {
             long checkpointId,
             String name,
             CloseableIterator<Buffer> iterator,
-            BiConsumerWithException<ChannelStateCheckpointWriter, Buffer, Exception>
-                    bufferConsumer) {
+            BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
         return new CheckpointInProgressRequest(
                 name,
                 checkpointId,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
index 08fb2d5..5c12a77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
@@ -149,8 +149,11 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
         // checking before is not enough because (check + enqueue) is not atomic
         if (wasClosed || !thread.isAlive()) {
             cleanupRequests();
-            throw ExceptionUtils.firstOrSuppressed(
-                    new IllegalStateException("not running"), thrown);
+            IllegalStateException exception = new IllegalStateException("not running");
+            if (thrown != null) {
+                exception.addSuppressed(thrown);
+            }
+            throw exception;
         }
     }