Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/01/08 10:39:41 UTC
[GitHub] flink pull request #5256: [FLINK-8385] Fix exceptions in AbstractEventTimeWi...
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/5256
[FLINK-8385] Fix exceptions in AbstractEventTimeWindowCheckpointingITCase
## What is the purpose of the change
This change fixes two types of exceptions that I found when running `AbstractEventTimeWindowCheckpointingITCase` in debug mode (see JIRA).
## Brief change log
- *As a temporary fix for the first mentioned exception, we catch the `RejectedExecutionException` and execute the `Runnable` directly.*
- *Suppress the expected `CancellationException` or `ExecutionException` when closing the snapshot future task.*
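
As a rough illustration of the first item, the fallback can be sketched in plain Java. This is only a sketch under stated assumptions: the names `DisposalFallbackSketch` and `executeOrRunDirectly` are hypothetical and not taken from the Flink code base; only `Executor.execute` and `RejectedExecutionException` come from the JDK.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not the actual SharedStateRegistry code.
final class DisposalFallbackSketch {

    /**
     * Hands the task to the executor. If the executor rejects it (for example
     * because it was already shut down), the task runs on the calling thread.
     *
     * @return true if the executor accepted the task, false if the fallback ran
     */
    static boolean executeOrRunDirectly(Executor executor, Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            // Fallback: execute synchronously instead of dropping the work.
            task.run();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // from here on, execute() is rejected

        AtomicInteger disposed = new AtomicInteger();
        boolean accepted = executeOrRunDirectly(executor, disposed::incrementAndGet);

        // prints "accepted=false, disposed=1"
        System.out.println("accepted=" + accepted + ", disposed=" + disposed.get());
    }
}
```

The trade-off of such a fallback is that the caller's thread does the disposal work, which is presumably why the change log calls it temporary.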
## Verifying this change
This change is already covered by `AbstractEventTimeWindowCheckpointingITCase`. You should no longer see the mentioned exceptions when running all concrete instances of the test.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink WIP
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5256.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5256
----
commit 72ee09e86205f2fc83f5247bf15c861b94af73cc
Author: Stefan Richter <s....@...>
Date: 2018-01-08T10:30:08Z
[FLINK-8385] [checkpointing] Suppress logging of expected exception during snapshot cancellation.
commit e6f4c404c633d844c80341cc288b6bd4b89d8705
Author: Stefan Richter <s....@...>
Date: 2018-01-08T10:31:57Z
[FLINK-8385] [checkpointing] Avoid RejectedExecutionException in SharedStateRegistry during disposal from async Zookeeper calls.
----
---
[GitHub] flink issue #5256: [FLINK-8385] Fix exceptions in AbstractEventTimeWindowChe...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5256
CC @tillrohrmann
---
[GitHub] flink pull request #5256: [FLINK-8385] Fix exceptions in AbstractEventTimeWi...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5256#discussion_r160200816
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java ---
@@ -63,10 +65,16 @@ public static void bestEffortDiscardAllStateObjects(
     public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
         if (null != stateFuture) {
             if (!stateFuture.cancel(true)) {
-                StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
-                if (null != stateObject) {
-                    stateObject.discardState();
+                try {
+                    // We attempt to get a result, in case the future completed before cancellation.
+                    StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+
+                    if (null != stateObject) {
+                        stateObject.discardState();
+                    }
+                } catch (CancellationException | ExecutionException ignore) {
+                    // No result that requires discarding was produced.
--- End diff --
Maybe we should log on DEBUG that the state future could not be completed.
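
A minimal sketch of what such DEBUG logging might look like, under several assumptions: `java.util.logging` (level `FINE`) stands in for Flink's logger, the helper `resultToDiscard` and the class `DiscardFutureSketch` are hypothetical, and the run-if-not-done step only approximates what `FutureUtil.runIfNotDoneAndGet` is assumed to do.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; not the actual StateUtil code.
final class DiscardFutureSketch {

    private static final Logger LOG = Logger.getLogger(DiscardFutureSketch.class.getName());

    /**
     * Cancels the future. If cancellation fails, tries to obtain a result that
     * the caller would then have to discard.
     *
     * @return the result that needs discarding, or null if there is none
     */
    static <T> T resultToDiscard(RunnableFuture<T> future) {
        if (future == null || future.cancel(true)) {
            return null; // nothing to do, or cancelled before a result existed
        }
        try {
            if (!future.isDone()) {
                future.run();
            }
            return future.get();
        } catch (CancellationException | ExecutionException e) {
            // Expected outcome: log quietly (FINE roughly corresponds to DEBUG).
            LOG.log(Level.FINE, "State future produced no result to discard ({0}).",
                    e.getClass().getSimpleName());
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        FutureTask<String> completed = new FutureTask<>(() -> "state");
        completed.run();
        System.out.println(resultToDiscard(completed)); // prints "state"

        FutureTask<String> failed = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        failed.run();
        System.out.println(resultToDiscard(failed)); // prints "null"
    }
}
```

The sketch logs only the exception's class name rather than the full stack trace; whether to attach the exception itself is a separate choice.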
---
[GitHub] flink pull request #5256: [FLINK-8385] Fix exceptions in AbstractEventTimeWi...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5256#discussion_r160200089
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -194,8 +195,17 @@ private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
         // We do the small optimization to not issue discards for placeholders, which are NOPs.
         if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
             LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
-            asyncDisposalExecutor.execute(
-                new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
+            AsyncDisposalRunnable asyncDisposalRunnable = new AsyncDisposalRunnable(streamStateHandle);
+            try {
+                asyncDisposalExecutor.execute(asyncDisposalRunnable);
+            } catch (RejectedExecutionException ex) {
+                // TODO This is a temporary fix for a problem when the ZooKeeperCompletedCheckpointStore on shutdown:
--- End diff --
typo: on --> is
---
[GitHub] flink pull request #5256: [FLINK-8385] Fix exceptions in AbstractEventTimeWi...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5256
---
[GitHub] flink pull request #5256: [FLINK-8385] Fix exceptions in AbstractEventTimeWi...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5256#discussion_r160403824
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java ---
@@ -63,10 +65,16 @@ public static void bestEffortDiscardAllStateObjects(
     public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
         if (null != stateFuture) {
             if (!stateFuture.cancel(true)) {
-                StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
-                if (null != stateObject) {
-                    stateObject.discardState();
+                try {
+                    // We attempt to get a result, in case the future completed before cancellation.
+                    StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+
+                    if (null != stateObject) {
+                        stateObject.discardState();
+                    }
+                } catch (CancellationException | ExecutionException ignore) {
+                    // No result that requires discarding was produced.
--- End diff --
OK, but I wonder whether we should include the exception in the log entry, because I want to avoid presenting something to the user that might look like a problem or bug.
---
[GitHub] flink issue #5256: [FLINK-8385] Fix exceptions in AbstractEventTimeWindowChe...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5256
Thanks for the fast review, @tillrohrmann! I will address your comments and merge.
---