You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/09 14:17:43 UTC
[1/2] flink git commit: [FLINK-8385] [checkpointing] Avoid RejectedExecutionException in SharedStateRegistry during disposal from async ZooKeeper calls.
Repository: flink
Updated Branches:
refs/heads/master 40ba6261b -> b32b8359e
[FLINK-8385] [checkpointing] Avoid RejectedExecutionException in SharedStateRegistry during disposal from async ZooKeeper calls.
This closes #5256.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b32b8359
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b32b8359
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b32b8359
Branch: refs/heads/master
Commit: b32b8359ea20812cddbcbffc3b617d1256889cb1
Parents: b5de38c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jan 8 11:31:57 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jan 9 15:17:34 2018 +0100
----------------------------------------------------------------------
.../flink/runtime/state/SharedStateRegistry.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b32b8359/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 458c695..664631b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
/**
* This registry manages state that is shared across (incremental) checkpoints, and is responsible
@@ -194,8 +195,17 @@ public class SharedStateRegistry implements AutoCloseable {
// We do the small optimization to not issue discards for placeholders, which are NOPs.
if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
- asyncDisposalExecutor.execute(
- new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
+ AsyncDisposalRunnable asyncDisposalRunnable = new AsyncDisposalRunnable(streamStateHandle);
+ try {
+ asyncDisposalExecutor.execute(asyncDisposalRunnable);
+ } catch (RejectedExecutionException ex) {
+ // TODO This is a temporary fix for a problem during ZooKeeperCompletedCheckpointStore#shutdown:
+ // Disposal is issued in another async thread and the shutdown proceeds to close the I/O Executor pool.
+ // This leads to RejectedExecutionException once the async deletes are triggered by ZK. We need to
+ // wait for all pending ZK deletes before closing the I/O Executor pool. We can simply call #run()
+ // because we are already in the async ZK thread that disposes the handles.
+ asyncDisposalRunnable.run();
+ }
}
}
[2/2] flink git commit: [FLINK-8385] [checkpointing] Suppress logging
of expected exception during snapshot cancellation.
Posted by sr...@apache.org.
[FLINK-8385] [checkpointing] Suppress logging of expected exception during snapshot cancellation.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5de38ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5de38ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5de38ce
Branch: refs/heads/master
Commit: b5de38cee4e68304e76fd9f9cda09dee86099578
Parents: 40ba626
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jan 8 11:30:08 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jan 9 15:17:34 2018 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/state/StateUtil.java | 20 +++++++++++++++++---
.../api/operators/OperatorSnapshotResult.java | 4 +---
2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b5de38ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index 09d195a..f129c31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -21,6 +21,11 @@ package org.apache.flink.runtime.state;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.LambdaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
/**
@@ -28,6 +33,8 @@ import java.util.concurrent.RunnableFuture;
*/
public class StateUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(StateUtil.class);
+
private StateUtil() {
throw new AssertionError();
}
@@ -63,10 +70,17 @@ public class StateUtil {
public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
if (null != stateFuture) {
if (!stateFuture.cancel(true)) {
- StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
- if (null != stateObject) {
- stateObject.discardState();
+ try {
+ // We attempt to get a result, in case the future completed before cancellation.
+ StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+
+ if (null != stateObject) {
+ stateObject.discardState();
+ }
+ } catch (CancellationException | ExecutionException ex) {
+ LOG.debug("Cancelled execution of snapshot future runnable. Cancellation produced the following " +
+ "exception, which is expected an can be ignored.", ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5de38ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 8aa76a5..8c05ae9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -88,9 +88,7 @@ public class OperatorSnapshotResult {
try {
StateUtil.discardStateFuture(getKeyedStateManagedFuture());
} catch (Exception e) {
- exception = ExceptionUtils.firstOrSuppressed(
- new Exception("Could not properly cancel managed keyed state future.", e),
- exception);
+ exception = new Exception("Could not properly cancel managed keyed state future.", e);
}
try {