You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/17 17:16:07 UTC
flink git commit: [FLINK-5985] Report no task states for stateless
tasks on checkpointing
Repository: flink
Updated Branches:
refs/heads/master 97ccc1473 -> 20fff32a5
[FLINK-5985] Report no task states for stateless tasks on checkpointing
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20fff32a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20fff32a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20fff32a
Branch: refs/heads/master
Commit: 20fff32a5c6d3385bce54a3b76696fb3063a2ab2
Parents: 97ccc14
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Mar 10 17:55:45 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Mar 17 18:14:56 2017 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 9 +-
.../state/DefaultOperatorStateBackend.java | 2 +-
.../apache/flink/runtime/state/DoneFuture.java | 16 +-
.../state/ManagedInitializationContext.java | 3 +-
.../flink/runtime/state/Snapshotable.java | 5 +-
.../state/heap/HeapKeyedStateBackend.java | 8 +-
.../checkpoint/PendingCheckpointTest.java | 35 ++++-
.../runtime/state/OperatorStateBackendTest.java | 19 ++-
.../runtime/state/StateBackendTestBase.java | 49 +++---
.../flink/runtime/state/StateUtilTest.java | 2 +-
.../api/operators/OperatorSnapshotResult.java | 12 ++
.../streaming/runtime/tasks/StreamTask.java | 32 +++-
.../streaming/runtime/tasks/StreamTaskTest.java | 74 ++++++++-
.../test/checkpointing/SavepointITCase.java | 153 ++++++++++++++++++-
14 files changed, 365 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f585d21..5b72e03 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -240,6 +240,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return keyGroupPrefixBytes;
}
+ private boolean hasRegisteredState() {
+ return !kvStateInformation.isEmpty();
+ }
+
/**
* Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
* is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
@@ -267,13 +271,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (db != null) {
- if (kvStateInformation.isEmpty()) {
+ if (!hasRegisteredState()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
" . Returning null.");
}
-
- return new DoneFuture<>(null);
+ return DoneFuture.nullValue();
}
snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 8dcf49e..2497a00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -161,7 +161,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
CheckpointOptions checkpointOptions) throws Exception {
if (registeredStates.isEmpty()) {
- return new DoneFuture<>(null);
+ return DoneFuture.nullValue();
}
List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
index 777ab69..d2d808d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
@@ -30,10 +30,13 @@ import java.util.concurrent.TimeoutException;
* @param <T> The type of object in this {@code Future}.
*/
public class DoneFuture<T> implements RunnableFuture<T> {
- private final T keyGroupsStateHandle;
- public DoneFuture(T keyGroupsStateHandle) {
- this.keyGroupsStateHandle = keyGroupsStateHandle;
+ private static final DoneFuture<?> NULL_FUTURE = new DoneFuture<Object>(null);
+
+ private final T payload;
+
+ public DoneFuture(T payload) {
+ this.payload = payload;
}
@Override
@@ -53,7 +56,7 @@ public class DoneFuture<T> implements RunnableFuture<T> {
@Override
public T get() throws InterruptedException, ExecutionException {
- return keyGroupsStateHandle;
+ return payload;
}
@Override
@@ -67,4 +70,9 @@ public class DoneFuture<T> implements RunnableFuture<T> {
public void run() {
}
+
+ @SuppressWarnings("unchecked")
+ public static <T> DoneFuture<T> nullValue() {
+ return (DoneFuture<T>) NULL_FUTURE;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
index 5255c43..522aca6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
@@ -36,7 +36,8 @@ import org.apache.flink.api.common.state.OperatorStateStore;
public interface ManagedInitializationContext {
/**
- * Returns true, if some managed state was restored from the snapshot of a previous execution.
+ * Returns true if state was restored from the snapshot of a previous execution. This always returns false for
+ * stateless tasks.
*/
boolean isRestored();
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
index 0d92b46..c7e62f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -18,12 +18,13 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+
import java.util.Collection;
import java.util.concurrent.RunnableFuture;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
/**
- * Interface for operations that can perform snapshots of their state.
+ * Interface for operators that can perform snapshots of their state.
*
* @param <S> Generic type of the state object that is created as handle to snapshots.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 0335933..f3e4ec6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -150,6 +150,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return stateTable;
}
+ private boolean hasRegisteredState() {
+ return !stateTables.isEmpty();
+ }
+
@Override
public <N, V> InternalValueState<N, V> createValueState(
TypeSerializer<N> namespaceSerializer,
@@ -225,8 +229,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
- if (stateTables.isEmpty()) {
- return new DoneFuture<>(null);
+ if (!hasRegisteredState()) {
+ return DoneFuture.nullValue();
}
long syncStartTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 55b5fe0..a15684c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -24,11 +24,10 @@ import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-
import org.mockito.Mockito;
import java.io.File;
@@ -49,6 +48,7 @@ import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
public class PendingCheckpointTest {
@@ -56,7 +56,10 @@ public class PendingCheckpointTest {
private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
static {
- ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
+ ExecutionVertex vertex = mock(ExecutionVertex.class);
+ when(vertex.getMaxParallelism()).thenReturn(128);
+ when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1);
+ ACK_TASKS.put(ATTEMPT_ID, vertex);
}
@Rule
@@ -288,6 +291,32 @@ public class PendingCheckpointTest {
}
}
+ /**
+ * FLINK-5985
+ * <p>
+ * Ensures that subtasks that acknowledge their state as 'null' are considered stateless. This means that they
+ * should not appear in the task states map of the checkpoint.
+ */
+ @Test
+ public void testNullSubtaskStateLeadsToStatelessTask() throws Exception {
+ PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+ pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
+ Assert.assertTrue(pending.getTaskStates().isEmpty());
+ }
+
+ /**
+ * FLINK-5985
+ * <p>
+ * This test checks the inverse of {@link #testNullSubtaskStateLeadsToStatelessTask()}. We want to test that
+ * subtasks that acknowledge some state are given an entry in the task states of the checkpoint.
+ */
+ @Test
+ public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {
+ PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+ pending.acknowledgeTask(ATTEMPT_ID, mock(SubtaskState.class), mock(CheckpointMetrics.class));
+ Assert.assertFalse(pending.getTaskStates().isEmpty());
+ }
+
@Test
public void testSetCanceller() {
final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 94df524..d883d6e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -23,11 +23,14 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.FutureUtil;
+import org.junit.Assert;
import org.junit.Test;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
+import java.util.concurrent.RunnableFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -144,6 +147,19 @@ public class OperatorStateBackendTest {
}
@Test
+ public void testSnapshotEmpty() throws Exception {
+ DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+ CheckpointStreamFactory streamFactory =
+ abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+
+ RunnableFuture<OperatorStateHandle> snapshot =
+ operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+
+ OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
+ Assert.assertNull(stateHandle);
+ }
+
+ @Test
public void testSnapshotRestore() throws Exception {
DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
@@ -166,7 +182,8 @@ public class OperatorStateBackendTest {
listState3.add(20);
CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
- OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()).get();
+ OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(
+ operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 331c6bd..faa9314 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -41,8 +41,8 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -57,6 +57,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.types.IntValue;
+import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
@@ -199,7 +200,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -210,7 +211,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.update("u3");
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -411,7 +412,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals(13, (int) state2.value());
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
backend = restoreKeyedBackend(
@@ -484,7 +485,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals(42L, (long) state.value());
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
@@ -529,7 +530,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -540,7 +541,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("u3");
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -628,7 +629,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -639,7 +640,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("u3");
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -730,7 +731,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -742,7 +743,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add(103);
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -834,7 +835,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -846,7 +847,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -1166,7 +1167,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.update("ShouldBeInSecondHalf");
- KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint()));
List<KeyGroupsStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles(
Collections.singletonList(snapshot),
@@ -1233,7 +1234,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.update("2");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1284,7 +1285,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("2");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1337,7 +1338,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("2");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1388,7 +1389,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.put("2", "Second");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1661,7 +1662,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
- KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
@@ -1693,7 +1694,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// draw a snapshot
KeyGroupsStateHandle snapshot =
- runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
assertNull(snapshot);
backend.dispose();
@@ -1934,12 +1935,4 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
return KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer);
}
}
-
- private KeyGroupsStateHandle runSnapshot(RunnableFuture<KeyGroupsStateHandle> snapshotRunnableFuture) throws Exception {
- if(!snapshotRunnableFuture.isDone()) {
- Thread runner = new Thread(snapshotRunnableFuture);
- runner.start();
- }
- return snapshotRunnableFuture.get();
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
index e59d027..d6966d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
@@ -30,7 +30,7 @@ public class StateUtilTest extends TestLogger {
*/
@Test
public void testDiscardRunnableFutureWithNullValue() throws Exception {
- RunnableFuture<StateHandle<?>> stateFuture = new DoneFuture<>(null);
+ RunnableFuture<StateHandle<?>> stateFuture = DoneFuture.nullValue();
StateUtil.discardStateFuture(stateFuture);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 83697ae..b1c94cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -121,4 +121,16 @@ public class OperatorSnapshotResult {
throw exception;
}
}
+
+ public boolean hasKeyedState() {
+ return keyedStateManagedFuture != null || keyedStateRawFuture != null;
+ }
+
+ public boolean hasOperatorState() {
+ return operatorStateManagedFuture != null || operatorStateRawFuture != null;
+ }
+
+ public boolean hasState() {
+ return hasKeyedState() || hasOperatorState();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ccaa312..76b2b98 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -57,7 +57,6 @@ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -925,14 +924,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream =
new ChainedStateHandle<>(operatorStatesStream);
- SubtaskState subtaskState = new SubtaskState(
+ SubtaskState subtaskState = createSubtaskStateFromSnapshotStateHandles(
chainedNonPartitionedOperatorsState,
chainedOperatorStateBackend,
chainedOperatorStateStream,
keyedStateHandleBackend,
keyedStateHandleStream);
- if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
+ if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
+ CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
+
owner.getEnvironment().acknowledgeCheckpoint(
checkpointMetaData.getCheckpointId(),
checkpointMetrics,
@@ -982,6 +983,31 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
+ private SubtaskState createSubtaskStateFromSnapshotStateHandles(
+ ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState,
+ ChainedStateHandle<OperatorStateHandle> chainedOperatorStateBackend,
+ ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream,
+ KeyGroupsStateHandle keyedStateHandleBackend,
+ KeyGroupsStateHandle keyedStateHandleStream) {
+
+ boolean hasAnyState = keyedStateHandleBackend != null
+ || keyedStateHandleStream != null
+ || !chainedOperatorStateBackend.isEmpty()
+ || !chainedOperatorStateStream.isEmpty()
+ || !chainedNonPartitionedOperatorsState.isEmpty();
+
+ // We signal a stateless task by reporting null, so that there are no attempts to assign empty state to
+ // stateless tasks on restore. This allows for simple job modifications that only concern stateless tasks,
+ // without the need to assign them uids to match their (always empty) states.
+ return hasAnyState ? new SubtaskState(
+ chainedNonPartitionedOperatorsState,
+ chainedOperatorStateBackend,
+ chainedOperatorStateStream,
+ keyedStateHandleBackend,
+ keyedStateHandleStream)
+ : null;
+ }
+
private void cleanup() throws Exception {
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName());
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 3826051..d7e3d6c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.runtime.tasks;
import akka.dispatch.Futures;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
@@ -88,10 +87,9 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
-
import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
import org.junit.Test;
-
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@@ -111,8 +109,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -640,6 +640,74 @@ public class StreamTaskTest extends TestLogger {
verify(rawOperatorStateHandle).discardState();
}
+ /**
+ * FLINK-5985
+ *
+ * This test ensures that empty snapshots (no operator/keyed state whatsoever) will be reported as stateless tasks.
+ * This happens by translating an empty {@link SubtaskState} into reporting 'null' to #acknowledgeCheckpoint.
+ */
+ @Test
+ public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception {
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+
+ when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+ when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+
+ Environment mockEnvironment = mock(Environment.class);
+
+ // latch is triggered once the (mocked) environment receives the checkpoint acknowledgement
+ final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
+ final List<SubtaskState> checkpointResult = new ArrayList<>(1);
+
+ // we remember what is acknowledged (expected to be null as our task will snapshot empty states).
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ SubtaskState subtaskState = invocationOnMock.getArgumentAt(2, SubtaskState.class);
+ checkpointResult.add(subtaskState);
+ checkpointCompletedLatch.trigger();
+ return null;
+ }
+ }).when(mockEnvironment).acknowledgeCheckpoint(anyLong(), any(CheckpointMetrics.class), any(SubtaskState.class));
+
+ when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+ StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+ streamTask.setEnvironment(mockEnvironment);
+
+ // mock the operators
+ StreamOperator<?> statelessOperator =
+ mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+ // mock the returned empty snapshot result (all state handles are null)
+ OperatorSnapshotResult statelessOperatorSnapshotResult = new OperatorSnapshotResult();
+ when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class)))
+ .thenReturn(statelessOperatorSnapshotResult);
+
+ // set up the task
+ StreamOperator<?>[] streamOperators = {statelessOperator};
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+ when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ Whitebox.setInternalState(streamTask, "isRunning", true);
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+ Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool()); // NOTE(review): this pool is not shut down explicitly in this test
+
+ streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
+ checkpointCompletedLatch.await(30, TimeUnit.SECONDS);
+ streamTask.cancel();
+
+ // ensure that 'null' was acknowledged as subtask state
+ Assert.assertNull(checkpointResult.get(0));
+ }
+
// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index ed45807..a5c994a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -23,9 +23,8 @@ import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import java.io.FileNotFoundException;
-import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -88,12 +87,14 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
+import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
@@ -107,6 +108,7 @@ import static org.junit.Assert.fail;
/**
* Integration test for triggering and resuming from savepoints.
*/
+@SuppressWarnings("serial")
public class SavepointITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
@@ -458,6 +460,152 @@ public class SavepointITCase extends TestLogger {
}
}
+ /**
+ * FLINK-5985
+ *
+ * This test ensures we can restore from a savepoint under modifications to the job graph that only concern
+ * stateless operators.
+ */
+ @Test
+ public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
+
+ // Config
+ int numTaskManagers = 2;
+ int numSlotsPerTaskManager = 2;
+ int parallelism = 2;
+
+ // Test deadline
+ final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+
+ final File tmpDir = CommonTestUtils.createTempDirectory();
+ final File savepointDir = new File(tmpDir, "savepoints");
+
+ TestingCluster flink = null;
+ String savepointPath;
+ try {
+ // Flink configuration
+ final Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+ config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+ savepointDir.toURI().toString());
+
+ LOG.info("Flink configuration: " + config + ".");
+
+ // Start Flink
+ flink = new TestingCluster(config);
+ LOG.info("Starting Flink cluster.");
+ flink.start(true);
+
+ // Retrieve the job manager
+ LOG.info("Retrieving JobManager.");
+ ActorGateway jobManager = Await.result(
+ flink.leaderGateway().future(),
+ deadline.timeLeft());
+ LOG.info("JobManager: " + jobManager + ".");
+
+ final StatefulCounter statefulCounter = new StatefulCounter();
+ StatefulCounter.resetForTest(parallelism);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(parallelism);
+ env.addSource(new InfiniteTestSource())
+ .shuffle()
+ .map(new MapFunction<Integer, Integer>() {
+
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return 4 * value;
+ }
+ })
+ .shuffle()
+ .map(statefulCounter).uid("statefulCounter")
+ .shuffle()
+ .map(new MapFunction<Integer, Integer>() {
+
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return 2 * value;
+ }
+ })
+ .addSink(new DiscardingSink<Integer>());
+
+ JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
+
+ JobSubmissionResult submissionResult = flink.submitJobDetached(originalJobGraph);
+ JobID jobID = submissionResult.getJobID();
+
+ // wait for the Tasks to be ready
+ StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+ Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
+ savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
+ Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
+
+ ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
+ LOG.info("Retrieved savepoint: " + savepointPath + ".");
+
+ // Shut down the Flink cluster (thereby canceling the job)
+ LOG.info("Shutting down Flink cluster.");
+ flink.shutdown();
+ flink.awaitTermination();
+
+ } finally {
+ flink.shutdown(); // NOTE(review): flink is still null here if anything before the TestingCluster assignment threw -> NPE would mask the original error
+ flink.awaitTermination();
+ }
+
+ try {
+ LOG.info("Restarting Flink cluster.");
+ flink.start(true);
+
+ // Retrieve the job manager
+ LOG.info("Retrieving JobManager.");
+ ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
+ LOG.info("JobManager: " + jobManager + ".");
+
+ // Reset static test helpers
+ StatefulCounter.resetForTest(parallelism);
+
+ // Set up the execution environment for the modified job
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(parallelism);
+
+ // generate a modified job graph: drops the stateless map before the counter and changes the one after it
+ env.addSource(new InfiniteTestSource())
+ .shuffle()
+ .map(new StatefulCounter()).uid("statefulCounter")
+ .shuffle()
+ .map(new MapFunction<Integer, Integer>() {
+
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return value;
+ }
+ })
+ .addSink(new DiscardingSink<Integer>());
+
+ JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph();
+
+ // Set the savepoint path
+ modifiedJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+
+ LOG.info("Resubmitting job " + modifiedJobGraph.getJobID() + " with " +
+ "savepoint path " + savepointPath + " in detached mode.");
+
+ // Submit the job
+ flink.submitJobDetached(modifiedJobGraph);
+ // Await until state is restored
+ StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+ // Await some progress after restore
+ StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ } finally {
+ flink.shutdown();
+ flink.awaitTermination();
+ }
+ }
+
// ------------------------------------------------------------------------
// Test program
// ------------------------------------------------------------------------
@@ -497,6 +645,7 @@ public class SavepointITCase extends TestLogger {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(1);
}
+ Thread.sleep(1);
}
}