You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/02 09:37:52 UTC
flink git commit: [hotfix] Add null check to
CheckpointCoordinator#discardState
Repository: flink
Updated Branches:
refs/heads/release-1.1 2bf87228e -> 0dc82baa0
[hotfix] Add null check to CheckpointCoordinator#discardState
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dc82baa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dc82baa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dc82baa
Branch: refs/heads/release-1.1
Commit: 0dc82baa00169f83c120f3a4d661bb0042a6dca0
Parents: 2bf8722
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 2 09:46:00 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 2 10:37:36 2016 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 22 ++++++++++----------
1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0dc82baa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 74e6d08..592bafc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -695,7 +695,6 @@ public class CheckpointCoordinator {
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize(), null)) {
case SUCCESS:
-
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
@@ -1114,16 +1113,17 @@ public class CheckpointCoordinator {
}
private void discardState(final SerializedValue<StateHandle<?>> stateObject) {
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- stateObject.deserializeValue(userClassLoader).discardState();
- } catch (Exception e) {
- LOG.warn("Could not properly discard state object.", e);
+ if (stateObject != null) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ stateObject.deserializeValue(userClassLoader).discardState();
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard state object.", e);
+ }
}
- }
- });
+ });
+ }
}
}