You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:24 UTC

[07/17] flink git commit: [FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 5893d1d..6a84a11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
@@ -102,7 +103,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				counter,
 				store,
 				null,
-				null,
+				new MemoryStateBackend(),
 				CheckpointStatsTrackerTest.createTestTracker());
 
 		JobVertex jobVertex = new JobVertex("MockVertex");

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 7b9d9aa..3fe8613 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -66,8 +67,6 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 public class ArchivedExecutionGraphTest extends TestLogger {
-	private static JobVertexID v1ID = new JobVertexID();
-	private static JobVertexID v2ID = new JobVertexID();
 
 	private static ExecutionGraph runtimeGraph;
 
@@ -77,8 +76,8 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 		// Setup
 		// -------------------------------------------------------------------------------------------------------------
 
-		v1ID = new JobVertexID();
-		v2ID = new JobVertexID();
+		JobVertexID v1ID = new JobVertexID();
+		JobVertexID v2ID = new JobVertexID();
 
 		JobVertex v1 = new JobVertex("v1", v1ID);
 		JobVertex v2 = new JobVertex("v2", v2ID);
@@ -89,7 +88,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 		v1.setInvokableClass(AbstractInvokable.class);
 		v2.setInvokableClass(AbstractInvokable.class);
 
-		List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
+		List<JobVertex> vertices = new ArrayList<>(Arrays.asList(v1, v2));
 
 		ExecutionConfig config = new ExecutionConfig();
 
@@ -135,7 +134,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			null,
+			new MemoryStateBackend(),
 			statsTracker);
 
 		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 01d2346..452afd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -149,16 +150,16 @@ import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_AS
 import static org.apache.flink.runtime.testingUtils.TestingUtils.TESTING_TIMEOUT;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import static org.mockito.Mockito.mock;
 
 public class JobManagerTest extends TestLogger {
 
 	@Rule
-	public TemporaryFolder tmpFolder = new TemporaryFolder();
+	public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
 	private static ActorSystem system;
 
@@ -830,7 +831,7 @@ public class JobManagerTest extends TestLogger {
 
 		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
 		Configuration config = new Configuration();
-		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.toURI().toString());
 
 		ActorSystem actorSystem = null;
 		ActorGateway jobManager = null;
@@ -932,13 +933,13 @@ public class JobManagerTest extends TestLogger {
 			}
 
 			// Verify savepoint path
-			assertNotEquals("Savepoint not triggered", null, savepointPath);
+			assertNotNull("Savepoint not triggered", savepointPath);
 
 			// Wait for job status change
 			Await.ready(cancelled, timeout);
 
-			File savepointFile = new File(savepointPath);
-			assertEquals(true, savepointFile.exists());
+			File savepointFile = new File(new Path(savepointPath).getPath());
+			assertTrue(savepointFile.exists());
 		} finally {
 			if (actorSystem != null) {
 				actorSystem.shutdown();
@@ -1157,7 +1158,7 @@ public class JobManagerTest extends TestLogger {
 
 		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
 		Configuration config = new Configuration();
-		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.toURI().toString());
 
 		ActorSystem actorSystem = null;
 		ActorGateway jobManager = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 8ad368b..3d43cd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.TernaryBoolean;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -93,7 +94,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 	@Test
 	public void testOversizedState() {
 		try {
-			MemoryStateBackend backend = new MemoryStateBackend(10);
+			MemoryStateBackend backend = new MemoryStateBackend(null, null, 10, TernaryBoolean.TRUE);
 			CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op");
 
 			HashMap<String, Integer> state = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
index a64faf1..cd4e355 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -18,22 +18,27 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
 import org.apache.flink.util.DynamicCodeLoadingException;
 
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.net.URI;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -48,61 +53,199 @@ public class StateBackendLoadingTest {
 
 	private final ClassLoader cl = getClass().getClassLoader();
 
-	private final String backendKey = CoreOptions.STATE_BACKEND.key();
+	private final String backendKey = CheckpointingOptions.STATE_BACKEND.key();
 
 	// ------------------------------------------------------------------------
+	//  defaults
+	// ------------------------------------------------------------------------
 
 	@Test
 	public void testNoStateBackendDefined() throws Exception {
-		assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null));
+		assertNull(StateBackendLoader.loadStateBackendFromConfig(new Configuration(), cl, null));
 	}
 
 	@Test
 	public void testInstantiateMemoryBackendByDefault() throws Exception {
-		StateBackend backend = AbstractStateBackend
-				.loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null);
+		StateBackend backend =
+				StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), cl, null);
 
 		assertTrue(backend instanceof MemoryStateBackend);
 	}
 
 	@Test
