You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:29 UTC

[12/17] flink git commit: [FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator

[FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d820d6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d820d6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d820d6f

Branch: refs/heads/master
Commit: 7d820d6fe17341463b2a0f9cd1cea1ef085eed21
Parents: 0030d6a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Oct 25 13:23:46 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  7 +++
 .../runtime/executiongraph/ExecutionGraph.java  |  3 +-
 .../executiongraph/ExecutionGraphBuilder.java   | 20 ++++-----
 ...tCoordinatorExternalizedCheckpointsTest.java |  8 ++--
 .../CheckpointCoordinatorFailureTest.java       |  2 +
 .../CheckpointCoordinatorMasterHooksTest.java   |  2 +
 .../checkpoint/CheckpointCoordinatorTest.java   | 31 ++++++++++++++
 .../checkpoint/CheckpointStateRestoreTest.java  |  4 ++
 .../environment/StreamExecutionEnvironment.java | 35 +++++++++------
 .../flink/streaming/api/graph/StreamGraph.java  |  8 ++--
 .../api/scala/StreamExecutionEnvironment.scala  | 45 ++++++++++++--------
 11 files changed, 118 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index a04e34e..40fa2bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -116,6 +117,10 @@ public class CheckpointCoordinator {
 	 * accessing this don't block the job manager actor and run asynchronously. */
 	private final CompletedCheckpointStore completedCheckpointStore;
 
+	/** The root checkpoint state backend, which is responsible for initializing the
+	 * checkpoint, storing the metadata, and cleaning up the checkpoint */
+	private final StateBackend checkpointStateBackend;
+
 	/** Default directory for persistent checkpoints; <code>null</code> if none configured.
 	 * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
 	@Nullable
@@ -196,6 +201,7 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			@Nullable String checkpointDirectory,
+			StateBackend checkpointStateBackend,
 			Executor executor,
 			SharedStateRegistryFactory sharedStateRegistryFactory) {
 
@@ -233,6 +239,7 @@ public class CheckpointCoordinator {
 		this.pendingCheckpoints = new LinkedHashMap<>();
 		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
+		this.checkpointStateBackend = checkNotNull(checkpointStateBackend);
 		this.checkpointDirectory = checkpointDirectory;
 		this.executor = checkNotNull(executor);
 		this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 202839c..f88a6b62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -452,7 +452,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
 			String checkpointDir,
-			StateBackend metadataStore,
+			StateBackend checkpointStateBackend,
 			CheckpointStatsTracker statsTracker) {
 
 		// simple sanity checks
@@ -482,6 +482,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			checkpointIDCounter,
 			checkpointStore,
 			checkpointDir,
+			checkpointStateBackend,
 			ioExecutor,
 			SharedStateRegistry.DEFAULT_FACTORY);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 47948a9..c742903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -115,16 +115,16 @@ public class ExecutionGraphBuilder {
 		final ExecutionGraph executionGraph;
 		try {
 			executionGraph = (prior != null) ? prior :
-                new ExecutionGraph(
-                    jobInformation,
-                    futureExecutor,
-                    ioExecutor,
-                    timeout,
-                    restartStrategy,
-                    failoverStrategy,
-                    slotProvider,
-                    classLoader,
-                    blobWriter);
+				new ExecutionGraph(
+					jobInformation,
+					futureExecutor,
+					ioExecutor,
+					timeout,
+					restartStrategy,
+					failoverStrategy,
+					slotProvider,
+					classLoader,
+					blobWriter);
 		} catch (IOException e) {
 			throw new JobException("Could not create the ExecutionGraph.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index edc29fe..f55e0d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -54,7 +55,7 @@ import static org.junit.Assert.assertTrue;
 public class CheckpointCoordinatorExternalizedCheckpointsTest {
 
 	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
+	public final TemporaryFolder tmp = new TemporaryFolder();
 
 	/**
 	 * Triggers multiple externalized checkpoints and verifies that the metadata
@@ -69,6 +70,7 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
 			ExternalizedCheckpointSettings.externalizeCheckpoints(false);
 
 		final File checkpointDir = tmp.newFolder();
+		final FsStateBackend stateBackend = new FsStateBackend(checkpointDir.toURI());
 
 		// create some mock Execution vertices that receive the checkpoint trigger messages
 		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
@@ -94,6 +96,7 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			checkpointDir.getAbsolutePath(),
+			stateBackend,
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -109,8 +112,7 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
 
 			coord.triggerCheckpoint(timestamp1, false);
 
-			long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next()
-				.getKey();
+			long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId1));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId1));

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 7c95a34..add7447 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -78,6 +79,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new FailingCompletedCheckpointStore(),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 2f860e0..e53bf3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -402,6 +403,7 @@ public class CheckpointCoordinatorMasterHooksTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 2572bc1..609e91c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
@@ -141,6 +142,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -202,6 +204,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -254,6 +257,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -307,6 +311,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -410,6 +415,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -530,6 +536,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -698,6 +705,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -829,6 +837,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -994,6 +1003,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1072,6 +1082,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1136,6 +1147,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1269,6 +1281,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1359,6 +1372,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				"dummy-path",
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1433,6 +1447,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1585,6 +1600,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			counter,
 			new StandaloneCompletedCheckpointStore(10),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1686,6 +1702,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1760,6 +1777,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1837,6 +1855,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1890,6 +1909,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			checkpointIDCounter,
 			new StandaloneCompletedCheckpointStore(2),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -1944,6 +1964,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(2),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -2007,6 +2028,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			store,
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -2122,6 +2144,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -2269,6 +2292,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -2553,6 +2577,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			standaloneCompletedCheckpointStore,
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -2701,6 +2726,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				"fake-directory",
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -3177,6 +3203,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -3356,6 +3383,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -3395,6 +3423,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			store,
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -3452,6 +3481,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			checkpointIDCounter,
 			completedCheckpointStore,
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -3545,6 +3575,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			store,
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 				deleteExecutor -> {
 					SharedStateRegistry instance = new SharedStateRegistry(deleteExecutor);

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 47daa01..df2d37a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.SerializableObject;
 
 import org.hamcrest.BaseMatcher;
@@ -107,6 +108,7 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -184,6 +186,7 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
+				new MemoryStateBackend(),
 				Executors.directExecutor(),
 				SharedStateRegistry.DEFAULT_FACTORY);
 
@@ -242,6 +245,7 @@ public class CheckpointStateRestoreTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
+			new MemoryStateBackend(),
 			Executors.directExecutor(),
 			SharedStateRegistry.DEFAULT_FACTORY);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 8f2531b..56a7e29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -51,6 +50,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -135,7 +135,7 @@ public abstract class StreamExecutionEnvironment {
 	protected boolean isChainingEnabled = true;
 
 	/** The state backend used for storing k/v state and state snapshots. */
-	private AbstractStateBackend defaultStateBackend;
+	private StateBackend defaultStateBackend;
 
 	/** The time characteristic used by the data streams. */
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
@@ -431,12 +431,14 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Sets the state backend that describes how to store and checkpoint operator state. It defines in
-	 * what form the key/value state ({@link ValueState}, accessible
-	 * from operations on {@link org.apache.flink.streaming.api.datastream.KeyedStream}) is maintained
-	 * (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for
-	 * the key/value state, and for checkpointed functions (implementing the interface
-	 * {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}).
+	 * Sets the state backend that describes how to store and checkpoint operator state. It defines
+	 * both which data structures hold state during execution (for example hash tables, RockDB,
+	 * or other data stores) as well as where checkpointed data will be persisted.
+	 *
+	 * <p>State managed by the state backend includes both keyed state that is accessible on
+	 * {@link org.apache.flink.streaming.api.datastream.KeyedStream keyed streams}, as well as
+	 * state maintained directly by the user code that implements
+	 * {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction CheckpointedFunction}.
 	 *
 	 * <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
 	 * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
@@ -453,19 +455,28 @@ public abstract class StreamExecutionEnvironment {
 	 * @see #getStateBackend()
 	 */
 	@PublicEvolving
+	public StreamExecutionEnvironment setStateBackend(StateBackend backend) {
+		this.defaultStateBackend = Preconditions.checkNotNull(backend);
+		return this;
+	}
+
+	/**
+	 * @deprecated Use {@link #setStateBackend(StateBackend)} instead.
+	 */
+	@Deprecated
+	@PublicEvolving
 	public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend) {
 		this.defaultStateBackend = Preconditions.checkNotNull(backend);
 		return this;
 	}
 
 	/**
-	 * Returns the state backend that defines how to store and checkpoint state.
-	 * @return The state backend that defines how to store and checkpoint state.
+	 * Gets the state backend that defines how to store and checkpoint state.
 	 *
-	 * @see #setStateBackend(AbstractStateBackend)
+	 * @see #setStateBackend(StateBackend)
 	 */
 	@PublicEvolving
-	public AbstractStateBackend getStateBackend() {
+	public StateBackend getStateBackend() {
 		return defaultStateBackend;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 20a361e..e5ed0c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -92,7 +92,7 @@ public class StreamGraph extends StreamingPlan {
 
 	protected Map<Integer, String> vertexIDtoBrokerID;
 	protected Map<Integer, Long> vertexIDtoLoopTimeout;
-	private AbstractStateBackend stateBackend;
+	private StateBackend stateBackend;
 	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
 	public StreamGraph(StreamExecutionEnvironment environment) {
@@ -143,11 +143,11 @@ public class StreamGraph extends StreamingPlan {
 		this.chaining = chaining;
 	}
 
-	public void setStateBackend(AbstractStateBackend backend) {
+	public void setStateBackend(StateBackend backend) {
 		this.stateBackend = backend;
 	}
 
-	public AbstractStateBackend getStateBackend() {
+	public StateBackend getStateBackend() {
 		return this.stateBackend;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d820d6f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 3bba505..cd96dbf 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.state.AbstractStateBackend
+import org.apache.flink.runtime.state.StateBackend
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source._
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -224,35 +225,45 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def getCheckpointingMode = javaEnv.getCheckpointingMode()
 
   /**
-   * Sets the state backend that describes how to store and checkpoint operator state.
-   * It defines in what form the key/value state, accessible from operations on
-   * [[KeyedStream]] is maintained (heap, managed memory, externally), and where state
-   * snapshots/checkpoints are stored, both for the key/value state, and for checkpointed
-   * functions (implementing the interface 
-   * [[org.apache.flink.streaming.api.checkpoint.CheckpointedFunction]].
+   * Sets the state backend that describes how to store and checkpoint operator state. It defines
+   * both which data structures hold state during execution (for example hash tables, RockDB,
+   * or other data stores) as well as where checkpointed data will be persisted.
    *
-   * <p>The [[org.apache.flink.runtime.state.memory.MemoryStateBackend]] for example
-   * maintains the state in heap memory, as objects. It is lightweight without extra 
-   * dependencies, but can checkpoint only small states (some counters).
+   * State managed by the state backend includes both keyed state that is accessible on
+   * [[org.apache.flink.streaming.api.datastream.KeyedStream keyed streams]], as well as
+   * state maintained directly by the user code that implements
+   * [[org.apache.flink.streaming.api.checkpoint.CheckpointedFunction CheckpointedFunction]].
    *
-   * <p>In contrast, the [[org.apache.flink.runtime.state.filesystem.FsStateBackend]]
-   * stores checkpoints of the state (also maintained as heap objects) in files. When using
-   * a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
-   * that state is not lost upon failures of individual nodes and that the entire streaming
-   * program can be executed highly available and strongly consistent (assuming that Flink
-   * is run in high-availability mode).
+   * The [[org.apache.flink.runtime.state.memory.MemoryStateBackend]], for example,
+   * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
+   * but can checkpoint only small states (some counters).
+   *
+   * In contrast, the [[org.apache.flink.runtime.state.filesystem.FsStateBackend]]
+   * stores checkpoints of the state (also maintained as heap objects) in files.
+   * When using a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
+   * that state is not lost upon failures of individual nodes and that streaming program can be
+   * executed highly available and strongly consistent.
    */
   @PublicEvolving
-  def setStateBackend(backend: AbstractStateBackend): StreamExecutionEnvironment = {
+  def setStateBackend(backend: StateBackend): StreamExecutionEnvironment = {
     javaEnv.setStateBackend(backend)
     this
   }
 
   /**
+   * @deprecated Use [[StreamExecutionEnvironment.setStateBackend(StateBackend)]] instead.
+   */
+  @Deprecated
+  @PublicEvolving
+  def setStateBackend(backend: AbstractStateBackend): StreamExecutionEnvironment = {
+    setStateBackend(backend.asInstanceOf[StateBackend])
+  }
+
+  /**
    * Returns the state backend that defines how to store and checkpoint state.
    */
   @PublicEvolving
-  def getStateBackend: AbstractStateBackend = javaEnv.getStateBackend()
+  def getStateBackend: StateBackend = javaEnv.getStateBackend()
 
   /**
     * Sets the restart strategy configuration. The configuration specifies which restart strategy