You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:29 UTC
[12/17] flink git commit: [FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator
[FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d820d6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d820d6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d820d6f
Branch: refs/heads/master
Commit: 7d820d6fe17341463b2a0f9cd1cea1ef085eed21
Parents: 0030d6a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Oct 25 13:23:46 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 7 +++
.../runtime/executiongraph/ExecutionGraph.java | 3 +-
.../executiongraph/ExecutionGraphBuilder.java | 20 ++++-----
...tCoordinatorExternalizedCheckpointsTest.java | 8 ++--
.../CheckpointCoordinatorFailureTest.java | 2 +
.../CheckpointCoordinatorMasterHooksTest.java | 2 +
.../checkpoint/CheckpointCoordinatorTest.java | 31 ++++++++++++++
.../checkpoint/CheckpointStateRestoreTest.java | 4 ++
.../environment/StreamExecutionEnvironment.java | 35 +++++++++------
.../flink/streaming/api/graph/StreamGraph.java | 8 ++--
.../api/scala/StreamExecutionEnvironment.scala | 45 ++++++++++++--------
11 files changed, 118 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index a04e34e..40fa2bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -116,6 +117,10 @@ public class CheckpointCoordinator {
* accessing this don't block the job manager actor and run asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
+ /** The root checkpoint state backend, which is responsible for initializing the
+ * checkpoint, storing the metadata, and cleaning up the checkpoint. */
+ private final StateBackend checkpointStateBackend;
+
/** Default directory for persistent checkpoints; <code>null</code> if none configured.
* THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
@Nullable
@@ -196,6 +201,7 @@ public class CheckpointCoordinator {
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
@Nullable String checkpointDirectory,
+ StateBackend checkpointStateBackend,
Executor executor,
SharedStateRegistryFactory sharedStateRegistryFactory) {
@@ -233,6 +239,7 @@ public class CheckpointCoordinator {
this.pendingCheckpoints = new LinkedHashMap<>();
this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
+ this.checkpointStateBackend = checkNotNull(checkpointStateBackend);
this.checkpointDirectory = checkpointDirectory;
this.executor = checkNotNull(executor);
this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 202839c..f88a6b62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -452,7 +452,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
String checkpointDir,
- StateBackend metadataStore,
+ StateBackend checkpointStateBackend,
CheckpointStatsTracker statsTracker) {
// simple sanity checks
@@ -482,6 +482,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
checkpointIDCounter,
checkpointStore,
checkpointDir,
+ checkpointStateBackend,
ioExecutor,
SharedStateRegistry.DEFAULT_FACTORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 47948a9..c742903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -115,16 +115,16 @@ public class ExecutionGraphBuilder {
final ExecutionGraph executionGraph;
try {
executionGraph = (prior != null) ? prior :
- new ExecutionGraph(
- jobInformation,
- futureExecutor,
- ioExecutor,
- timeout,
- restartStrategy,
- failoverStrategy,
- slotProvider,
- classLoader,
- blobWriter);
+ new ExecutionGraph(
+ jobInformation,
+ futureExecutor,
+ ioExecutor,
+ timeout,
+ restartStrategy,
+ failoverStrategy,
+ slotProvider,
+ classLoader,
+ blobWriter);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index edc29fe..f55e0d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.junit.Rule;
import org.junit.Test;
@@ -54,7 +55,7 @@ import static org.junit.Assert.assertTrue;
public class CheckpointCoordinatorExternalizedCheckpointsTest {
@Rule
- public TemporaryFolder tmp = new TemporaryFolder();
+ public final TemporaryFolder tmp = new TemporaryFolder();
/**
* Triggers multiple externalized checkpoints and verifies that the metadata
@@ -69,6 +70,7 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
ExternalizedCheckpointSettings.externalizeCheckpoints(false);
final File checkpointDir = tmp.newFolder();
+ final FsStateBackend stateBackend = new FsStateBackend(checkpointDir.toURI());
// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -94,6 +96,7 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
checkpointDir.getAbsolutePath(),
+ stateBackend,
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -109,8 +112,7 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
coord.triggerCheckpoint(timestamp1, false);
- long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next()
- .getKey();
+ long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId1));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId1));
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 7c95a34..add7447 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -78,6 +79,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new FailingCompletedCheckpointStore(),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 2f860e0..e53bf3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -402,6 +403,7 @@ public class CheckpointCoordinatorMasterHooksTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 2572bc1..609e91c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
@@ -141,6 +142,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -202,6 +204,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -254,6 +257,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -307,6 +311,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -410,6 +415,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -530,6 +536,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -698,6 +705,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -829,6 +837,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -994,6 +1003,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1072,6 +1082,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1136,6 +1147,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1269,6 +1281,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1359,6 +1372,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
"dummy-path",
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1433,6 +1447,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1585,6 +1600,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
counter,
new StandaloneCompletedCheckpointStore(10),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1686,6 +1702,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1760,6 +1777,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1837,6 +1855,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1890,6 +1909,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointIDCounter,
new StandaloneCompletedCheckpointStore(2),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -1944,6 +1964,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2007,6 +2028,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
store,
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2122,6 +2144,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2269,6 +2292,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2553,6 +2577,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
standaloneCompletedCheckpointStore,
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -2701,6 +2726,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
"fake-directory",
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -3177,6 +3203,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -3356,6 +3383,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -3395,6 +3423,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
store,
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -3452,6 +3481,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointIDCounter,
completedCheckpointStore,
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -3545,6 +3575,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
store,
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
deleteExecutor -> {
SharedStateRegistry instance = new SharedStateRegistry(deleteExecutor);
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 47daa01..df2d37a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.SerializableObject;
import org.hamcrest.BaseMatcher;
@@ -107,6 +108,7 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -184,6 +186,7 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
@@ -242,6 +245,7 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 8f2531b..56a7e29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -51,6 +50,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -135,7 +135,7 @@ public abstract class StreamExecutionEnvironment {
protected boolean isChainingEnabled = true;
/** The state backend used for storing k/v state and state snapshots. */
- private AbstractStateBackend defaultStateBackend;
+ private StateBackend defaultStateBackend;
/** The time characteristic used by the data streams. */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
@@ -431,12 +431,14 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Sets the state backend that describes how to store and checkpoint operator state. It defines in
- * what form the key/value state ({@link ValueState}, accessible
- * from operations on {@link org.apache.flink.streaming.api.datastream.KeyedStream}) is maintained
- * (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for
- * the key/value state, and for checkpointed functions (implementing the interface
- * {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}).
+ * Sets the state backend that describes how to store and checkpoint operator state. It defines
+ * both which data structures hold state during execution (for example hash tables, RocksDB,
+ * or other data stores) as well as where checkpointed data will be persisted.
+ *
+ * <p>State managed by the state backend includes both keyed state that is accessible on
+ * {@link org.apache.flink.streaming.api.datastream.KeyedStream keyed streams}, as well as
+ * state maintained directly by the user code that implements
+ * {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction CheckpointedFunction}.
*
* <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
* maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
@@ -453,19 +455,28 @@ public abstract class StreamExecutionEnvironment {
* @see #getStateBackend()
*/
@PublicEvolving
+ public StreamExecutionEnvironment setStateBackend(StateBackend backend) {
+ this.defaultStateBackend = Preconditions.checkNotNull(backend);
+ return this;
+ }
+
+ /**
+ * @deprecated Use {@link #setStateBackend(StateBackend)} instead.
+ */
+ @Deprecated
+ @PublicEvolving
public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend) {
this.defaultStateBackend = Preconditions.checkNotNull(backend);
return this;
}
/**
- * Returns the state backend that defines how to store and checkpoint state.
- * @return The state backend that defines how to store and checkpoint state.
+ * Gets the state backend that defines how to store and checkpoint state.
*
- * @see #setStateBackend(AbstractStateBackend)
+ * @see #setStateBackend(StateBackend)
*/
@PublicEvolving
- public AbstractStateBackend getStateBackend() {
+ public StateBackend getStateBackend() {
return defaultStateBackend;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 20a361e..e5ed0c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -92,7 +92,7 @@ public class StreamGraph extends StreamingPlan {
protected Map<Integer, String> vertexIDtoBrokerID;
protected Map<Integer, Long> vertexIDtoLoopTimeout;
- private AbstractStateBackend stateBackend;
+ private StateBackend stateBackend;
private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
public StreamGraph(StreamExecutionEnvironment environment) {
@@ -143,11 +143,11 @@ public class StreamGraph extends StreamingPlan {
this.chaining = chaining;
}
- public void setStateBackend(AbstractStateBackend backend) {
+ public void setStateBackend(StateBackend backend) {
this.stateBackend = backend;
}
- public AbstractStateBackend getStateBackend() {
+ public StateBackend getStateBackend() {
return this.stateBackend;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 3bba505..cd96dbf 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.AbstractStateBackend
+import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source._
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -224,35 +225,45 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
def getCheckpointingMode = javaEnv.getCheckpointingMode()
/**
- * Sets the state backend that describes how to store and checkpoint operator state.
- * It defines in what form the key/value state, accessible from operations on
- * [[KeyedStream]] is maintained (heap, managed memory, externally), and where state
- * snapshots/checkpoints are stored, both for the key/value state, and for checkpointed
- * functions (implementing the interface
- * [[org.apache.flink.streaming.api.checkpoint.CheckpointedFunction]].
+ * Sets the state backend that describes how to store and checkpoint operator state. It defines
+ * both which data structures hold state during execution (for example hash tables, RocksDB,
+ * or other data stores) as well as where checkpointed data will be persisted.
*
- * <p>The [[org.apache.flink.runtime.state.memory.MemoryStateBackend]] for example
- * maintains the state in heap memory, as objects. It is lightweight without extra
- * dependencies, but can checkpoint only small states (some counters).
+ * State managed by the state backend includes both keyed state that is accessible on
+ * [[org.apache.flink.streaming.api.datastream.KeyedStream keyed streams]], as well as
+ * state maintained directly by the user code that implements
+ * [[org.apache.flink.streaming.api.checkpoint.CheckpointedFunction CheckpointedFunction]].
*
- * <p>In contrast, the [[org.apache.flink.runtime.state.filesystem.FsStateBackend]]
- * stores checkpoints of the state (also maintained as heap objects) in files. When using
- * a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
- * that state is not lost upon failures of individual nodes and that the entire streaming
- * program can be executed highly available and strongly consistent (assuming that Flink
- * is run in high-availability mode).
+ * The [[org.apache.flink.runtime.state.memory.MemoryStateBackend]], for example,
+ * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
+ * but can checkpoint only small states (some counters).
+ *
+ * In contrast, the [[org.apache.flink.runtime.state.filesystem.FsStateBackend]]
+ * stores checkpoints of the state (also maintained as heap objects) in files.
+ * When using a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
+ * that state is not lost upon failures of individual nodes and that the streaming program
+ * can be executed highly available and strongly consistent.
*/
@PublicEvolving
- def setStateBackend(backend: AbstractStateBackend): StreamExecutionEnvironment = {
+ def setStateBackend(backend: StateBackend): StreamExecutionEnvironment = {
javaEnv.setStateBackend(backend)
this
}
/**
+ * @deprecated Use [[StreamExecutionEnvironment.setStateBackend(StateBackend)]] instead.
+ */
+ @Deprecated
+ @PublicEvolving
+ def setStateBackend(backend: AbstractStateBackend): StreamExecutionEnvironment = {
+ setStateBackend(backend.asInstanceOf[StateBackend])
+ }
+
+ /**
* Returns the state backend that defines how to store and checkpoint state.
*/
@PublicEvolving
- def getStateBackend: AbstractStateBackend = javaEnv.getStateBackend()
+ def getStateBackend: StateBackend = javaEnv.getStateBackend()
/**
* Sets the restart strategy configuration. The configuration specifies which restart strategy