-	public void testLoadMemoryStateBackend() throws Exception {
-		// we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
-		// to guard against config-breaking changes of the name 
+	public void testApplicationDefinedHasPrecedence() throws Exception {
+		final StateBackend appBackend = Mockito.mock(StateBackend.class);
+
 		final Configuration config = new Configuration();
 		config.setString(backendKey, "jobmanager");
 
-		StateBackend backend = AbstractStateBackend
-				.loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null);
+		StateBackend backend = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, null);
+		assertEquals(appBackend, backend);
+	}
 
-		assertTrue(backend instanceof MemoryStateBackend);
+	// ------------------------------------------------------------------------
+	//  Memory State Backend
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Validates loading a memory state backend from the cluster configuration.
+	 */
+	@Test
+	public void testLoadMemoryStateBackendNoParameters() throws Exception {
+		// we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
+		// to guard against config-breaking changes of the name
+
+		final Configuration config1 = new Configuration();
+		config1.setString(backendKey, "jobmanager");
+
+		final Configuration config2 = new Configuration();
+		config2.setString(backendKey, MemoryStateBackendFactory.class.getName());
+
+		StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+		StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+		assertTrue(backend1 instanceof MemoryStateBackend);
+		assertTrue(backend2 instanceof MemoryStateBackend);
 	}
 
