You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/08/28 08:52:17 UTC

[flink] 02/02: [FLINK-19012][task] Check state of AsyncCheckpointRunnable before throwing an exception

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 378115ffd1642c782c7d0f203c5c46e07d18fc23
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Aug 27 19:59:38 2020 +0200

    [FLINK-19012][task] Check state of AsyncCheckpointRunnable before throwing an exception
    
    Currently, SubtaskCheckpointCoordinatorImpl closes all runnables on close.
    It doesn't stop the actual threads, however. When a closed runnable starts,
    it sees that its parent is closed and throws an exception.
    This causes end-to-end test failures.
    
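    This change adds a check of the runnable's state: registering a runnable
    that has already been closed no longer fails, and an exception
    (IllegalStateException) is thrown only if the runnable was still running
    when the coordinator was closed.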
---
 .../flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java    | 4 ++++
 .../streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java | 8 +++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index ef84f69..c3eb9ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -49,6 +49,10 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
 	private final Consumer<AsyncCheckpointRunnable> unregisterConsumer;
 	private final Environment taskEnvironment;
 
+	public boolean isRunning() {
+		return asyncCheckpointState.get() == AsyncCheckpointState.RUNNING;
+	}
+
 	enum AsyncCheckpointState {
 		RUNNING,
 		DISCARDED,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 42b4acc..45650ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -70,6 +70,7 @@ import java.util.function.Supplier;
 
 import static org.apache.flink.util.IOUtils.closeQuietly;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 
@@ -363,8 +364,13 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 	private void registerAsyncCheckpointRunnable(long checkpointId, AsyncCheckpointRunnable asyncCheckpointRunnable) throws IOException {
 		synchronized (lock) {
 			if (closed) {
+				LOG.debug("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
+				final boolean running = asyncCheckpointRunnable.isRunning();
 				closeQuietly(asyncCheckpointRunnable);
-				throw new IOException("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
+				checkState(
+					!running,
+					"SubtaskCheckpointCoordinatorImpl was closed without closing asyncCheckpointRunnable %s",
+					checkpointId);
 			} else if (checkpoints.containsKey(checkpointId)) {
 				closeQuietly(asyncCheckpointRunnable);
 				throw new IOException(String.format("Cannot register Closeable, async checkpoint %d runnable has been register. Closing argument.", checkpointId));