You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/09 14:17:43 UTC

[1/2] flink git commit: [FLINK-8385] [checkpointing] Avoid RejectedExecutionException in SharedStateRegistry during disposal from async Zookeeper calls.

Repository: flink
Updated Branches:
  refs/heads/master 40ba6261b -> b32b8359e


[FLINK-8385] [checkpointing] Avoid RejectedExecutionException in SharedStateRegistry during disposal from async Zookeeper calls.

This closes #5256.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b32b8359
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b32b8359
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b32b8359

Branch: refs/heads/master
Commit: b32b8359ea20812cddbcbffc3b617d1256889cb1
Parents: b5de38c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jan 8 11:31:57 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jan 9 15:17:34 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/state/SharedStateRegistry.java      | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b32b8359/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 458c695..664631b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * This registry manages state that is shared across (incremental) checkpoints, and is responsible
@@ -194,8 +195,17 @@ public class SharedStateRegistry implements AutoCloseable {
 		// We do the small optimization to not issue discards for placeholders, which are NOPs.
 		if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
 			LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
-			asyncDisposalExecutor.execute(
-				new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
+			AsyncDisposalRunnable asyncDisposalRunnable = new AsyncDisposalRunnable(streamStateHandle);
+			try {
+				asyncDisposalExecutor.execute(asyncDisposalRunnable);
+			} catch (RejectedExecutionException ex) {
+				// TODO This is a temporary fix for a problem during ZooKeeperCompletedCheckpointStore#shutdown:
+				// Disposal is issued in another async thread and the shutdown proceeds to close the I/O Executor pool.
+				// This leads to RejectedExecutionException once the async deletes are triggered by ZK. We need to
+				// wait for all pending ZK deletes before closing the I/O Executor pool. We can simply call #run()
+				// because we are already in the async ZK thread that disposes the handles.
+				asyncDisposalRunnable.run();
+			}
 		}
 	}
 


[2/2] flink git commit: [FLINK-8385] [checkpointing] Suppress logging of expected exception during snapshot cancellation.

Posted by sr...@apache.org.
[FLINK-8385] [checkpointing] Suppress logging of expected exception during snapshot cancellation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5de38ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5de38ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5de38ce

Branch: refs/heads/master
Commit: b5de38cee4e68304e76fd9f9cda09dee86099578
Parents: 40ba626
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jan 8 11:30:08 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jan 9 15:17:34 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/state/StateUtil.java   | 20 +++++++++++++++++---
 .../api/operators/OperatorSnapshotResult.java   |  4 +---
 2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5de38ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index 09d195a..f129c31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -21,6 +21,11 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.LambdaUtil;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableFuture;
 
 /**
@@ -28,6 +33,8 @@ import java.util.concurrent.RunnableFuture;
  */
 public class StateUtil {
 
+	private static final Logger LOG = LoggerFactory.getLogger(StateUtil.class);
+
 	private StateUtil() {
 		throw new AssertionError();
 	}
@@ -63,10 +70,17 @@ public class StateUtil {
 	public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
 		if (null != stateFuture) {
 			if (!stateFuture.cancel(true)) {
-				StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
 
-				if (null != stateObject) {
-					stateObject.discardState();
+				try {
+					// We attempt to get a result, in case the future completed before cancellation.
+					StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+
+					if (null != stateObject) {
+						stateObject.discardState();
+					}
+				} catch (CancellationException | ExecutionException ex) {
+					LOG.debug("Cancelled execution of snapshot future runnable. Cancellation produced the following " +
+						"exception, which is expected an can be ignored.", ex);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5de38ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 8aa76a5..8c05ae9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -88,9 +88,7 @@ public class OperatorSnapshotResult {
 		try {
 			StateUtil.discardStateFuture(getKeyedStateManagedFuture());
 		} catch (Exception e) {
-			exception = ExceptionUtils.firstOrSuppressed(
-				new Exception("Could not properly cancel managed keyed state future.", e),
-				exception);
+			exception = new Exception("Could not properly cancel managed keyed state future.", e);
 		}
 
 		try {