You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/02 09:37:52 UTC

flink git commit: [hotfix] Add null check to CheckpointCoordinator#discardState

Repository: flink
Updated Branches:
  refs/heads/release-1.1 2bf87228e -> 0dc82baa0


[hotfix] Add null check to CheckpointCoordinator#discardState


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dc82baa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dc82baa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dc82baa

Branch: refs/heads/release-1.1
Commit: 0dc82baa00169f83c120f3a4d661bb0042a6dca0
Parents: 2bf8722
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 2 09:46:00 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 2 10:37:36 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 22 ++++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0dc82baa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 74e6d08..592bafc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -695,7 +695,6 @@ public class CheckpointCoordinator {
 
 				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize(), null)) {
 					case SUCCESS:
-
 						if (checkpoint.isFullyAcknowledged()) {
 							completePendingCheckpoint(checkpoint);
 							
@@ -1114,16 +1113,17 @@ public class CheckpointCoordinator {
 	}
 
 	private void discardState(final SerializedValue<StateHandle<?>> stateObject) {
-
-		executor.execute(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					stateObject.deserializeValue(userClassLoader).discardState();
-				} catch (Exception e) {
-					LOG.warn("Could not properly discard state object.", e);
+		if (stateObject != null) {
+			executor.execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						stateObject.deserializeValue(userClassLoader).discardState();
+					} catch (Exception e) {
+						LOG.warn("Could not properly discard state object.", e);
+					}
 				}
-			}
-		});
+			});
+		}
 	}
 }