You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/08/28 08:52:15 UTC

[flink] branch master updated (2d4e801 -> 378115f)

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2d4e801  [hotfix][docs] Fix links and typoes in new Source API docs
     new 11f658e  [hotfix] Refactor SubtaskCheckpointCoordinatorImpl.registerAsyncCheckpointRunnable
     new 378115f  [FLINK-19012][task] Check state of AsyncCheckpointRunnable before throwing an exception

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/tasks/AsyncCheckpointRunnable.java     |  4 ++++
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 28 ++++++++++++----------
 2 files changed, 19 insertions(+), 13 deletions(-)


[flink] 02/02: [FLINK-19012][task] Check state of AsyncCheckpointRunnable before throwing an exception

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 378115ffd1642c782c7d0f203c5c46e07d18fc23
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Aug 27 19:59:38 2020 +0200

    [FLINK-19012][task] Check state of AsyncCheckpointRunnable before throwing an exception
    
    Currently, SubtaskCheckpointCoordinatorImpl closes all runnables on close.
    It doesn't stop the actual threads, however. When closed runnable starts,
    it sees its parent is closed and throws an exception.
    This causes end-to-end tests failures.
    
    This change adds a check of runnable state.
---
 .../flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java    | 4 ++++
 .../streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java | 8 +++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index ef84f69..c3eb9ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -49,6 +49,10 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
 	private final Consumer<AsyncCheckpointRunnable> unregisterConsumer;
 	private final Environment taskEnvironment;
 
+	public boolean isRunning() {
+		return asyncCheckpointState.get() == AsyncCheckpointState.RUNNING;
+	}
+
 	enum AsyncCheckpointState {
 		RUNNING,
 		DISCARDED,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 42b4acc..45650ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -70,6 +70,7 @@ import java.util.function.Supplier;
 
 import static org.apache.flink.util.IOUtils.closeQuietly;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 
@@ -363,8 +364,13 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 	private void registerAsyncCheckpointRunnable(long checkpointId, AsyncCheckpointRunnable asyncCheckpointRunnable) throws IOException {
 		synchronized (lock) {
 			if (closed) {
+				LOG.debug("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
+				final boolean running = asyncCheckpointRunnable.isRunning();
 				closeQuietly(asyncCheckpointRunnable);
-				throw new IOException("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
+				checkState(
+					!running,
+					"SubtaskCheckpointCoordinatorImpl was closed without closing asyncCheckpointRunnable %s",
+					checkpointId);
 			} else if (checkpoints.containsKey(checkpointId)) {
 				closeQuietly(asyncCheckpointRunnable);
 				throw new IOException(String.format("Cannot register Closeable, async checkpoint %d runnable has been register. Closing argument.", checkpointId));


[flink] 01/02: [hotfix] Refactor SubtaskCheckpointCoordinatorImpl.registerAsyncCheckpointRunnable

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 11f658e172a5a2dc750f9d9e7db9c5f9ead561c3
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Aug 27 19:57:18 2020 +0200

    [hotfix] Refactor SubtaskCheckpointCoordinatorImpl.registerAsyncCheckpointRunnable
---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 22 +++++++++-------------
 1 file changed, 9 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 0d6d638..42b4acc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -68,6 +68,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
+import static org.apache.flink.util.IOUtils.closeQuietly;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
@@ -360,22 +361,17 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 	}
 
 	private void registerAsyncCheckpointRunnable(long checkpointId, AsyncCheckpointRunnable asyncCheckpointRunnable) throws IOException {
-		StringBuilder exceptionMessage = new StringBuilder("Cannot register Closeable, ");
 		synchronized (lock) {
-			if (!closed) {
-				if (!checkpoints.containsKey(checkpointId)) {
-					checkpoints.put(checkpointId, asyncCheckpointRunnable);
-					return;
-				} else {
-					exceptionMessage.append("async checkpoint ").append(checkpointId).append(" runnable has been register. ");
-				}
+			if (closed) {
+				closeQuietly(asyncCheckpointRunnable);
+				throw new IOException("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
+			} else if (checkpoints.containsKey(checkpointId)) {
+				closeQuietly(asyncCheckpointRunnable);
+				throw new IOException(String.format("Cannot register Closeable, async checkpoint %d runnable has been register. Closing argument.", checkpointId));
 			} else {
-				exceptionMessage.append("this subtaskCheckpointCoordinator is already closed. ");
+				checkpoints.put(checkpointId, asyncCheckpointRunnable);
 			}
 		}
-
-		IOUtils.closeQuietly(asyncCheckpointRunnable);
-		throw new IOException(exceptionMessage.append("Closing argument.").toString());
 	}
 
 	private boolean unregisterAsyncCheckpointRunnable(long checkpointId) {
@@ -393,7 +389,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 		synchronized (lock) {
 			asyncCheckpointRunnable = checkpoints.remove(checkpointId);
 		}
-		IOUtils.closeQuietly(asyncCheckpointRunnable);
+		closeQuietly(asyncCheckpointRunnable);
 		return asyncCheckpointRunnable != null;
 	}