You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this rendering.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/11/10 14:13:55 UTC
[flink] branch release-1.14 updated: [FLINK-24667][runtime] Fix error handling in ChannelStateCheckpointWriter
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 2ba57e2 [FLINK-24667][runtime] Fix error handling in ChannelStateCheckpointWriter
2ba57e2 is described below
commit 2ba57e29b8c7cc7d48f5313f0aeb96960c0796f6
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Oct 29 09:31:10 2021 +0200
[FLINK-24667][runtime] Fix error handling in ChannelStateCheckpointWriter
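- Don't propagate a single checkpoint's failure, so that it cannot terminate
the ChannelStateWriteRequestExecutorImpl loop
- Handle errors that occur while closing the stream
- Swap the primary and suppressed exceptions (attach the earlier failure as
suppressed to a new exception) so that suppressed exceptions do not pile up
in the stack trace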
---
.../channel/ChannelStateCheckpointWriter.java | 34 ++++++++++++++--------
.../channel/ChannelStateWriteRequest.java | 5 ++--
.../ChannelStateWriteRequestExecutorImpl.java | 7 +++--
3 files changed, 29 insertions(+), 17 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
index 368bb04..f84cf50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
@@ -51,6 +51,8 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -99,8 +101,7 @@ class ChannelStateCheckpointWriter {
ChannelStateWriteResult result,
CheckpointStateOutputStream stream,
ChannelStateSerializer serializer,
- RunnableWithException onComplete)
- throws Exception {
+ RunnableWithException onComplete) {
this(
taskName,
subtaskIndex,
@@ -121,8 +122,7 @@ class ChannelStateCheckpointWriter {
ChannelStateSerializer serializer,
RunnableWithException onComplete,
CheckpointStateOutputStream checkpointStateOutputStream,
- DataOutputStream dataStream)
- throws Exception {
+ DataOutputStream dataStream) {
this.taskName = taskName;
this.subtaskIndex = subtaskIndex;
this.checkpointId = checkpointId;
@@ -134,7 +134,7 @@ class ChannelStateCheckpointWriter {
runWithChecks(() -> serializer.writeHeader(dataStream));
}
- void writeInput(InputChannelInfo info, Buffer buffer) throws Exception {
+ void writeInput(InputChannelInfo info, Buffer buffer) {
write(
inputChannelOffsets,
info,
@@ -143,7 +143,7 @@ class ChannelStateCheckpointWriter {
"ChannelStateCheckpointWriter#writeInput");
}
- void writeOutput(ResultSubpartitionInfo info, Buffer buffer) throws Exception {
+ void writeOutput(ResultSubpartitionInfo info, Buffer buffer) {
write(
resultSubpartitionOffsets,
info,
@@ -157,8 +157,7 @@ class ChannelStateCheckpointWriter {
K key,
Buffer buffer,
boolean precondition,
- String action)
- throws Exception {
+ String action) {
try {
if (result.isDone()) {
return;
@@ -290,19 +289,30 @@ class ChannelStateCheckpointWriter {
}
}
- private void runWithChecks(RunnableWithException r) throws Exception {
+ private void runWithChecks(RunnableWithException r) {
try {
checkState(!result.isDone(), "result is already completed", result);
r.run();
} catch (Exception e) {
fail(e);
- throw e;
+ if (!findThrowable(e, IOException.class).isPresent()) {
+ rethrow(e);
+ }
}
}
- public void fail(Throwable e) throws Exception {
+ public void fail(Throwable e) {
result.fail(e);
- checkpointStream.close();
+ try {
+ checkpointStream.close();
+ } catch (Exception closeException) {
+ String message = "Unable to close checkpointStream after a failure";
+ if (findThrowable(closeException, IOException.class).isPresent()) {
+ LOG.warn(message, closeException);
+ } else {
+ throw new RuntimeException(message, closeException);
+ }
+ }
}
private interface HandleFactory<I, H extends AbstractChannelStateHandle<I>> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
index 17be93b..d8d1923 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
@@ -22,10 +22,10 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.CANCELLED;
import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.COMPLETED;
@@ -76,8 +76,7 @@ interface ChannelStateWriteRequest {
long checkpointId,
String name,
CloseableIterator<Buffer> iterator,
- BiConsumerWithException<ChannelStateCheckpointWriter, Buffer, Exception>
- bufferConsumer) {
+ BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
return new CheckpointInProgressRequest(
name,
checkpointId,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
index 08fb2d5..5c12a77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
@@ -149,8 +149,11 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
// checking before is not enough because (check + enqueue) is not atomic
if (wasClosed || !thread.isAlive()) {
cleanupRequests();
- throw ExceptionUtils.firstOrSuppressed(
- new IllegalStateException("not running"), thrown);
+ IllegalStateException exception = new IllegalStateException("not running");
+ if (thrown != null) {
+ exception.addSuppressed(thrown);
+ }
+ throw exception;
}
}