+	/**
+	 * Validates loading a memory state backend with additional parameters from the cluster configuration.
+	 */
+	@Test
+	public void testLoadMemoryStateWithParameters() throws Exception {
+		final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+		final Path expectedCheckpointPath = new Path(checkpointDir);
+		final Path expectedSavepointPath = new Path(savepointDir);
+
+		final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
+
+		// we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
+		// to guard against config-breaking changes of the name
+
+		final Configuration config1 = new Configuration();
+		config1.setString(backendKey, "jobmanager");
+		config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+		config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+		config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
+
+		final Configuration config2 = new Configuration();
+		config2.setString(backendKey, MemoryStateBackendFactory.class.getName());
+		config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+		config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+		config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
+
+		MemoryStateBackend backend1 = (MemoryStateBackend)
+				StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+		MemoryStateBackend backend2 = (MemoryStateBackend)
+				StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+		assertNotNull(backend1);
+		assertNotNull(backend2);
+
+		assertEquals(expectedCheckpointPath, backend1.getCheckpointPath());
+		assertEquals(expectedCheckpointPath, backend2.getCheckpointPath());
+		assertEquals(expectedSavepointPath, backend1.getSavepointPath());
+		assertEquals(expectedSavepointPath, backend2.getSavepointPath());
+		assertEquals(async, backend1.isUsingAsynchronousSnapshots());
+		assertEquals(async, backend2.isUsingAsynchronousSnapshots());
+	}
+
+	/**
+	 * Validates taking the application-defined memory state backend and adding additional
+	 * parameters from the cluster configuration.
+	 */
+	@Test
+	public void testConfigureMemoryStateBackend() throws Exception {
+		final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+		final Path expectedCheckpointPath = new Path(checkpointDir);
+		final Path expectedSavepointPath = new Path(savepointDir);
+
+		final int maxSize = 100;
+		final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
+
+		final MemoryStateBackend backend = new MemoryStateBackend(maxSize, async);
+
+		final Configuration config = new Configuration();
+		config.setString(backendKey, "filesystem"); // check that this is not accidentally picked up
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+		config.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, !async);
+
+		StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
+		assertTrue(loadedBackend instanceof MemoryStateBackend);
+
+		final MemoryStateBackend memBackend = (MemoryStateBackend) loadedBackend;
+		assertEquals(expectedCheckpointPath, memBackend.getCheckpointPath());
+		assertEquals(expectedSavepointPath, memBackend.getSavepointPath());
+		assertEquals(maxSize, memBackend.getMaxStateSize());
+		assertEquals(async, memBackend.isUsingAsynchronousSnapshots());
+	}
+
+	/**
+	 * Validates taking the application-defined memory state backend and adding additional
+	 * parameters from the cluster configuration, but giving precedence to application-defined
+	 * parameters over configuration-defined parameters.
+	 */
+	@Test
+	public void testConfigureMemoryStateBackendMixed() throws Exception {
+		final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+
+		final Path expectedCheckpointPath = new Path(appCheckpointDir);
+		final Path expectedSavepointPath = new Path(savepointDir);
+
+		final MemoryStateBackend backend = new MemoryStateBackend(appCheckpointDir, null);
+
+		final Configuration config = new Configuration();
+		config.setString(backendKey, "filesystem"); // check that this is not accidentally picked up
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this parameter should not be picked up
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+
+		StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
+		assertTrue(loadedBackend instanceof MemoryStateBackend);
+
+		final MemoryStateBackend memBackend = (MemoryStateBackend) loadedBackend;
+		assertEquals(expectedCheckpointPath, memBackend.getCheckpointPath());
+		assertEquals(expectedSavepointPath, memBackend.getSavepointPath());
+	}
+
+	// ------------------------------------------------------------------------
+	//  File System State Backend
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Validates loading a file system state backend with additional parameters from the cluster configuration.
+	 */
 	@Test
 	public void testLoadFileSystemStateBackend() throws Exception {
-		final String checkpointDir = new Path(tmp.getRoot().toURI()).toString();
-		final Path expectedPath = new Path(checkpointDir);
+		final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+		final Path expectedCheckpointsPath = new Path(checkpointDir);
+		final Path expectedSavepointsPath = new Path(savepointDir);
 		final int threshold = 1000000;
+		final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
 
 		// we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
 		// to guard against config-breaking changes of the name 
 		final Configuration config1 = new Configuration();
 		config1.setString(backendKey, "filesystem");
-		config1.setString("state.checkpoints.dir", checkpointDir);
-		config1.setString("state.backend.fs.checkpointdir", checkpointDir);
-		config1.setInteger("state.backend.fs.memory-threshold", threshold);
+		config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+		config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+		config1.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+		config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
 
 		final Configuration config2 = new Configuration();
 		config2.setString(backendKey, FsStateBackendFactory.class.getName());
-		config2.setString("state.checkpoints.dir", checkpointDir);
-		config2.setString("state.backend.fs.checkpointdir", checkpointDir);
-		config2.setInteger("state.backend.fs.memory-threshold", threshold);
-
-		StateBackend backend1 = AbstractStateBackend
-				.loadStateBackendFromConfigOrCreateDefault(config1, cl, null);
+		config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+		config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+		config2.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+		config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
 
-		StateBackend backend2 = AbstractStateBackend
-				.loadStateBackendFromConfigOrCreateDefault(config2, cl, null);
+		StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+		StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
 
 		assertTrue(backend1 instanceof FsStateBackend);
 		assertTrue(backend2 instanceof FsStateBackend);
@@ -110,13 +253,55 @@ public class StateBackendLoadingTest {
 		FsStateBackend fs1 = (FsStateBackend) backend1;
 		FsStateBackend fs2 = (FsStateBackend) backend2;
 
-		assertEquals(expectedPath, fs1.getBasePath());
-		assertEquals(expectedPath, fs2.getBasePath());
+		assertEquals(expectedCheckpointsPath, fs1.getCheckpointPath());
+		assertEquals(expectedCheckpointsPath, fs2.getCheckpointPath());
+		assertEquals(expectedSavepointsPath, fs1.getSavepointPath());
+		assertEquals(expectedSavepointsPath, fs2.getSavepointPath());
 		assertEquals(threshold, fs1.getMinFileSizeThreshold());
 		assertEquals(threshold, fs2.getMinFileSizeThreshold());
+		assertEquals(async, fs1.isUsingAsynchronousSnapshots());
+		assertEquals(async, fs2.isUsingAsynchronousSnapshots());
 	}
 
 	/**
+	 * Validates taking the application-defined file system state backend and adding additional
+	 * parameters from the cluster configuration, but giving precedence to application-defined
+	 * parameters over configuration-defined parameters.
+	 */
+	@Test
+	public void testLoadFileSystemStateBackendMixed() throws Exception {
+		final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+
+		final Path expectedCheckpointsPath = new Path(new URI(appCheckpointDir));
+		final Path expectedSavepointsPath = new Path(savepointDir);
+
+		final int threshold = 1000000;
+
+		final FsStateBackend backend = new FsStateBackend(new URI(appCheckpointDir), threshold);
+
+		final Configuration config = new Configuration();
+		config.setString(backendKey, "jobmanager"); // this should not be picked up 
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 20); // this should not be picked up
+
+		final StateBackend loadedBackend =
+				StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
+		assertTrue(loadedBackend instanceof FsStateBackend);
+
+		final FsStateBackend fs = (FsStateBackend) loadedBackend;
+		assertEquals(expectedCheckpointsPath, fs.getCheckpointPath());
+		assertEquals(expectedSavepointsPath, fs.getSavepointPath());
+		assertEquals(threshold, fs.getMinFileSizeThreshold());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Failures
+	// ------------------------------------------------------------------------
+
+	/**
 	 * This test makes sure that failures properly manifest when the state backend could not be loaded.
 	 */
 	@Test
@@ -126,7 +311,7 @@ public class StateBackendLoadingTest {
 		// try a value that is neither recognized as a name, nor corresponds to a class
 		config.setString(backendKey, "does.not.exist");
 		try {
-			AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+			StateBackendLoader.fromApplicationOrConfigOrDefault(null, config, cl, null);
 			fail("should fail with an exception");
 		} catch (DynamicCodeLoadingException ignored) {
 			// expected
@@ -135,7 +320,7 @@ public class StateBackendLoadingTest {
 		// try a class that is not a factory
 		config.setString(backendKey, java.io.File.class.getName());
 		try {
-			AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+			StateBackendLoader.fromApplicationOrConfigOrDefault(null, config, cl, null);
 			fail("should fail with an exception");
 		} catch (DynamicCodeLoadingException ignored) {
 			// expected
@@ -144,7 +329,7 @@ public class StateBackendLoadingTest {
 		// a factory that fails
 		config.setString(backendKey, FailingFactory.class.getName());
 		try {
-			AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+			StateBackendLoader.fromApplicationOrConfigOrDefault(null, config, cl, null);
 			fail("should fail with an exception");
 		} catch (IOException ignored) {
 			// expected
@@ -152,12 +337,103 @@ public class StateBackendLoadingTest {
 	}
 
 	// ------------------------------------------------------------------------
+	//  High-availability default
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This tests that in the case of configured high-availability, the memory state backend
+	 * automatically grabs the HA persistence directory.
+	 */
+	@Test
+	public void testHighAvailabilityDefaultFallback() throws Exception {
+		final String haPersistenceDir = new Path(tmp.newFolder().toURI()).toString();
+		final Path expectedCheckpointPath = new Path(haPersistenceDir);
+
+		final Configuration config1 = new Configuration();
+		config1.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		config1.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
+		config1.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
+
+		final Configuration config2 = new Configuration();
+		config2.setString(backendKey, "jobmanager");
+		config2.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		config2.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
+		config2.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
+
+		final MemoryStateBackend appBackend = new MemoryStateBackend();
+
+		final StateBackend loaded1 = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config1, cl, null);
+		final StateBackend loaded2 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, config1, cl, null);
+		final StateBackend loaded3 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, config2, cl, null);
+
+		assertTrue(loaded1 instanceof MemoryStateBackend);
+		assertTrue(loaded2 instanceof MemoryStateBackend);
+		assertTrue(loaded3 instanceof MemoryStateBackend);
+
+		final MemoryStateBackend memBackend1 = (MemoryStateBackend) loaded1;
+		final MemoryStateBackend memBackend2 = (MemoryStateBackend) loaded2;
+		final MemoryStateBackend memBackend3 = (MemoryStateBackend) loaded3;
+
+		assertNotNull(memBackend1.getCheckpointPath());
+		assertNotNull(memBackend2.getCheckpointPath());
+		assertNotNull(memBackend3.getCheckpointPath());
+		assertNull(memBackend1.getSavepointPath());
+		assertNull(memBackend2.getSavepointPath());
+		assertNull(memBackend3.getSavepointPath());
+
+		assertEquals(expectedCheckpointPath, memBackend1.getCheckpointPath().getParent());
+		assertEquals(expectedCheckpointPath, memBackend2.getCheckpointPath().getParent());
+		assertEquals(expectedCheckpointPath, memBackend3.getCheckpointPath().getParent());
+	}
+
+	@Test
+	public void testHighAvailabilityDefaultFallbackLocalPaths() throws Exception {
+		final String haPersistenceDir = new Path(tmp.newFolder().getAbsolutePath()).toString();
+		final Path expectedCheckpointPath = new Path(haPersistenceDir).makeQualified(FileSystem.getLocalFileSystem());
+
+		final Configuration config1 = new Configuration();
+		config1.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		config1.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
+		config1.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
+
+		final Configuration config2 = new Configuration();
+		config2.setString(backendKey, "jobmanager");
+		config2.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		config2.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
+		config2.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
+
+		final MemoryStateBackend appBackend = new MemoryStateBackend();
+
+		final StateBackend loaded1 = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config1, cl, null);
+		final StateBackend loaded2 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, config1, cl, null);
+		final StateBackend loaded3 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, config2, cl, null);
+
+		assertTrue(loaded1 instanceof MemoryStateBackend);
+		assertTrue(loaded2 instanceof MemoryStateBackend);
+		assertTrue(loaded3 instanceof MemoryStateBackend);
+
+		final MemoryStateBackend memBackend1 = (MemoryStateBackend) loaded1;
+		final MemoryStateBackend memBackend2 = (MemoryStateBackend) loaded2;
+		final MemoryStateBackend memBackend3 = (MemoryStateBackend) loaded3;
+
+		assertNotNull(memBackend1.getCheckpointPath());
+		assertNotNull(memBackend2.getCheckpointPath());
+		assertNotNull(memBackend3.getCheckpointPath());
+		assertNull(memBackend1.getSavepointPath());
+		assertNull(memBackend2.getSavepointPath());
+		assertNull(memBackend3.getSavepointPath());
+
+		assertEquals(expectedCheckpointPath, memBackend1.getCheckpointPath().getParent());
+		assertEquals(expectedCheckpointPath, memBackend2.getCheckpointPath().getParent());
+		assertEquals(expectedCheckpointPath, memBackend3.getCheckpointPath().getParent());
+	}
+
+	// ------------------------------------------------------------------------
 
 	static final class FailingFactory implements StateBackendFactory<StateBackend> {
-		private static final long serialVersionUID = 1L;
 
 		@Override
-		public StateBackend createFromConfig(Configuration config) throws IllegalConfigurationException, IOException {
+		public StateBackend createFromConfig(Configuration config) throws IOException {
 			throw new IOException("fail!");
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 63563f3..ccf6baf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.util.CorruptConfigurationException;
-import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.util.ClassLoaderUtil;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -442,7 +442,7 @@ public class StreamConfig implements Serializable {
 	//  State backend
 	// ------------------------------------------------------------------------
 
-	public void setStateBackend(AbstractStateBackend backend) {
+	public void setStateBackend(StateBackend backend) {
 		if (backend != null) {
 			try {
 				InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
@@ -452,7 +452,7 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
-	public AbstractStateBackend getStateBackend(ClassLoader cl) {
+	public StateBackend getStateBackend(ClassLoader cl) {
 		try {
 			return InstantiationUtil.readObjectFromConfig(this.config, STATE_BACKEND, cl);
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a1bd355..f89b916 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -36,13 +36,13 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -734,19 +734,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 
 	private StateBackend createStateBackend() throws Exception {
-		final StateBackend fromJob = configuration.getStateBackend(getUserCodeClassLoader());
+		final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());
 
-		if (fromJob != null) {
-			// backend has been configured on the environment
-			LOG.info("Using user-defined state backend: {}.", fromJob);
-			return fromJob;
-		}
-		else {
-			return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
-					getEnvironment().getTaskManagerInfo().getConfiguration(),
-					getUserCodeClassLoader(),
-					LOG);
-		}
+		return StateBackendLoader.fromApplicationOrConfigOrDefault(
+				fromApplication,
+				getEnvironment().getTaskManagerInfo().getConfiguration(),
+				getUserCodeClassLoader(),
+				LOG);
 	}
 
 	public OperatorStateBackend createOperatorStateBackend(

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index bc69e1c..2051771 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -291,6 +291,8 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 
 	private static class SyncFailureInducingStateBackend extends MemoryStateBackend {
 
+		private static final long serialVersionUID = -1915780414440060539L;
+
 		@Override
 		public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
 			return new DefaultOperatorStateBackend(
@@ -308,10 +310,18 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 				}
 			};
 		}
+
+		@Override
+		public SyncFailureInducingStateBackend configure(Configuration config) {
+			// retain this instance, no re-configuration!
+			return this;
+		}
 	}
 
 	private static class AsyncFailureInducingStateBackend extends MemoryStateBackend {
 
+		private static final long serialVersionUID = -7613628662587098470L;
+
 		@Override
 		public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
 			return new DefaultOperatorStateBackend(
@@ -334,6 +344,12 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 				}
 			};
 		}
+
+		@Override
+		public AsyncFailureInducingStateBackend configure(Configuration config) {
+			// retain this instance, no re-configuration!
+			return this;
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -356,6 +372,12 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 				new ExecutionConfig(),
 				true);
 		}
+
+		@Override
+		public LockingStreamStateBackend configure(Configuration config) {
+			// retain this instance, no re-configuration!
+			return this;
+		}
 	}
 
 	private static final class LockingOutputStreamFactory implements CheckpointStreamFactory {
@@ -373,7 +395,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 
 		@Override
 		public StreamStateHandle closeAndGetHandle() throws IOException {
-			return null;
+			throw new UnsupportedOperationException();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index 99a9d1f..ffe220e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -32,15 +32,15 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -70,7 +70,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 
-import static org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -85,9 +84,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	@Parameterized.Parameters(name = "StateBackend: {0}")
 	public static Collection<String> parameters () {
 		return Arrays.asList(
-			AbstractStateBackend.MEMORY_STATE_BACKEND_NAME,
-			AbstractStateBackend.FS_STATE_BACKEND_NAME,
-			AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME);
+				StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+				StateBackendLoader.FS_STATE_BACKEND_NAME,
+				StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME);
 	}
 
 	@ClassRule
@@ -97,9 +96,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 
 	public PojoSerializerUpgradeTest(String backendType) throws IOException, DynamicCodeLoadingException {
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.STATE_BACKEND, backendType);
-		config.setString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, temporaryFolder.newFolder().toURI().toString());
-		stateBackend = AbstractStateBackend.loadStateBackendFromConfig(config, Thread.currentThread().getContextClassLoader(), null);
+		config.setString(CheckpointingOptions.STATE_BACKEND, backendType);
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toURI().toString());
+		stateBackend = StateBackendLoader.loadStateBackendFromConfig(config, Thread.currentThread().getContextClassLoader(), null);
 	}
 
 	private static final String POJO_NAME = "Pojo";