You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/09 10:17:37 UTC
[1/2] flink git commit: [hotfix] Properly Await Termination in SavepointITCase
Repository: flink
Updated Branches:
refs/heads/master 07ab9f453 -> ccf35cf20
[hotfix] Properly Await Termination in SavepointITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf35cf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf35cf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf35cf2
Branch: refs/heads/master
Commit: ccf35cf203f349dd126e641eff746370ec04403f
Parents: 1440682
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Nov 9 11:15:23 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 9 11:17:05 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/test/checkpointing/SavepointITCase.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ccf35cf2/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 87cf80f..47a0828 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -232,6 +232,7 @@ public class SavepointITCase extends TestLogger {
// Shut down the Flink cluster (thereby canceling the job)
LOG.info("Shutting down Flink cluster.");
flink.shutdown();
+ flink.awaitTermination();
// - Verification START -------------------------------------------
@@ -339,6 +340,7 @@ public class SavepointITCase extends TestLogger {
SubtaskState subtaskState = taskState.getState(tdd.getIndexInSubtaskGroup());
assertNotNull(subtaskState);
+
errMsg = "Initial operator state mismatch.";
assertEquals(errMsg, subtaskState.getLegacyOperatorState(),
tdd.getTaskStateHandles().getLegacyOperatorState());
[2/2] flink git commit: [FLINK-5019] Proper isRestored result for tasks that did not write state
Posted by al...@apache.org.
[FLINK-5019] Proper isRestored result for tasks that did not write state
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14406821
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14406821
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14406821
Branch: refs/heads/master
Commit: 1440682136e84651f8f0d1230d038880588f6cb3
Parents: 07ab9f4
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Nov 3 11:34:47 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 9 11:17:05 2016 +0100
----------------------------------------------------------------------
.../runtime/checkpoint/PendingCheckpoint.java | 4 +-
.../checkpoint/StateAssignmentOperation.java | 253 ++++++++++---------
.../flink/runtime/checkpoint/SubtaskState.java | 7 -
.../flink/runtime/state/TaskStateHandles.java | 11 +-
.../streaming/runtime/tasks/StreamTask.java | 6 +-
5 files changed, 132 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 92dca21..43a2557 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -245,12 +245,10 @@ public class PendingCheckpoint {
return false;
}
- if (null != checkpointedSubtaskState && checkpointedSubtaskState.hasState()) {
+ if (null != checkpointedSubtaskState) {
JobVertexID jobVertexID = vertex.getJobvertexId();
-
int subtaskIndex = vertex.getParallelSubtaskIndex();
-
TaskState taskState = taskStates.get(jobVertexID);
if (null == taskState) {
http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index d98c8e8..f496a07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -63,164 +63,169 @@ public class StateAssignmentOperation {
public boolean assignStates() throws Exception {
for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
+
TaskState taskState = taskGroupStateEntry.getValue();
ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey());
- if (executionJobVertex != null) {
- // check that the number of key groups have not changed
- if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
- throw new IllegalStateException("The maximum parallelism (" +
- taskState.getMaxParallelism() + ") with which the latest " +
- "checkpoint of the execution job vertex " + executionJobVertex +
- " has been taken and the current maximum parallelism (" +
- executionJobVertex.getMaxParallelism() + ") changed. This " +
- "is currently not supported.");
+ if (executionJobVertex == null) {
+ if (allowNonRestoredState) {
+ logger.info("Skipped checkpoint state for operator {}.", taskState.getJobVertexID());
+ continue;
+ } else {
+ throw new IllegalStateException("There is no execution job vertex for the job" +
+ " vertex ID " + taskGroupStateEntry.getKey());
}
+ }
- final int oldParallelism = taskState.getParallelism();
- final int newParallelism = executionJobVertex.getParallelism();
- final boolean parallelismChanged = oldParallelism != newParallelism;
- final boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
-
- if (hasNonPartitionedState && parallelismChanged) {
- throw new IllegalStateException("Cannot restore the latest checkpoint because " +
- "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
- "state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
- " has parallelism " + newParallelism + " whereas the corresponding" +
- "state object has a parallelism of " + oldParallelism);
- }
+ // check that the number of key groups have not changed
+ if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
+ throw new IllegalStateException("The maximum parallelism (" +
+ taskState.getMaxParallelism() + ") with which the latest " +
+ "checkpoint of the execution job vertex " + executionJobVertex +
+ " has been taken and the current maximum parallelism (" +
+ executionJobVertex.getMaxParallelism() + ") changed. This " +
+ "is currently not supported.");
+ }
- List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
- executionJobVertex.getMaxParallelism(),
- newParallelism);
+ //-------------------------------------------------------------------
- final int chainLength = taskState.getChainLength();
+ final int oldParallelism = taskState.getParallelism();
+ final int newParallelism = executionJobVertex.getParallelism();
+ final boolean parallelismChanged = oldParallelism != newParallelism;
+ final boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
- // operator chain idx -> list of the stored op states from all parallel instances for this chain idx
- @SuppressWarnings("unchecked")
- List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength];
- @SuppressWarnings("unchecked")
- List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength];
+ if (hasNonPartitionedState && parallelismChanged) {
+ throw new IllegalStateException("Cannot restore the latest checkpoint because " +
+ "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
+ "state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
+ " has parallelism " + newParallelism + " whereas the corresponding" +
+ "state object has a parallelism of " + oldParallelism);
+ }
- List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
- List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
+ List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
+ executionJobVertex.getMaxParallelism(),
+ newParallelism);
+
+ final int chainLength = taskState.getChainLength();
- for (int p = 0; p < oldParallelism; ++p) {
- SubtaskState subtaskState = taskState.getState(p);
+ // operator chain idx -> list of the stored op states from all parallel instances for this chain idx
+ @SuppressWarnings("unchecked")
+ List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength];
+ @SuppressWarnings("unchecked")
+ List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength];
- if (null != subtaskState) {
- collectParallelStatesByChainOperator(
- parallelOpStatesBackend, subtaskState.getManagedOperatorState());
+ List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
+ List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
- collectParallelStatesByChainOperator(
- parallelOpStatesStream, subtaskState.getRawOperatorState());
+ for (int p = 0; p < oldParallelism; ++p) {
+ SubtaskState subtaskState = taskState.getState(p);
- KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
- if (null != keyedStateBackend) {
- parallelKeyedStatesBackend.add(keyedStateBackend);
- }
+ if (null != subtaskState) {
+ collectParallelStatesByChainOperator(
+ parallelOpStatesBackend, subtaskState.getManagedOperatorState());
- KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
- if (null != keyedStateStream) {
- parallelKeyedStateStream.add(keyedStateStream);
- }
+ collectParallelStatesByChainOperator(
+ parallelOpStatesStream, subtaskState.getRawOperatorState());
+
+ KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+ if (null != keyedStateBackend) {
+ parallelKeyedStatesBackend.add(keyedStateBackend);
}
- }
- // operator chain index -> lists with collected states (one collection for each parallel subtasks)
- @SuppressWarnings("unchecked")
- List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
+ KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+ if (null != keyedStateStream) {
+ parallelKeyedStateStream.add(keyedStateStream);
+ }
+ }
+ }
- @SuppressWarnings("unchecked")
- List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength];
+ // operator chain index -> lists with collected states (one collection for each parallel subtasks)
+ @SuppressWarnings("unchecked")
+ List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
- //TODO here we can employ different redistribution strategies for state, e.g. union state.
- // For now we only offer round robin as the default.
- OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
+ @SuppressWarnings("unchecked")
+ List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength];
- for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
+ //TODO here we can employ different redistribution strategies for state, e.g. union state.
+ // For now we only offer round robin as the default.
+ OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
- List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx];
- List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx];
+ for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
- partitionedParallelStatesBackend[chainIdx] = applyRepartitioner(
- opStateRepartitioner,
- chainOpParallelStatesBackend,
- oldParallelism,
- newParallelism);
+ List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx];
+ List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx];
- partitionedParallelStatesStream[chainIdx] = applyRepartitioner(
- opStateRepartitioner,
- chainOpParallelStatesStream,
- oldParallelism,
- newParallelism);
- }
+ partitionedParallelStatesBackend[chainIdx] = applyRepartitioner(
+ opStateRepartitioner,
+ chainOpParallelStatesBackend,
+ oldParallelism,
+ newParallelism);
- for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) {
- // non-partitioned state
- ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
+ partitionedParallelStatesStream[chainIdx] = applyRepartitioner(
+ opStateRepartitioner,
+ chainOpParallelStatesStream,
+ oldParallelism,
+ newParallelism);
+ }
- if (hasNonPartitionedState) {
- // count the number of executions for which we set a state
- nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
- }
+ for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) {
+ // non-partitioned state
+ ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
- // partitionable state
- @SuppressWarnings("unchecked")
- Collection<OperatorStateHandle>[] iab = new Collection[chainLength];
- @SuppressWarnings("unchecked")
- Collection<OperatorStateHandle>[] ias = new Collection[chainLength];
- List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab);
- List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias);
+ if (!parallelismChanged) {
+ nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
+ }
- for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) {
- List<Collection<OperatorStateHandle>> redistributedOpStateBackend =
- partitionedParallelStatesBackend[chainIdx];
+ // partitionable state
+ @SuppressWarnings("unchecked")
+ Collection<OperatorStateHandle>[] iab = new Collection[chainLength];
+ @SuppressWarnings("unchecked")
+ Collection<OperatorStateHandle>[] ias = new Collection[chainLength];
+ List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab);
+ List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias);
- List<Collection<OperatorStateHandle>> redistributedOpStateStream =
- partitionedParallelStatesStream[chainIdx];
+ for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) {
+ List<Collection<OperatorStateHandle>> redistributedOpStateBackend =
+ partitionedParallelStatesBackend[chainIdx];
- if (redistributedOpStateBackend != null) {
- operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx));
- }
+ List<Collection<OperatorStateHandle>> redistributedOpStateStream =
+ partitionedParallelStatesStream[chainIdx];
- if (redistributedOpStateStream != null) {
- operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx));
- }
+ if (redistributedOpStateBackend != null) {
+ operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx));
}
- Execution currentExecutionAttempt = executionJobVertex
- .getTaskVertices()[subTaskIdx]
- .getCurrentExecutionAttempt();
-
- List<KeyGroupsStateHandle> newKeyedStatesBackend;
- List<KeyGroupsStateHandle> newKeyedStateStream;
- if (parallelismChanged) {
- KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx);
- newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
- newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
- } else {
- SubtaskState subtaskState = taskState.getState(subTaskIdx);
- KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
- KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
- newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(oldKeyedStatesBackend) : null;
- newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(oldKeyedStatesStream) : null;
+ if (redistributedOpStateStream != null) {
+ operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx));
}
+ }
- TaskStateHandles taskStateHandles = new TaskStateHandles(
- nonPartitionableState,
- operatorStateFromBackend,
- operatorStateFromStream,
- newKeyedStatesBackend,
- newKeyedStateStream);
-
- currentExecutionAttempt.setInitialState(taskStateHandles);
+ Execution currentExecutionAttempt = executionJobVertex
+ .getTaskVertices()[subTaskIdx]
+ .getCurrentExecutionAttempt();
+
+ List<KeyGroupsStateHandle> newKeyedStatesBackend;
+ List<KeyGroupsStateHandle> newKeyedStateStream;
+ if (parallelismChanged) {
+ KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx);
+ newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
+ newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
+ } else {
+ SubtaskState subtaskState = taskState.getState(subTaskIdx);
+ KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
+ KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
+ newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(oldKeyedStatesBackend) : null;
+ newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(oldKeyedStatesStream) : null;
}
- } else if (allowNonRestoredState) {
- logger.info("Skipped checkpoint state for operator {}.", taskState.getJobVertexID());
- } else {
- throw new IllegalStateException("There is no execution job vertex for the job" +
- " vertex ID " + taskGroupStateEntry.getKey());
+
+ TaskStateHandles taskStateHandles = new TaskStateHandles(
+ nonPartitionableState,
+ operatorStateFromBackend,
+ operatorStateFromStream,
+ newKeyedStatesBackend,
+ newKeyedStateStream);
+
+ currentExecutionAttempt.setInitialState(taskStateHandles);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index 9b9a810..ca51e1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -212,13 +212,6 @@ public class SubtaskState implements StateObject {
}
- public boolean hasState() {
- return (null != legacyOperatorState && !legacyOperatorState.isEmpty())
- || (null != managedOperatorState && !managedOperatorState.isEmpty())
- || null != managedKeyedState
- || null != rawKeyedState;
- }
-
@Override
public int hashCode() {
int result = legacyOperatorState != null ? legacyOperatorState.hashCode() : 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
index ecd6399..417a9dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state;
import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.util.CollectionUtil;
import java.io.Serializable;
import java.util.ArrayList;
@@ -99,14 +98,6 @@ public class TaskStateHandles implements Serializable {
return managedOperatorState;
}
- public boolean hasState() {
- return !ChainedStateHandle.isNullOrEmpty(legacyOperatorState)
- || !CollectionUtil.isNullOrEmpty(managedKeyedState)
- || !CollectionUtil.isNullOrEmpty(rawKeyedState)
- || !CollectionUtil.isNullOrEmpty(rawOperatorState)
- || !CollectionUtil.isNullOrEmpty(managedOperatorState);
- }
-
private static List<Collection<OperatorStateHandle>> transform(ChainedStateHandle<OperatorStateHandle> in) {
if (null == in) {
return Collections.emptyList();
@@ -169,4 +160,4 @@ public class TaskStateHandles implements Serializable {
result = 31 * result + (rawOperatorState != null ? rawOperatorState.hashCode() : 0);
return result;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/14406821/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index bd34044..83d72e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -918,11 +918,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
keyedStateHandleBackend,
keyedStateHandleStream);
- if (subtaskState.hasState()) {
- owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
- } else {
- owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData);
- }
+ owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",