You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:25 UTC

[08/17] flink git commit: [FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration

[FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa03e78d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa03e78d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa03e78d

Branch: refs/heads/master
Commit: fa03e78d3a245b40ceb3efffeb3020853e74e48b
Parents: 7d820d6
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Oct 25 19:04:10 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100

----------------------------------------------------------------------
 .../state/RocksDBStateBackendFactory.java       |  25 +-
 .../org/apache/flink/util/TernaryBoolean.java   |  91 +++++
 .../apache/flink/util/TernaryBooleanTest.java   |  79 +++++
 .../runtime/state/AbstractStateBackend.java     | 165 +--------
 .../runtime/state/ConfigurableStateBackend.java |  45 +++
 .../flink/runtime/state/StateBackendLoader.java | 265 +++++++++++++++
 .../filesystem/AbstractFileStateBackend.java    | 206 +++++++++++
 .../state/filesystem/FsStateBackend.java        | 327 ++++++++++++------
 .../state/filesystem/FsStateBackendFactory.java |  35 +-
 .../state/memory/MemoryStateBackend.java        | 253 ++++++++++++--
 .../state/memory/MemoryStateBackendFactory.java |  35 ++
 ...ExecutionGraphCheckpointCoordinatorTest.java |   3 +-
 .../ArchivedExecutionGraphTest.java             |  11 +-
 .../runtime/jobmanager/JobManagerTest.java      |  15 +-
 .../runtime/state/MemoryStateBackendTest.java   |   3 +-
 .../runtime/state/StateBackendLoadingTest.java  | 340 +++++++++++++++++--
 .../flink/streaming/api/graph/StreamConfig.java |   6 +-
 .../streaming/runtime/tasks/StreamTask.java     |  20 +-
 .../tasks/TaskCheckpointingBehaviourTest.java   |  24 +-
 .../PojoSerializerUpgradeTest.java              |  17 +-
 20 files changed, 1569 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index f0569b8..de5be9a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
@@ -37,33 +38,29 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBSt
 
 	protected static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackendFactory.class);
 
-	private static final long serialVersionUID = 4906988360901930371L;
-
-	/** The key under which the config stores the directory where checkpoints should be stored. */
-	public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
-	/** The key under which the config stores the directory where RocksDB should be stored. */
-	public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir";
-
 	@Override
 	public RocksDBStateBackend createFromConfig(Configuration config)
 			throws IllegalConfigurationException, IOException {
 
-		final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-		final String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+		final String checkpointDirURI = config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+		final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
+		final boolean incrementalCheckpoints = config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS);
 
 		if (checkpointDirURI == null) {
 			throw new IllegalConfigurationException(
 				"Cannot create the RocksDB state backend: The configuration does not specify the " +
-				"checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
+				"checkpoint directory '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + '\'');
 		}
 
 		try {
-			Path path = new Path(checkpointDirURI);
-			RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri());
-			if (rocksdbLocalPath != null) {
-				String[] directories = rocksdbLocalPath.split(",|" + File.pathSeparator);
+			final Path path = new Path(checkpointDirURI);
+			final RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri(), incrementalCheckpoints);
+
+			if (rocksdbLocalPaths != null) {
+				String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
 				backend.setDbStoragePaths(directories);
 			}
+
 			LOG.info("State backend is set to RocksDB (configured DB storage paths {}, checkpoints to filesystem {} ) ",
 					backend.getDbStoragePaths(), path);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java b/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
new file mode 100644
index 0000000..14ef24b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+/**
+ * A ternary boolean, which can have the values 'true', 'false', or 'undefined'.
+ *
+ * <p>A ternary boolean can, for example, be used for configuration switches that
+ * may not be configured (undefined), in which case a default value should be assumed.
+ */
+@PublicEvolving
+public enum TernaryBoolean {
+
+	/** The value for 'true'. */
+	TRUE,
+
+	/** The value for 'false'. */
+	FALSE,
+
+	/** The value for 'undefined'. In a configuration setting, this typically means that the
+	 * default value will be used, or the value from a deployment-wide configuration. */
+	UNDEFINED;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns {@code true} for TRUE and {@code false} for FALSE. If this is the 'undefined' value,
+	 * the method returns the given default instead.
+	 *
+	 * @param defaultValue The value to be returned in case this ternary value is 'undefined'.
+	 */
+	public boolean getOrDefault(boolean defaultValue) {
+		return this == UNDEFINED ? defaultValue : (this == TRUE);
+	}
+
+	/**
+	 * Resolves this value to a defined one: TRUE and FALSE are returned unchanged, while 'undefined'
+	 * is replaced by the TernaryBoolean corresponding to the given valueForUndefined.
+	 *
+	 * @param valueForUndefined The boolean to resolve to in case this ternary value is 'undefined'.
+	 */
+	public TernaryBoolean resolveUndefined(boolean valueForUndefined) {
+		return this != UNDEFINED ? this : fromBoolean(valueForUndefined);
+	}
+
+	/**
+	 * Gets this ternary boolean as a boxed boolean. The value 'undefined' results
+	 * in {@code null}; TRUE and FALSE map to {@link Boolean#TRUE} and {@link Boolean#FALSE}.
+	 */
+	@Nullable
+	public Boolean getAsBoolean() {
+		return this == UNDEFINED ? null : (this == TRUE ? Boolean.TRUE : Boolean.FALSE);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Converts the given boolean to a TernaryBoolean, {@link #TRUE} or {@link #FALSE} respectively.
+	 */
+	public static TernaryBoolean fromBoolean(boolean bool) {
+		return bool ? TRUE : FALSE;
+	}
+
+	/**
+	 * Converts the given boxed Boolean to a TernaryBoolean. A null value results in
+	 * {@link #UNDEFINED}, while a non-null value results in {@link #TRUE} or {@link #FALSE} respectively.
+	 */
+	public static TernaryBoolean fromBoxedBoolean(@Nullable Boolean bool) {
+		return bool == null ? UNDEFINED : fromBoolean(bool);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-core/src/test/java/org/apache/flink/util/TernaryBooleanTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/TernaryBooleanTest.java b/flink-core/src/test/java/org/apache/flink/util/TernaryBooleanTest.java
new file mode 100644
index 0000000..866fafb
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/TernaryBooleanTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.apache.flink.util.TernaryBoolean.FALSE;
+import static org.apache.flink.util.TernaryBoolean.TRUE;
+import static org.apache.flink.util.TernaryBoolean.UNDEFINED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link TernaryBoolean} enum: default resolution and conversion to/from (boxed) booleans.
+ */
+public class TernaryBooleanTest {
+
+	@Test
+	public void testWithDefault() {
+		assertTrue(TRUE.getOrDefault(true));
+		assertTrue(TRUE.getOrDefault(false));
+
+		assertFalse(FALSE.getOrDefault(true));
+		assertFalse(FALSE.getOrDefault(false));
+
+		assertTrue(UNDEFINED.getOrDefault(true));
+		assertFalse(UNDEFINED.getOrDefault(false));
+	}
+
+	@Test
+	public void testResolveUndefined() {
+		assertEquals(TRUE, TRUE.resolveUndefined(true));
+		assertEquals(TRUE, TRUE.resolveUndefined(false));
+
+		assertEquals(FALSE, FALSE.resolveUndefined(true));
+		assertEquals(FALSE, FALSE.resolveUndefined(false));
+
+		assertEquals(TRUE, UNDEFINED.resolveUndefined(true));
+		assertEquals(FALSE, UNDEFINED.resolveUndefined(false));
+	}
+
+	@Test
+	public void testToBoolean() {
+		assertTrue(Boolean.TRUE == TRUE.getAsBoolean()); // identity check on purpose: the canonical Boolean constants are expected
+		assertTrue(Boolean.FALSE == FALSE.getAsBoolean());
+		assertNull(UNDEFINED.getAsBoolean());
+	}
+
+	@Test
+	public void testFromBoolean() {
+		assertEquals(TRUE, TernaryBoolean.fromBoolean(true));
+		assertEquals(FALSE, TernaryBoolean.fromBoolean(false));
+	}
+
+	@Test
+	public void testFromBoxedBoolean() {
+		assertEquals(TRUE, TernaryBoolean.fromBoxedBoolean(Boolean.TRUE));
+		assertEquals(FALSE, TernaryBoolean.fromBoxedBoolean(Boolean.FALSE));
+		assertEquals(UNDEFINED, TernaryBoolean.fromBoxedBoolean(null));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 1594e2e..c72f012 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -21,27 +21,16 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.DynamicCodeLoadingException;
-
-import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * An abstract base implementation of the {@link StateBackend} interface.
- * 
- * <p>
+ *
+ * <p>This class currently has no contents of its own and is only kept so as not to break the prior class hierarchy for users.
  */
 @PublicEvolving
 public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
@@ -49,19 +38,6 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 	private static final long serialVersionUID = 4620415814639230247L;
 
 	// ------------------------------------------------------------------------
-	//  Configuration shortcut names
-	// ------------------------------------------------------------------------
-
-	/** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */
-	public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
-
-	/** The shortcut configuration name for the FileSystem State backend */ 
-	public static final String FS_STATE_BACKEND_NAME = "filesystem";
-
-	/** The shortcut configuration name for the RocksDB State Backend */
-	public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
-
-	// ------------------------------------------------------------------------
 	//  State Backend - Persisting Byte Storage
 	// ------------------------------------------------------------------------
 
@@ -94,141 +70,4 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 	public abstract OperatorStateBackend createOperatorStateBackend(
 			Environment env,
 			String operatorIdentifier) throws Exception;
-
-	// ------------------------------------------------------------------------
-	//  Loading the state backend from a configuration 
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
-	 * in {@link CoreOptions#STATE_BACKEND}.
-	 * 
-	 * <p>The state backends can be specified either via their shortcut name, or via the class name
-	 * of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory
-	 * is instantiated (via its zero-argument constructor) and its
-	 * {@link StateBackendFactory#createFromConfig(Configuration)} method is called.
-	 *
-	 * <p>Recognized shortcut names are '{@value AbstractStateBackend#MEMORY_STATE_BACKEND_NAME}',
-	 * '{@value AbstractStateBackend#FS_STATE_BACKEND_NAME}', and
-	 * '{@value AbstractStateBackend#ROCKSDB_STATE_BACKEND_NAME}'.
-	 * 
-	 * @param config The configuration to load the state backend from
-	 * @param classLoader The class loader that should be used to load the state backend
-	 * @param logger Optionally, a logger to log actions to (may be null)
-	 * 
-	 * @return The instantiated state backend.
-	 * 
-	 * @throws DynamicCodeLoadingException
-	 *             Thrown if a state backend factory is configured and the factory class was not
-	 *             found or the factory could not be instantiated
-	 * @throws IllegalConfigurationException
-	 *             May be thrown by the StateBackendFactory when creating / configuring the state
-	 *             backend in the factory
-	 * @throws IOException
-	 *             May be thrown by the StateBackendFactory when instantiating the state backend
-	 */
-	public static StateBackend loadStateBackendFromConfig(
-			Configuration config,
-			ClassLoader classLoader,
-			@Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
-
-		checkNotNull(config, "config");
-		checkNotNull(classLoader, "classLoader");
-
-		final String backendName = config.getString(CoreOptions.STATE_BACKEND);
-		if (backendName == null) {
-			return null;
-		}
-
-		// by default the factory class is the backend name 
-		String factoryClassName = backendName;
-
-		switch (backendName.toLowerCase()) {
-			case MEMORY_STATE_BACKEND_NAME:
-				if (logger != null) {
-					logger.info("State backend is set to heap memory (checkpoint to JobManager)");
-				}
-				return new MemoryStateBackend();
-
-			case FS_STATE_BACKEND_NAME:
-				FsStateBackend fsBackend = new FsStateBackendFactory().createFromConfig(config);
-				if (logger != null) {
-					logger.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
-							fsBackend.getBasePath());
-				}
-				return fsBackend;
-
-			case ROCKSDB_STATE_BACKEND_NAME:
-				factoryClassName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
-				// fall through to the 'default' case that uses reflection to load the backend
-				// that way we can keep RocksDB in a separate module
-
-			default:
-				if (logger != null) {
-					logger.info("Loading state backend via factory {}", factoryClassName);
-				}
-
-				StateBackendFactory<?> factory;
-				try {
-					@SuppressWarnings("rawtypes")
-					Class<? extends StateBackendFactory> clazz = 
-							Class.forName(factoryClassName, false, classLoader)
-									.asSubclass(StateBackendFactory.class);
-
-					factory = clazz.newInstance();
-				}
-				catch (ClassNotFoundException e) {
-					throw new DynamicCodeLoadingException(
-							"Cannot find configured state backend factory class: " + backendName, e);
-				}
-				catch (ClassCastException | InstantiationException | IllegalAccessException e) {
-					throw new DynamicCodeLoadingException("The class configured under '" +
-							CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
-							backendName + ')', e);
-				}
-				
-				return factory.createFromConfig(config);
-		}
-	}
-
-	/**
-	 * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
-	 * in {@link CoreOptions#STATE_BACKEND}. If no state backend is configures, this instantiates the
-	 * default state backend (the {@link MemoryStateBackend}). 
-	 *
-	 * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on
-	 * how the state backend is loaded from the configuration.
-	 *
-	 * @param config The configuration to load the state backend from
-	 * @param classLoader The class loader that should be used to load the state backend
-	 * @param logger Optionally, a logger to log actions to (may be null)
-	 *
-	 * @return The instantiated state backend.
-	 *
-	 * @throws DynamicCodeLoadingException
-	 *             Thrown if a state backend factory is configured and the factory class was not
-	 *             found or the factory could not be instantiated
-	 * @throws IllegalConfigurationException
-	 *             May be thrown by the StateBackendFactory when creating / configuring the state
-	 *             backend in the factory
-	 * @throws IOException
-	 *             May be thrown by the StateBackendFactory when instantiating the state backend
-	 */
-	public static StateBackend loadStateBackendFromConfigOrCreateDefault(
-			Configuration config,
-			ClassLoader classLoader,
-			@Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
-
-		final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
-
-		if (fromConfig != null) {
-			return fromConfig;
-		}
-		else {
-			if (logger != null) {
-				logger.info("No state backend has been configured, using default state backend (Memory / JobManager)");
-			}
-			return new MemoryStateBackend();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
new file mode 100644
index 0000000..f509e8d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+/**
+ * An interface for state backends that pick up additional parameters from a configuration.
+ */
+public interface ConfigurableStateBackend {
+
+	/**
+	 * Creates a variant of the state backend that applies additional configuration parameters.
+	 *
+	 * <p>Settings that were set directly on the original state backend object in the application
+	 * program typically take precedence over settings picked up from the configuration.
+	 *
+	 * <p>If no configuration is applied, or if the method directly applies configuration values to
+	 * the (mutable) state backend object, this method may return the original state backend object.
+	 * Otherwise, it typically returns a modified copy.
+	 *
+	 * @param config The configuration to pick the values from.
+	 * @return A reconfigured state backend (possibly the original object, see above).
+	 *
+	 * @throws IllegalConfigurationException Thrown if the configuration contains invalid entries.
+	 */
+	StateBackend configure(Configuration config) throws IllegalConfigurationException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
new file mode 100644
index 0000000..857dfc1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from configurations.
+ */
+public class StateBackendLoader {
+
+	// ------------------------------------------------------------------------
+	//  Configuration shortcut names
+	// ------------------------------------------------------------------------
+
+	/** The shortcut configuration name for the MemoryStateBackend, which checkpoints to the JobManager. */
+	public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+	/** The shortcut configuration name for the file system state backend (FsStateBackend). */
+	public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+	/** The shortcut configuration name for the RocksDB state backend (loaded reflectively from a separate module). */
+	public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+	// ------------------------------------------------------------------------
+	//  Loading the state backend from a configuration 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
+	 * in {@link CheckpointingOptions#STATE_BACKEND}.
+	 *
+	 * <p>The state backends can be specified either via their shortcut name, or via the class name
+	 * of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory
+	 * is instantiated (via its zero-argument constructor) and its
+	 * {@link StateBackendFactory#createFromConfig(Configuration)} method is called.
+	 *
+	 * <p>Recognized shortcut names (matched case-insensitively) are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+	 * '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+	 * '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+	 *
+	 * @param config The configuration to load the state backend from
+	 * @param classLoader The class loader that should be used to load the state backend
+	 * @param logger Optionally, a logger to log actions to (may be null)
+	 *
+	 * @return The instantiated state backend, or {@code null} if no state backend is configured.
+	 *
+	 * @throws DynamicCodeLoadingException
+	 *             Thrown if a state backend factory is configured and the factory class was not
+	 *             found or the factory could not be instantiated
+	 * @throws IllegalConfigurationException
+	 *             May be thrown by the StateBackendFactory when creating / configuring the state
+	 *             backend in the factory
+	 * @throws IOException
+	 *             May be thrown by the StateBackendFactory when instantiating the state backend
+	 */
+	public static StateBackend loadStateBackendFromConfig(
+			Configuration config,
+			ClassLoader classLoader,
+			@Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+		checkNotNull(config, "config");
+		checkNotNull(classLoader, "classLoader");
+
+		final String backendName = config.getString(CheckpointingOptions.STATE_BACKEND);
+		if (backendName == null) {
+			return null;
+		}
+
+		// by default the factory class is the backend name (the 'rocksdb' shortcut overrides it below)
+		String factoryClassName = backendName;
+
+		switch (backendName.toLowerCase()) { // NOTE(review): uses the default locale; toLowerCase(Locale.ROOT) would be locale-independent
+			case MEMORY_STATE_BACKEND_NAME:
+				MemoryStateBackend memBackend = new MemoryStateBackendFactory().createFromConfig(config);
+
+				if (logger != null) {
+					Path memExternalized = memBackend.getCheckpointPath();
+					String extern = memExternalized == null ? "" :
+							" (externalized to " + memExternalized + ')';
+					logger.info("State backend is set to heap memory (checkpoint to JobManager) {}", extern);
+				}
+				return memBackend;
+
+			case FS_STATE_BACKEND_NAME:
+				FsStateBackend fsBackend = new FsStateBackendFactory().createFromConfig(config);
+				if (logger != null) {
+					logger.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
+							fsBackend.getCheckpointPath());
+				}
+				return fsBackend;
+
+			case ROCKSDB_STATE_BACKEND_NAME:
+				factoryClassName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
+				// fall through to the 'default' case that uses reflection to load the backend
+				// that way we can keep RocksDB in a separate module
+
+			default:
+				if (logger != null) {
+					logger.info("Loading state backend via factory {}", factoryClassName);
+				}
+
+				StateBackendFactory<?> factory;
+				try {
+					@SuppressWarnings("rawtypes")
+					Class<? extends StateBackendFactory> clazz =
+							Class.forName(factoryClassName, false, classLoader)
+									.asSubclass(StateBackendFactory.class);
+
+					factory = clazz.newInstance();
+				}
+				catch (ClassNotFoundException e) { // NOTE(review): message reports backendName, not factoryClassName (they differ for the 'rocksdb' shortcut)
+					throw new DynamicCodeLoadingException(
+							"Cannot find configured state backend factory class: " + backendName, e);
+				}
+				catch (ClassCastException | InstantiationException | IllegalAccessException e) {
+					throw new DynamicCodeLoadingException("The class configured under '" +
+							CheckpointingOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
+							backendName + ')', e);
+				}
+
+				return factory.createFromConfig(config);
+		}
+	}
+
+	/**
+	 * Checks if an application-defined state backend is given, and if not, loads the state
+	 * backend from the configuration, from the parameter 'state.backend', as defined
+	 * in {@link CheckpointingOptions#STATE_BACKEND}. If no state backend is configured, this instantiates the
+	 * default state backend (the {@link MemoryStateBackend}).
+	 *
+	 * <p>If an application-defined state backend is found, and the state backend is a
+	 * {@link ConfigurableStateBackend}, this method calls {@link ConfigurableStateBackend#configure(Configuration)}
+	 * on the state backend.
+	 *
+	 * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on
+	 * how the state backend is loaded from the configuration.
+	 *
+	 * <p>If the resulting backend is a {@link MemoryStateBackend} without a checkpoint path and high
+	 * availability is activated, this method tries to re-configure it with a random subdirectory of
+	 * {@link HighAvailabilityOptions#HA_STORAGE_PATH} as the checkpoint directory. That step is
+	 * best-effort: if it fails, the failure is logged (when a logger is given) and the backend is
+	 * returned without that checkpoint directory.
+	 *
+	 * @param fromApplication The state backend defined by the application (may be null)
+	 * @param config The configuration to load the state backend from
+	 * @param classLoader The class loader that should be used to load the state backend
+	 * @param logger Optionally, a logger to log actions to (may be null)
+	 *
+	 * @return The instantiated state backend.
+	 *
+	 * @throws DynamicCodeLoadingException
+	 *             Thrown if a state backend factory is configured and the factory class was not
+	 *             found or the factory could not be instantiated
+	 * @throws IllegalConfigurationException
+	 *             May be thrown by the StateBackendFactory when creating / configuring the state
+	 *             backend in the factory
+	 * @throws IOException
+	 *             May be thrown by the StateBackendFactory when instantiating the state backend
+	 */
+	public static StateBackend fromApplicationOrConfigOrDefault(
+			@Nullable StateBackend fromApplication,
+			Configuration config,
+			ClassLoader classLoader,
+			@Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+		checkNotNull(config, "config");
+		checkNotNull(classLoader, "classLoader");
+
+		final StateBackend backend;
+
+		// (1) the application defined state backend has precedence
+		if (fromApplication != null) {
+			if (logger != null) {
+				logger.info("Using application-defined state backend: {}", fromApplication);
+			}
+
+			// see if this is supposed to pick up additional configuration parameters
+			if (fromApplication instanceof ConfigurableStateBackend) {
+				// needs to pick up configuration
+				if (logger != null) {
+					logger.info("Configuring application-defined state backend with job/cluster config");
+				}
+
+				backend = ((ConfigurableStateBackend) fromApplication).configure(config);
+			}
+			else {
+				// keep as is!
+				backend = fromApplication;
+			}
+		}
+		else {
+			// (2) check if the config defines a state backend
+			final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
+			if (fromConfig != null) {
+				backend = fromConfig;
+			}
+			else {
+				// (3) use the default
+				backend = new MemoryStateBackendFactory().createFromConfig(config);
+				if (logger != null) {
+					logger.info("No state backend has been configured, using default (Memory / JobManager) {}", backend);
+				}
+			}
+		}
+
+		// to keep supporting the old behavior where default (JobManager) Backend + HA mode = checkpoints in HA store
+		// we add the HA persistence dir as the checkpoint directory if none other is set
+
+		if (backend instanceof MemoryStateBackend) {
+			final MemoryStateBackend memBackend = (MemoryStateBackend) backend;
+
+			if (memBackend.getCheckpointPath() == null && HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {
+				final String haStoragePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
+
+				if (haStoragePath != null) {
+					try {
+						Path checkpointDirPath = new Path(haStoragePath, UUID.randomUUID().toString());
+						if (checkpointDirPath.toUri().getScheme() == null) {
+							checkpointDirPath = checkpointDirPath.makeQualified(checkpointDirPath.getFileSystem());
+						}
+						Configuration tempConfig = new Configuration(config);
+						tempConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDirPath.toString());
+						return memBackend.configure(tempConfig);
+					}
+					catch (Exception e) {
+						// best-effort only: fall back to the backend without a checkpoint directory,
+						// but do not hide why the HA storage path could not be used
+						if (logger != null) {
+							logger.warn("Could not use the high-availability storage path '{}' as the checkpoint " +
+									"directory of the MemoryStateBackend; continuing without a checkpoint directory.",
+									haStoragePath, e);
+						}
+					}
+				}
+			}
+		}
+
+		return backend;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Static utility holder; the private constructor prevents instantiation. */
+	private StateBackendLoader() {
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
new file mode 100644
index 0000000..6ec6f24
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the
+ * {@code RocksDBStateBackend}.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but also accepts null
+ * for both of them, in which case creating externalized checkpoints is not possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to enable implementations
+ * that only optionally support default savepoints and externalized checkpoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The state backend is configured with a base directory and persists the checkpoint data of specific
+ * checkpoints in specific subdirectories. For example, if the base directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory with
+ * the job's ID that will contain the actual checkpoints:
+ * ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/}, will create
+ * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint data.
+ * The random digits are added as "entropy" to avoid directory collisions.
+ */
+@PublicEvolving
+public abstract class AbstractFileStateBackend extends AbstractStateBackend {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStateBackend.class);
+
+	// ------------------------------------------------------------------------
+	//  State Backend Properties
+	// ------------------------------------------------------------------------
+
+	/** The path where checkpoints will be stored, or null, if none has been configured. */
+	@Nullable
+	private final Path baseCheckpointPath;
+
+	/** The path where savepoints will be stored, or null, if none has been configured. */
+	@Nullable
+	private final Path baseSavepointPath;
+
+	/**
+	 * Creates a backend with the given optional checkpoint- and savepoint base directories.
+	 *
+	 * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
+	 * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
+	 *
+	 * @throws IllegalArgumentException Thrown, if a non-null path misses scheme or path, or is the root directory.
+	 */
+	protected AbstractFileStateBackend(
+			@Nullable URI baseCheckpointPath,
+			@Nullable URI baseSavepointPath) {
+
+		this(baseCheckpointPath == null ? null : new Path(baseCheckpointPath),
+				baseSavepointPath == null ? null : new Path(baseSavepointPath));
+	}
+
+	/**
+	 * Creates a backend with the given optional checkpoint- and savepoint base directories.
+	 *
+	 * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
+	 * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
+	 *
+	 * @throws IllegalArgumentException Thrown, if a non-null path misses scheme or path, or is the root directory.
+	 */
+	protected AbstractFileStateBackend(
+			@Nullable Path baseCheckpointPath,
+			@Nullable Path baseSavepointPath) {
+
+		this.baseCheckpointPath = baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
+		this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
+	}
+
+	/**
+	 * Creates a new backend using the given checkpoint-/savepoint directories, or the values defined in
+	 * the given configuration. If a checkpoint-/savepoint parameter is not null, that value takes precedence
+	 * over the value in the configuration. If the configuration does not specify a value, it is possible
+	 * that the checkpoint-/savepoint directories in the backend will be null.
+	 *
+	 * <p>This constructor can be used to create a backend that is based partially on a given backend
+	 * and partially on a configuration.
+	 *
+	 * @param baseCheckpointPath The checkpoint base directory to use (or null).
+	 * @param baseSavepointPath The default savepoint directory to use (or null).
+	 * @param configuration The configuration to read values from.
+	 *
+	 * @throws IllegalConfigurationException Thrown, if a configured value cannot be parsed as a path.
+	 */
+	protected AbstractFileStateBackend(
+			@Nullable Path baseCheckpointPath,
+			@Nullable Path baseSavepointPath,
+			Configuration configuration) {
+
+		this(parameterOrConfigured(baseCheckpointPath, configuration, CheckpointingOptions.CHECKPOINTS_DIRECTORY),
+				parameterOrConfigured(baseSavepointPath, configuration, CheckpointingOptions.SAVEPOINT_DIRECTORY));
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the checkpoint base directory. Jobs will create job-specific subdirectories
+	 * for checkpoints within this directory. May be null, if not configured.
+	 *
+	 * @return The checkpoint base directory
+	 */
+	@Nullable
+	public Path getCheckpointPath() {
+		return baseCheckpointPath;
+	}
+
+	/**
+	 * Gets the directory where savepoints are stored by default (when no custom path is given
+	 * to the savepoint trigger command).
+	 *
+	 * @return The default directory for savepoints, or null, if no default directory has been configured.
+	 */
+	@Nullable
+	public Path getSavepointPath() {
+		return baseSavepointPath;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks the validity of the path's scheme and path.
+	 *
+	 * @param path The path to check.
+	 * @return The given path, unchanged, if it is valid.
+	 *
+	 * @throws IllegalArgumentException Thrown, if the URI misses scheme or path, or if the path
+	 *                                  refers to the root directory.
+	 */
+	private static Path validatePath(Path path) {
+		final URI uri = path.toUri();
+		final String scheme = uri.getScheme();
+		final String pathPart = uri.getPath();
+
+		// some validity checks
+		if (scheme == null) {
+			throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+					"Please specify the file system scheme explicitly in the URI.");
+		}
+		if (pathPart == null) {
+			throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
+					"Please specify a directory path for the checkpoint data.");
+		}
+		if (pathPart.length() == 0 || pathPart.equals("/")) {
+			throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+		}
+
+		return path;
+	}
+
+	/**
+	 * Returns the given path if it is not null. Otherwise, reads the given option from the
+	 * configuration and parses it as a path, returning null if the configuration yields no value.
+	 *
+	 * @throws IllegalConfigurationException Thrown, if the configured value is not a valid path.
+	 */
+	@Nullable
+	private static Path parameterOrConfigured(@Nullable Path path, Configuration config, ConfigOption<String> option) {
+		if (path != null) {
+			return path;
+		}
+		else {
+			String configValue = config.getString(option);
+			try {
+				return configValue == null ? null : new Path(configValue);
+			}
+			catch (IllegalArgumentException e) {
+				// keep the parse failure as the cause, so the reason for rejection is not lost
+				throw new IllegalConfigurationException("Cannot parse value for " + option.key() +
+						" : " + configValue + " . Not a valid path.", e);
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 952988f..2fff45a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -18,54 +18,91 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.util.TernaryBoolean;
+
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The file state backend is a state backend that stores the state of streaming jobs in a file system.
+ * This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
+ * The state backend checkpoints state as files to a file system (hence the backend's name).
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}.
+ *
+ * <h1>State Size Considerations</h1>
+ *
+ * <p>Working state is kept on the TaskManager heap. If a TaskManager executes multiple
+ * tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used)
+ * then the aggregate state of all tasks needs to fit into that TaskManager's memory.
+ *
+ * <p>This state backend stores small state chunks directly with the metadata, to avoid creating
+ * many small files. The threshold for that is configurable. When increasing this threshold, the
+ * size of the checkpoint metadata increases. The checkpoint metadata of all retained completed
+ * checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
+ * unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly.
+ *
+ * <h1>Persistence Guarantees</h1>
  *
- * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
- * files for each state, for example:
+ * <p>Checkpoints from this state backend are as persistent and available as the file system that is written to.
+ * If the file system is a persistent distributed file system, this state backend supports
+ * highly available setups. The backend additionally supports savepoints and externalized checkpoints.
  *
- * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
+ * <h1>Configuration</h1>
+ *
+ * <p>As for all state backends, this backend can either be configured within the application (by creating
+ * the backend with the respective constructor parameters and setting it on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * <p>If the state backend was specified in the application, it may pick up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is configured in the application
+ * without a default savepoint directory, it will pick up a default savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is implemented via the
+ * {@link #configure(Configuration)} method.
  */
-public class FsStateBackend extends AbstractStateBackend {
+@PublicEvolving
+public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {
 
 	private static final long serialVersionUID = -8191916350224044011L;
 
-	/** By default, state smaller than 1024 bytes will not be written to files, but
-	 * will be stored directly with the metadata */
-	public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
+	/** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
+	public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
 
-	/** Maximum size of state that is stored with the metadata, rather than in files */
-	private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
-	
-	/** The path to the directory for the checkpoint data, including the file system
-	 * description via scheme and optional authority */
-	private final Path basePath;
+	// ------------------------------------------------------------------------
 
-	/** State below this size will be stored as part of the metadata, rather than in files */
+	/** State below this size will be stored as part of the metadata, rather than in files.
+	 * A value of '-1' means not yet configured, in which case the default will be used. */
 	private final int fileStateThreshold;
 
-	/** Switch to chose between synchronous and asynchronous snapshots */
-	private final boolean asynchronousSnapshots;
+	/** Switch to choose between synchronous and asynchronous snapshots.
+	 * A value of 'undefined' means not yet configured, in which case the default will be used. */
+	private final TernaryBoolean asynchronousSnapshots;
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a new state backend that stores its checkpoint data in the file system and location
@@ -80,9 +117,8 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
-	public FsStateBackend(String checkpointDataUri) throws IOException {
+	public FsStateBackend(String checkpointDataUri) {
 		this(new Path(checkpointDataUri));
 	}
 
@@ -100,10 +136,8 @@ public class FsStateBackend extends AbstractStateBackend {
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
 	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
-	 *
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
-	public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+	public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) {
 		this(new Path(checkpointDataUri), asynchronousSnapshots);
 	}
 
@@ -120,9 +154,8 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
-	public FsStateBackend(Path checkpointDataUri) throws IOException {
+	public FsStateBackend(Path checkpointDataUri) {
 		this(checkpointDataUri.toUri());
 	}
 
@@ -140,10 +173,8 @@ public class FsStateBackend extends AbstractStateBackend {
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
 	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
-	 *
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
-	public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+	public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) {
 		this(checkpointDataUri.toUri(), asynchronousSnapshots);
 	}
 
@@ -160,10 +191,30 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
-	public FsStateBackend(URI checkpointDataUri) throws IOException {
-		this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, true);
+	public FsStateBackend(URI checkpointDataUri) {
+		this(checkpointDataUri, null, -1, TernaryBoolean.UNDEFINED);
+	}
+
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI. Optionally, this constructor accepts a default savepoint storage
+	 * directory to which savepoints are stored when no custom target path is given to the savepoint
+	 * command.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+	 *                          and the path to the checkpoint data directory.
+	 * @param defaultSavepointDirectory The default directory to store savepoints to. May be null.
+	 */
+	public FsStateBackend(URI checkpointDataUri, @Nullable URI defaultSavepointDirectory) {
+		this(checkpointDataUri, defaultSavepointDirectory, -1, TernaryBoolean.UNDEFINED);
 	}
 
 	/**
@@ -180,11 +231,10 @@ public class FsStateBackend extends AbstractStateBackend {
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
 	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
-	 *
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
-	public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
-		this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, asynchronousSnapshots);
+	public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) {
+		this(checkpointDataUri, null, -1,
+				TernaryBoolean.fromBoolean(asynchronousSnapshots));
 	}
 
 	/**
@@ -202,13 +252,9 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *                          and the path to the checkpoint data directory.
 	 * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
 	 *                             rather than in files
-	 *
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
-	 * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
 	 */
-	public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
-
-		this(checkpointDataUri, fileStateSizeThreshold, true);
+	public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) {
+		this(checkpointDataUri, null, fileStateSizeThreshold, TernaryBoolean.UNDEFINED);
 	}
 
 	/**
@@ -225,34 +271,120 @@ public class FsStateBackend extends AbstractStateBackend {
 	 * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
 	 *                          and the path to the checkpoint data directory.
 	 * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
-	 *                             rather than in files
+	 *                             rather than in files (-1 for default value).
 	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
-	 *
-	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
 	public FsStateBackend(
 			URI checkpointDataUri,
 			int fileStateSizeThreshold,
-			boolean asynchronousSnapshots) throws IOException {
+			boolean asynchronousSnapshots) {
 
-		checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
-		checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
-				"The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD);
+		this(checkpointDataUri, null, fileStateSizeThreshold,
+				TernaryBoolean.fromBoolean(asynchronousSnapshots));
+	}
 
-		this.fileStateThreshold = fileStateSizeThreshold;
-		this.basePath = validateAndNormalizeUri(checkpointDataUri);
+	/**
+	 * Creates a new state backend that stores its checkpoint data in the file system and location
+	 * defined by the given URI.
+	 *
+	 * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+	 * must be accessible via {@link FileSystem#get(URI)}.
+	 *
+	 * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+	 * (host and port), or that the Hadoop configuration that describes that information must be in the
+	 * classpath.
+	 *
+	 * @param checkpointDirectory        The path to write checkpoint metadata to.
+	 * @param defaultSavepointDirectory  The path to write savepoints to. If null, the value from
+	 *                                   the runtime configuration will be used, or savepoint
+	 *                                   target locations need to be passed when triggering a savepoint.
+	 * @param fileStateSizeThreshold     State below this size will be stored as part of the metadata,
+	 *                                   rather than in files. If -1, the value configured in the
+	 *                                   runtime configuration will be used, or the default value (1KB)
+	 *                                   if nothing is configured.
+	 * @param asynchronousSnapshots      Flag to switch between synchronous and asynchronous
+	 *                                   snapshot mode. If UNDEFINED, the value configured in the
+	 *                                   runtime configuration will be used.
+	 */
+	public FsStateBackend(
+			URI checkpointDirectory,
+			@Nullable URI defaultSavepointDirectory,
+			int fileStateSizeThreshold,
+			TernaryBoolean asynchronousSnapshots) {
+
+		super(checkNotNull(checkpointDirectory, "checkpoint directory is null"), defaultSavepointDirectory);
 
+		checkNotNull(asynchronousSnapshots, "asynchronousSnapshots");
+		checkArgument(fileStateSizeThreshold >= -1 && fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
+				"The threshold for file state size must be in [-1, %s], where '-1' means to use " +
+						"the value from the deployment's configuration.", MAX_FILE_STATE_THRESHOLD);
+
+		this.fileStateThreshold = fileStateSizeThreshold;
 		this.asynchronousSnapshots = asynchronousSnapshots;
 	}
 
 	/**
-	 * Gets the base directory where all state-containing files are stored.
-	 * The job specific directory is created inside this directory.
+	 * Private constructor that creates a re-configured copy of the state backend.
 	 *
-	 * @return The base directory.
+	 * @param original The state backend to re-configure
+	 * @param configuration The configuration
 	 */
+	private FsStateBackend(FsStateBackend original, Configuration configuration) {
+		super(original.getCheckpointPath(), original.getSavepointPath(), configuration);
+
+		// if asynchronous snapshots were configured, use that setting,
+		// else check the configuration
+		this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
+				configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));
+
+		// an explicitly set threshold (>= 0) wins; '-1' means "take it from the configuration"
+		final int sizeThreshold = original.fileStateThreshold >= 0 ?
+				original.fileStateThreshold :
+				configuration.getInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);
+
+		if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
+			this.fileStateThreshold = sizeThreshold;
+		}
+		else {
+			this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+
+			// because this is the only place we (unlikely) ever log, we lazily
+			// create the logger here; log under this class, since it is the one
+			// rejecting the configured value
+			LoggerFactory.getLogger(FsStateBackend.class).warn(
+					"Ignoring invalid file size threshold value ({}): {} - using default value {} instead.",
+					CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
+					CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the base directory where all the checkpoints are stored.
+	 * The job-specific checkpoint directory is created inside this directory.
+	 *
+	 * @return The base directory for checkpoints.
+	 *
+	 * @deprecated Deprecated in favor of {@link #getCheckpointPath()}.
+	 */
+	@Deprecated
 	public Path getBasePath() {
-		return basePath;
+		return getCheckpointPath();
+	}
+
+	/**
+	 * Gets the base directory where all the checkpoints are stored.
+	 * The job-specific checkpoint directory is created inside this directory.
+	 *
+	 * @return The base directory for checkpoints.
+	 */
+	@Nonnull
+	@Override
+	public Path getCheckpointPath() {
+		// we know that this can never be null by the way of constructor checks
+		//noinspection ConstantConditions
+		return super.getCheckpointPath();
 	}
 
 	/**
@@ -260,12 +392,41 @@ public class FsStateBackend extends AbstractStateBackend {
 	 * This threshold ensures that the backend does not create a large amount of very small files,
 	 * where potentially the file pointers are larger than the state itself.
 	 *
-	 * <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}.
+	 * <p>If not explicitly configured, this is the default value of
+	 * {@link CheckpointingOptions#FS_SMALL_FILE_THRESHOLD}.
 	 *
 	 * @return The file size threshold, in bytes.
 	 */
 	public int getMinFileSizeThreshold() {
-		return fileStateThreshold;
+		return fileStateThreshold >= 0 ?
+				fileStateThreshold :
+				CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+	}
+
+	/**
+	 * Gets whether the key/value data structures are asynchronously snapshotted.
+	 *
+	 * <p>If not explicitly configured, this is the default value of
+	 * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
+	 */
+	public boolean isUsingAsynchronousSnapshots() {
+		return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Reconfiguration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a copy of this state backend that uses the values defined in the configuration
+	 * for fields that were not specified in this state backend.
+	 *
+	 * @param config the configuration
+	 * @return The re-configured variant of the state backend
+	 */
+	@Override
+	public FsStateBackend configure(Configuration config) {
+		return new FsStateBackend(this, config);
 	}
 
 	// ------------------------------------------------------------------------
@@ -274,7 +435,7 @@ public class FsStateBackend extends AbstractStateBackend {
 
 	@Override
 	public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
-		return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold);
+		return new FsCheckpointStreamFactory(getCheckpointPath(), jobId, getMinFileSizeThreshold());
 	}
 
 	@Override
@@ -283,7 +444,7 @@ public class FsStateBackend extends AbstractStateBackend {
 			String operatorIdentifier,
 			String targetLocation) throws IOException {
 
-		return new FsSavepointStreamFactory(new Path(targetLocation), jobId, fileStateThreshold);
+		return new FsSavepointStreamFactory(new Path(targetLocation), jobId, getMinFileSizeThreshold());
 	}
 
 	@Override
@@ -302,7 +463,7 @@ public class FsStateBackend extends AbstractStateBackend {
 				env.getUserClassLoader(),
 				numberOfKeyGroups,
 				keyGroupRange,
-				asynchronousSnapshots,
+				isUsingAsynchronousSnapshots(),
 				env.getExecutionConfig());
 	}
 
@@ -314,45 +475,19 @@ public class FsStateBackend extends AbstractStateBackend {
 		return new DefaultOperatorStateBackend(
 			env.getUserClassLoader(),
 			env.getExecutionConfig(),
-			asynchronousSnapshots);
+				isUsingAsynchronousSnapshots());
 	}
 
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
 	@Override
 	public String toString() {
-		return "File State Backend @ " + basePath;
-	}
-
-	/**
-	 * Checks and normalizes the checkpoint data URI. This method first checks the validity of the
-	 * URI (scheme, path, availability of a matching file system) and then normalizes the URI
-	 * to a path.
-	 * 
-	 * <p>If the URI does not include an authority, but the file system configured for the URI has an
-	 * authority, then the normalized path will include this authority.
-	 * 
-	 * @param checkpointDataUri The URI to check and normalize.
-	 * @return A normalized URI as a Path.
-	 * 
-	 * @throws IllegalArgumentException Thrown, if the URI misses scheme or path. 
-	 * @throws IOException Thrown, if no file system can be found for the URI's scheme.
-	 */
-	private static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
-		final String scheme = checkpointDataUri.getScheme();
-		final String path = checkpointDataUri.getPath();
-
-		// some validity checks
-		if (scheme == null) {
-			throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
-					"Please specify the file system scheme explicitly in the URI.");
-		}
-		if (path == null) {
-			throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
-					"Please specify a directory path for the checkpoint data.");
-		}
-		if (path.length() == 0 || path.equals("/")) {
-			throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
-		}
-
-		return new Path(checkpointDataUri);
+		return "File State Backend (" +
+				"checkpoints: '" + getCheckpointPath() +
+				"', savepoints: '" + getSavepointPath() +
+				"', asynchronous: " + asynchronousSnapshots +
+				", fileStateThreshold: " + fileStateThreshold + ")";
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
index 4c933ef..2640683 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
@@ -18,45 +18,34 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateBackendFactory;
 
-import java.io.IOException;
-
 /**
- * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
- * from a configuration.
+ * A factory that creates a {@link FsStateBackend} from a configuration.
  */
+@PublicEvolving
 public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
-	
-	/** The key under which the config stores the directory where checkpoints should be stored */
-	public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
-
-	/** The key under which the config stores the threshold for state to be store in memory,
-	 * rather than in files */
-	public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold";
-
 
 	@Override
 	public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException {
-		final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-		final int memoryThreshold = config.getInteger(
-			MEMORY_THRESHOLD_CONF_KEY, FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD);
-
-		if (checkpointDirURI == null) {
+		// we need to explicitly read the checkpoint directory here, because that
+		// is a required constructor parameter
+		final String checkpointDir = config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+		if (checkpointDir == null) {
 			throw new IllegalConfigurationException(
 					"Cannot create the file system state backend: The configuration does not specify the " +
-							"checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
+							"checkpoint directory '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + '\'');
 		}
 
 		try {
-			Path path = new Path(checkpointDirURI);
-			return new FsStateBackend(path.toUri(), memoryThreshold);
+			return new FsStateBackend(checkpointDir).configure(config);
 		}
-		catch (IOException | IllegalArgumentException e) {
+		catch (IllegalArgumentException e) {
 			throw new IllegalConfigurationException("Invalid configuration for the state backend", e);
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index b8ebedf..2079a97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -18,78 +18,265 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.util.TernaryBoolean;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the serialized data is
- * transferred
+ * This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
+ * The state backend checkpoints state directly to the JobManager's memory (hence the backend's name),
+ * but the checkpoints will be persisted to a file system for high-availability setups and savepoints.
+ * The MemoryStateBackend is consequently a FileSystem-based backend that can work without a
+ * file system dependency in simple setups.
+ *
+ * <p>This state backend should be used only for experimentation, quick local setups,
+ * or for streaming applications that have very small state: Because it requires checkpoints to
+ * go through the JobManager's memory, larger state will occupy larger portions of the JobManager's
+ * main memory, reducing operational stability.
+ * For any other setup, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend}
+ * should be used. The {@code FsStateBackend} holds the working state on the TaskManagers in the same way, but
+ * checkpoints state directly to files rather than to the JobManager's memory, thus supporting
+ * large state sizes.
+ *
+ * <h1>State Size Considerations</h1>
+ *
+ * <p>State checkpointing with this state backend is subject to the following conditions:
+ * <ul>
+ *     <li>Each individual state must not exceed the configured maximum state size
+ *         (see {@link #getMaxStateSize()}).</li>
+ *
+ *     <li>All state from one task (i.e., the sum of all operator states and keyed states from all
+ *         chained operators of the task) must not exceed what the RPC system supports, which is
+ *         by default less than 10 MB. That limit can be configured up, but that is typically not advised.</li>
+ *
+ *     <li>The sum of all states in the application times all retained checkpoints must comfortably
+ *         fit into the JobManager's JVM heap space.</li>
+ * </ul>
+ *
+ * <h1>Persistence Guarantees</h1>
+ *
+ * <p>For the use cases where the state sizes can be handled by this backend, the backend does guarantee
+ * persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+ * (when high-availability is configured).
+ *
+ * <h1>Configuration</h1>
+ *
+ * <p>As for all state backends, this backend can either be configured within the application (by creating
+ * the backend with the respective constructor parameters and setting it on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * <p>If the state backend was specified in the application, it may pick up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is configured in the application
+ * without a default savepoint directory, it will pick up a default savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is implemented via the
+ * {@link #configure(Configuration)} method.
  */
-public class MemoryStateBackend extends AbstractStateBackend {
+@PublicEvolving
+public class MemoryStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {
 
 	private static final long serialVersionUID = 4109305377809414635L;
 
-	/** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
-	private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
+	/** The default maximal size that the snapshotted memory state may have (5 MiBytes). */
+	public static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
 
-	/** The maximal size that the snapshotted memory state may have */
+	/** The maximal size that the snapshotted memory state may have. */
 	private final int maxStateSize;
 
-	/** Switch to chose between synchronous and asynchronous snapshots */
-	private final boolean asynchronousSnapshots;
+	/** Switch to choose between synchronous and asynchronous snapshots.
+	 * A value of 'UNDEFINED' means not yet configured, in which case the default will be used. */
+	private final TernaryBoolean asynchronousSnapshots;
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a new memory state backend that accepts states whose serialized forms are
 	 * up to the default state size (5 MB).
+	 *
+	 * <p>Checkpoint and default savepoint locations are used as specified in the
+	 * runtime configuration.
 	 */
 	public MemoryStateBackend() {
-		this(DEFAULT_MAX_STATE_SIZE);
+		this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
 	}
 
 	/**
 	 * Creates a new memory state backend that accepts states whose serialized forms are
-	 * up to the given number of bytes.
+	 * up to the default state size (5 MB). The state backend uses asynchronous snapshots
+	 * or synchronous snapshots as configured.
 	 *
-	 * @param maxStateSize The maximal size of the serialized state
+	 * <p>Checkpoint and default savepoint locations are used as specified in the
+	 * runtime configuration.
+	 *
+	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
 	 */
-	public MemoryStateBackend(int maxStateSize) {
-		this(maxStateSize, true);
+	public MemoryStateBackend(boolean asynchronousSnapshots) {
+		this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.fromBoolean(asynchronousSnapshots));
 	}
 
 	/**
 	 * Creates a new memory state backend that accepts states whose serialized forms are
-	 * up to the default state size (5 MB).
+	 * up to the given number of bytes.
 	 *
-	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+	 * <p>Checkpoint and default savepoint locations are used as specified in the
+	 * runtime configuration.
+	 *
+	 * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
+	 * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
+	 * The checkpointed state needs to be sent to the JobManager via limited-size RPC messages,
+	 * and the JobManager needs to be able to hold all aggregated state in its memory.
+	 *
+	 * @param maxStateSize The maximal size of the serialized state
 	 */
-	public MemoryStateBackend(boolean asynchronousSnapshots) {
-		this(DEFAULT_MAX_STATE_SIZE, asynchronousSnapshots);
+	public MemoryStateBackend(int maxStateSize) {
+		this(null, null, maxStateSize, TernaryBoolean.UNDEFINED);
 	}
 
 	/**
 	 * Creates a new memory state backend that accepts states whose serialized forms are
-	 * up to the given number of bytes.
+	 * up to the given number of bytes and that uses asynchronous snapshots as configured.
+	 *
+	 * <p>Checkpoint and default savepoint locations are used as specified in the
+	 * runtime configuration.
+	 *
+	 * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
+	 * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
+	 * The checkpointed state needs to be sent to the JobManager via limited-size RPC messages,
+	 * and the JobManager needs to be able to hold all aggregated state in its memory.
 	 *
 	 * @param maxStateSize The maximal size of the serialized state
 	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
 	 */
 	public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) {
+		this(null, null, maxStateSize, TernaryBoolean.fromBoolean(asynchronousSnapshots));
+	}
+
+	/**
+	 * Creates a new MemoryStateBackend, setting optionally the path to persist checkpoint metadata
+	 * to, and to persist savepoints to.
+	 *
+	 * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
+	 *                       the runtime configuration will be used.
+	 * @param savepointPath  The path to write savepoints to. If null, the value from
+	 *                       the runtime configuration will be used.
+	 */
+	public MemoryStateBackend(@Nullable String checkpointPath, @Nullable String savepointPath) {
+		this(checkpointPath, savepointPath, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
+	}
+
+	/**
+	 * Creates a new MemoryStateBackend, setting optionally the paths to persist checkpoint metadata
+	 * and savepoints to, as well as configuring state thresholds and asynchronous operations.
+	 *
+	 * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
+	 * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
+	 * The checkpointed state needs to be sent to the JobManager via limited-size RPC messages,
+	 * and the JobManager needs to be able to hold all aggregated state in its memory.
+	 *
+	 * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
+	 *                       the runtime configuration will be used.
+	 * @param savepointPath  The path to write savepoints to. If null, the value from
+	 *                       the runtime configuration will be used.
+	 * @param maxStateSize   The maximal size of the serialized state.
+	 * @param asynchronousSnapshots Flag to switch between synchronous and asynchronous
+	 *                              snapshot mode. If {@link TernaryBoolean#UNDEFINED}, the value
+	 *                              configured in the runtime configuration will be used.
+	 */
+	public MemoryStateBackend(
+			@Nullable String checkpointPath,
+			@Nullable String savepointPath,
+			int maxStateSize,
+			TernaryBoolean asynchronousSnapshots) {
+
+		super(checkpointPath == null ? null : new Path(checkpointPath),
+				savepointPath == null ? null : new Path(savepointPath));
+
+		checkArgument(maxStateSize > 0, "maxStateSize must be > 0");
 		this.maxStateSize = maxStateSize;
+
 		this.asynchronousSnapshots = asynchronousSnapshots;
 	}
 
+	/**
+	 * Private constructor that creates a re-configured copy of the state backend.
+	 *
+	 * @param original The state backend to re-configure
+	 * @param configuration The configuration
+	 */
+	private MemoryStateBackend(MemoryStateBackend original, Configuration configuration) {
+		super(original.getCheckpointPath(), original.getSavepointPath(), configuration);
+
+		this.maxStateSize = original.maxStateSize;
+
+		// if asynchronous snapshots were configured, use that setting,
+		// else check the configuration
+		this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
+				configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the maximum size that an individual state can have, as configured in the
+	 * constructor (by default {@value #DEFAULT_MAX_STATE_SIZE}).
+	 *
+	 * @return The maximum size that an individual state can have
+	 */
+	public int getMaxStateSize() {
+		return maxStateSize;
+	}
+
+	/**
+	 * Gets whether the key/value data structures are asynchronously snapshotted.
+	 *
+	 * <p>If not explicitly configured, this is the default value of
+	 * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
+	 */
+	public boolean isUsingAsynchronousSnapshots() {
+		return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Reconfiguration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a copy of this state backend that uses the values defined in the configuration
+	 * for fields that were not specified in this state backend.
+	 *
+	 * @param config the configuration
+	 * @return The re-configured variant of the state backend
+	 */
+	@Override
+	public MemoryStateBackend configure(Configuration config) {
+		return new MemoryStateBackend(this, config);
+	}
+
+	// ------------------------------------------------------------------------
+	//  checkpoint state persistence
+	// ------------------------------------------------------------------------
+
 	@Override
 	public OperatorStateBackend createOperatorStateBackend(
 		Environment env,
@@ -98,12 +285,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
 		return new DefaultOperatorStateBackend(
 			env.getUserClassLoader(),
 			env.getExecutionConfig(),
-			asynchronousSnapshots);
-	}
-
-	@Override
-	public String toString() {
-		return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
+			isUsingAsynchronousSnapshots());
 	}
 
 	@Override
@@ -121,6 +303,10 @@ public class MemoryStateBackend extends AbstractStateBackend {
 		return new MemCheckpointStreamFactory(maxStateSize);
 	}
 
+	// ------------------------------------------------------------------------
+	//  checkpoint state persistence
+	// ------------------------------------------------------------------------
+
 	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env, JobID jobID,
@@ -136,7 +322,20 @@ public class MemoryStateBackend extends AbstractStateBackend {
 				env.getUserClassLoader(),
 				numberOfKeyGroups,
 				keyGroupRange,
-				asynchronousSnapshots,
+				isUsingAsynchronousSnapshots(),
 				env.getExecutionConfig());
 	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "MemoryStateBackend (data in heap memory / checkpoints to JobManager) " +
+				"(checkpoints: '" + getCheckpointPath() +
+				"', savepoints: '" + getSavepointPath() +
+				"', asynchronous: " + asynchronousSnapshots +
+				", maxStateSize: " + maxStateSize + ")";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackendFactory.java
new file mode 100644
index 0000000..10b6c20
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackendFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateBackendFactory;
+
+/**
+ * A factory that creates a {@link MemoryStateBackend} from a configuration.
+ */
+@PublicEvolving
+public class MemoryStateBackendFactory implements StateBackendFactory<MemoryStateBackend> {
+
+	@Override
+	public MemoryStateBackend createFromConfig(Configuration config) {
+		return new MemoryStateBackend().configure(config);
+	}
+}
\ No newline at end of file