Posted to commits@flink.apache.org by al...@apache.org on 2016/11/09 10:17:37 UTC

[1/2] flink git commit: [hotfix] Properly Await Termination in SavepointITCase

Repository: flink
Updated Branches:
  refs/heads/master 07ab9f453 -> ccf35cf20


[hotfix] Properly Await Termination in SavepointITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf35cf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf35cf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf35cf2

Branch: refs/heads/master
Commit: ccf35cf203f349dd126e641eff746370ec04403f
Parents: 1440682
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Nov 9 11:15:23 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 9 11:17:05 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/test/checkpointing/SavepointITCase.java  | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ccf35cf2/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 87cf80f..47a0828 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -232,6 +232,7 @@ public class SavepointITCase extends TestLogger {
 			// Shut down the Flink cluster (thereby canceling the job)
 			LOG.info("Shutting down Flink cluster.");
 			flink.shutdown();
+			flink.awaitTermination();
 
 			// - Verification START -------------------------------------------
 
@@ -339,6 +340,7 @@ public class SavepointITCase extends TestLogger {
 					SubtaskState subtaskState = taskState.getState(tdd.getIndexInSubtaskGroup());
 
 					assertNotNull(subtaskState);
+
 					errMsg = "Initial operator state mismatch.";
 					assertEquals(errMsg, subtaskState.getLegacyOperatorState(),
 							tdd.getTaskStateHandles().getLegacyOperatorState());
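
For context on the hotfix: shutdown() only initiates the cluster shutdown, so the verification that follows in the test could previously race against tasks that were still winding down; awaitTermination() closes that gap. A minimal stand-alone sketch of the same shutdown-then-await pattern, using a plain ExecutorService as a stand-in for the test cluster (this is an illustrative assumption, not the actual Flink test code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class AwaitTerminationSketch {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService cluster = Executors.newFixedThreadPool(2);
            cluster.submit(() -> System.out.println("job running"));

            // shutdown only *initiates* termination and returns immediately ...
            cluster.shutdownNow();

            // ... so block until termination has actually completed before verifying anything
            if (!cluster.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("cluster did not terminate in time");
            }

            // - Verification START - : assertions on post-shutdown state are now safe
        }
    }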


[2/2] flink git commit: [FLINK-5019] Proper isRestored result for tasks that did not write state

Posted by al...@apache.org.
[FLINK-5019] Proper isRestored result for tasks that did not write state


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14406821
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14406821
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14406821

Branch: refs/heads/master
Commit: 1440682136e84651f8f0d1230d038880588f6cb3
Parents: 07ab9f4
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Nov 3 11:34:47 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 9 11:17:05 2016 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/PendingCheckpoint.java   |   4 +-
 .../checkpoint/StateAssignmentOperation.java    | 253 ++++++++++---------
 .../flink/runtime/checkpoint/SubtaskState.java  |   7 -
 .../flink/runtime/state/TaskStateHandles.java   |  11 +-
 .../streaming/runtime/tasks/StreamTask.java     |   6 +-
 5 files changed, 132 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 92dca21..43a2557 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -245,12 +245,10 @@ public class PendingCheckpoint {
 				return false;
 			}
 
-			if (null != checkpointedSubtaskState && checkpointedSubtaskState.hasState()) {
+			if (null != checkpointedSubtaskState) {
 
 				JobVertexID jobVertexID = vertex.getJobvertexId();
-
 				int subtaskIndex = vertex.getParallelSubtaskIndex();
-
 				TaskState taskState = taskStates.get(jobVertexID);
 
 				if (null == taskState) {
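
The behavioral core of FLINK-5019 is visible in this hunk: a non-null SubtaskState is now always registered, even if it carries no actual state, so that on restore the corresponding task gets a proper isRestored result instead of looking as if it had never been checkpointed. A stand-alone toy model of that before/after difference (plain Java stand-ins, not Flink types):

    import java.util.HashMap;
    import java.util.Map;

    public class IsRestoredSketch {

        // Stand-in for the coordinator's per-subtask state registry.
        static Map<Integer, String> acknowledge(boolean dropEmptyStates) {
            Map<Integer, String> registered = new HashMap<>();
            String subtaskState = "";              // subtask acknowledged, but wrote no state
            boolean hasState = !subtaskState.isEmpty();

            if (dropEmptyStates) {
                // old behavior: null != state && state.hasState()
                if (subtaskState != null && hasState) {
                    registered.put(0, subtaskState);
                }
            } else {
                // new behavior: null != state
                if (subtaskState != null) {
                    registered.put(0, subtaskState);
                }
            }
            return registered;
        }

        public static void main(String[] args) {
            // Before the fix the "empty" state was dropped, so restore saw isRestored == false.
            System.out.println("before: isRestored = " + acknowledge(true).containsKey(0));
            // After the fix the state object is kept, so restore sees isRestored == true.
            System.out.println("after:  isRestored = " + acknowledge(false).containsKey(0));
        }
    }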

http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index d98c8e8..f496a07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -63,164 +63,169 @@ public class StateAssignmentOperation {
 	public boolean assignStates() throws Exception {
 
 		for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
+
 			TaskState taskState = taskGroupStateEntry.getValue();
 			ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey());
 
-			if (executionJobVertex != null) {
-				// check that the number of key groups have not changed
-				if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
-					throw new IllegalStateException("The maximum parallelism (" +
-							taskState.getMaxParallelism() + ") with which the latest " +
-							"checkpoint of the execution job vertex " + executionJobVertex +
-							" has been taken and the current maximum parallelism (" +
-							executionJobVertex.getMaxParallelism() + ") changed. This " +
-							"is currently not supported.");
+			if (executionJobVertex == null) {
+				if (allowNonRestoredState) {
+					logger.info("Skipped checkpoint state for operator {}.", taskState.getJobVertexID());
+					continue;
+				} else {
+					throw new IllegalStateException("There is no execution job vertex for the job" +
+							" vertex ID " + taskGroupStateEntry.getKey());
 				}
+			}
 
-				final int oldParallelism = taskState.getParallelism();
-				final int newParallelism = executionJobVertex.getParallelism();
-				final boolean parallelismChanged = oldParallelism != newParallelism;
-				final boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
-
-				if (hasNonPartitionedState && parallelismChanged) {
-					throw new IllegalStateException("Cannot restore the latest checkpoint because " +
-							"the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
-							"state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
-							" has parallelism " + newParallelism + " whereas the corresponding" +
-							"state object has a parallelism of " + oldParallelism);
-				}
+			// check that the number of key groups have not changed
+			if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
+				throw new IllegalStateException("The maximum parallelism (" +
+						taskState.getMaxParallelism() + ") with which the latest " +
+						"checkpoint of the execution job vertex " + executionJobVertex +
+						" has been taken and the current maximum parallelism (" +
+						executionJobVertex.getMaxParallelism() + ") changed. This " +
+						"is currently not supported.");
+			}
 
-				List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
-						executionJobVertex.getMaxParallelism(),
-						newParallelism);
+			//-------------------------------------------------------------------
 
-				final int chainLength = taskState.getChainLength();
+			final int oldParallelism = taskState.getParallelism();
+			final int newParallelism = executionJobVertex.getParallelism();
+			final boolean parallelismChanged = oldParallelism != newParallelism;
+			final boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
 
-				// operator chain idx -> list of the stored op states from all parallel instances for this chain idx
-				@SuppressWarnings("unchecked")
-				List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength];
-				@SuppressWarnings("unchecked")
-				List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength];
+			if (hasNonPartitionedState && parallelismChanged) {
+				throw new IllegalStateException("Cannot restore the latest checkpoint because " +
+						"the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
+						"state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
+						" has parallelism " + newParallelism + " whereas the corresponding" +
+						"state object has a parallelism of " + oldParallelism);
+			}
 
-				List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
-				List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
+			List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
+					executionJobVertex.getMaxParallelism(),
+					newParallelism);
+
+			final int chainLength = taskState.getChainLength();
 
-				for (int p = 0; p < oldParallelism; ++p) {
-					SubtaskState subtaskState = taskState.getState(p);
+			// operator chain idx -> list of the stored op states from all parallel instances for this chain idx
+			@SuppressWarnings("unchecked")
+			List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength];
+			@SuppressWarnings("unchecked")
+			List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength];
 
-					if (null != subtaskState) {
-						collectParallelStatesByChainOperator(
-								parallelOpStatesBackend, subtaskState.getManagedOperatorState());
+			List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
+			List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
 
-						collectParallelStatesByChainOperator(
-								parallelOpStatesStream, subtaskState.getRawOperatorState());
+			for (int p = 0; p < oldParallelism; ++p) {
+				SubtaskState subtaskState = taskState.getState(p);
 
-						KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
-						if (null != keyedStateBackend) {
-							parallelKeyedStatesBackend.add(keyedStateBackend);
-						}
+				if (null != subtaskState) {
+					collectParallelStatesByChainOperator(
+							parallelOpStatesBackend, subtaskState.getManagedOperatorState());
 
-						KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
-						if (null != keyedStateStream) {
-							parallelKeyedStateStream.add(keyedStateStream);
-						}
+					collectParallelStatesByChainOperator(
+							parallelOpStatesStream, subtaskState.getRawOperatorState());
+
+					KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+					if (null != keyedStateBackend) {
+						parallelKeyedStatesBackend.add(keyedStateBackend);
 					}
-				}
 
-				// operator chain index -> lists with collected states (one collection for each parallel subtasks)
-				@SuppressWarnings("unchecked")
-				List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
+					KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+					if (null != keyedStateStream) {
+						parallelKeyedStateStream.add(keyedStateStream);
+					}
+				}
+			}
 
-				@SuppressWarnings("unchecked")
-				List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength];
+			// operator chain index -> lists with collected states (one collection for each parallel subtasks)
+			@SuppressWarnings("unchecked")
+			List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
 
-				//TODO here we can employ different redistribution strategies for state, e.g. union state.
-				// For now we only offer round robin as the default.
-				OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
+			@SuppressWarnings("unchecked")
+			List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength];
 
-				for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
+			//TODO here we can employ different redistribution strategies for state, e.g. union state.
+			// For now we only offer round robin as the default.
+			OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
 
-					List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx];
-					List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx];
+			for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
 
-					partitionedParallelStatesBackend[chainIdx] = applyRepartitioner(
-							opStateRepartitioner,
-							chainOpParallelStatesBackend,
-							oldParallelism,
-							newParallelism);
+				List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx];
+				List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx];
 
-					partitionedParallelStatesStream[chainIdx] = applyRepartitioner(
-							opStateRepartitioner,
-							chainOpParallelStatesStream,
-							oldParallelism,
-							newParallelism);
-				}
+				partitionedParallelStatesBackend[chainIdx] = applyRepartitioner(
+						opStateRepartitioner,
+						chainOpParallelStatesBackend,
+						oldParallelism,
+						newParallelism);
 
-				for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) {
-					// non-partitioned state
-					ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
+				partitionedParallelStatesStream[chainIdx] = applyRepartitioner(
+						opStateRepartitioner,
+						chainOpParallelStatesStream,
+						oldParallelism,
+						newParallelism);
+			}
 
-					if (hasNonPartitionedState) {
-						// count the number of executions for which we set a state
-						nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
-					}
+			for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) {
+				// non-partitioned state
+				ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
 
-					// partitionable state
-					@SuppressWarnings("unchecked")
-					Collection<OperatorStateHandle>[] iab = new Collection[chainLength];
-					@SuppressWarnings("unchecked")
-					Collection<OperatorStateHandle>[] ias = new Collection[chainLength];
-					List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab);
-					List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias);
+				if (!parallelismChanged) {
+					nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
+				}
 
-					for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) {
-						List<Collection<OperatorStateHandle>> redistributedOpStateBackend =
-								partitionedParallelStatesBackend[chainIdx];
+				// partitionable state
+				@SuppressWarnings("unchecked")
+				Collection<OperatorStateHandle>[] iab = new Collection[chainLength];
+				@SuppressWarnings("unchecked")
+				Collection<OperatorStateHandle>[] ias = new Collection[chainLength];
+				List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab);
+				List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias);
 
-						List<Collection<OperatorStateHandle>> redistributedOpStateStream =
-								partitionedParallelStatesStream[chainIdx];
+				for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) {
+					List<Collection<OperatorStateHandle>> redistributedOpStateBackend =
+							partitionedParallelStatesBackend[chainIdx];
 
-						if (redistributedOpStateBackend != null) {
-							operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx));
-						}
+					List<Collection<OperatorStateHandle>> redistributedOpStateStream =
+							partitionedParallelStatesStream[chainIdx];
 
-						if (redistributedOpStateStream != null) {
-							operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx));
-						}
+					if (redistributedOpStateBackend != null) {
+						operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx));
 					}
 
-					Execution currentExecutionAttempt = executionJobVertex
-							.getTaskVertices()[subTaskIdx]
-							.getCurrentExecutionAttempt();
-
-					List<KeyGroupsStateHandle> newKeyedStatesBackend;
-					List<KeyGroupsStateHandle> newKeyedStateStream;
-					if (parallelismChanged) {
-						KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx);
-						newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
-						newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
-					} else {
-						SubtaskState subtaskState = taskState.getState(subTaskIdx);
-						KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
-						KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
-						newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(oldKeyedStatesBackend) : null;
-						newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(oldKeyedStatesStream) : null;
+					if (redistributedOpStateStream != null) {
+						operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx));
 					}
+				}
 
-					TaskStateHandles taskStateHandles = new TaskStateHandles(
-							nonPartitionableState,
-							operatorStateFromBackend,
-							operatorStateFromStream,
-							newKeyedStatesBackend,
-							newKeyedStateStream);
-
-					currentExecutionAttempt.setInitialState(taskStateHandles);
+				Execution currentExecutionAttempt = executionJobVertex
+						.getTaskVertices()[subTaskIdx]
+						.getCurrentExecutionAttempt();
+
+				List<KeyGroupsStateHandle> newKeyedStatesBackend;
+				List<KeyGroupsStateHandle> newKeyedStateStream;
+				if (parallelismChanged) {
+					KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx);
+					newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
+					newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
+				} else {
+					SubtaskState subtaskState = taskState.getState(subTaskIdx);
+					KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
+					KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
+					newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(oldKeyedStatesBackend) : null;
+					newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(oldKeyedStatesStream) : null;
 				}
-			} else if (allowNonRestoredState) {
-				logger.info("Skipped checkpoint state for operator {}.", taskState.getJobVertexID());
-			} else {
-				throw new IllegalStateException("There is no execution job vertex for the job" +
-						" vertex ID " + taskGroupStateEntry.getKey());
+
+				TaskStateHandles taskStateHandles = new TaskStateHandles(
+						nonPartitionableState,
+						operatorStateFromBackend,
+						operatorStateFromStream,
+						newKeyedStatesBackend,
+						newKeyedStateStream);
+
+				currentExecutionAttempt.setInitialState(taskStateHandles);
 			}
 		}
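
Beyond the re-indentation (the executionJobVertex == null case becomes a guard clause at the top of the loop), the keyed-state handling in this method hinges on whether the parallelism changed: if it did not, every new subtask simply receives its old state back; if it did, the old key-group handles are re-sliced according to each subtask's new key-group range. A stand-alone toy illustration of that decision follows; the range arithmetic is deliberately simplified and is not Flink's actual KeyGroupRange assignment:

    import java.util.ArrayList;
    import java.util.List;

    public class KeyedStateReassignmentSketch {

        public static void main(String[] args) {
            // old state: key groups written by 2 old subtasks (max parallelism 4)
            List<List<Integer>> oldKeyedStates = List.of(List.of(0, 1), List.of(2, 3));
            System.out.println(assign(oldKeyedStates, 2, 2, 4)); // unchanged: [[0, 1], [2, 3]]
            System.out.println(assign(oldKeyedStates, 2, 4, 4)); // rescaled:  [[0], [1], [2], [3]]
        }

        static List<List<Integer>> assign(List<List<Integer>> oldStates,
                                          int oldParallelism, int newParallelism, int maxParallelism) {
            boolean parallelismChanged = oldParallelism != newParallelism;
            List<List<Integer>> newStates = new ArrayList<>();
            for (int subTaskIdx = 0; subTaskIdx < newParallelism; subTaskIdx++) {
                if (!parallelismChanged) {
                    // parallelism unchanged: hand the old subtask's state straight back
                    newStates.add(oldStates.get(subTaskIdx));
                } else {
                    // parallelism changed: keep only the key groups in this subtask's new range
                    int start = subTaskIdx * maxParallelism / newParallelism;
                    int end = (subTaskIdx + 1) * maxParallelism / newParallelism;
                    List<Integer> mine = new ArrayList<>();
                    for (List<Integer> handle : oldStates) {
                        for (int keyGroup : handle) {
                            if (keyGroup >= start && keyGroup < end) {
                                mine.add(keyGroup);
                            }
                        }
                    }
                    newStates.add(mine);
                }
            }
            return newStates;
        }
    }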
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index 9b9a810..ca51e1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -212,13 +212,6 @@ public class SubtaskState implements StateObject {
 
 	}
 
-	public boolean hasState() {
-		return (null != legacyOperatorState && !legacyOperatorState.isEmpty())
-				|| (null != managedOperatorState && !managedOperatorState.isEmpty())
-				|| null != managedKeyedState
-				|| null != rawKeyedState;
-	}
-
 	@Override
 	public int hashCode() {
 		int result = legacyOperatorState != null ? legacyOperatorState.hashCode() : 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
index ecd6399..417a9dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.util.CollectionUtil;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -99,14 +98,6 @@ public class TaskStateHandles implements Serializable {
 		return managedOperatorState;
 	}
 
-	public boolean hasState() {
-		return !ChainedStateHandle.isNullOrEmpty(legacyOperatorState)
-				|| !CollectionUtil.isNullOrEmpty(managedKeyedState)
-				|| !CollectionUtil.isNullOrEmpty(rawKeyedState)
-				|| !CollectionUtil.isNullOrEmpty(rawOperatorState)
-				|| !CollectionUtil.isNullOrEmpty(managedOperatorState);
-	}
-
 	private static List<Collection<OperatorStateHandle>> transform(ChainedStateHandle<OperatorStateHandle> in) {
 		if (null == in) {
 			return Collections.emptyList();
@@ -169,4 +160,4 @@ public class TaskStateHandles implements Serializable {
 		result = 31 * result + (rawOperatorState != null ? rawOperatorState.hashCode() : 0);
 		return result;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index bd34044..83d72e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -918,11 +918,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						keyedStateHandleBackend,
 						keyedStateHandleStream);
 
-				if (subtaskState.hasState()) {
-					owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
-				} else {
-					owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData);
-				}
+				owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",