You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:25 UTC
[08/17] flink git commit: [FLINK-5823] [checkpoints] State backends
define checkpoint and savepoint directories, improved configuration
[FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa03e78d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa03e78d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa03e78d
Branch: refs/heads/master
Commit: fa03e78d3a245b40ceb3efffeb3020853e74e48b
Parents: 7d820d6
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Oct 25 19:04:10 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100
----------------------------------------------------------------------
.../state/RocksDBStateBackendFactory.java | 25 +-
.../org/apache/flink/util/TernaryBoolean.java | 91 +++++
.../apache/flink/util/TernaryBooleanTest.java | 79 +++++
.../runtime/state/AbstractStateBackend.java | 165 +--------
.../runtime/state/ConfigurableStateBackend.java | 45 +++
.../flink/runtime/state/StateBackendLoader.java | 265 +++++++++++++++
.../filesystem/AbstractFileStateBackend.java | 206 +++++++++++
.../state/filesystem/FsStateBackend.java | 327 ++++++++++++------
.../state/filesystem/FsStateBackendFactory.java | 35 +-
.../state/memory/MemoryStateBackend.java | 253 ++++++++++++--
.../state/memory/MemoryStateBackendFactory.java | 35 ++
...ExecutionGraphCheckpointCoordinatorTest.java | 3 +-
.../ArchivedExecutionGraphTest.java | 11 +-
.../runtime/jobmanager/JobManagerTest.java | 15 +-
.../runtime/state/MemoryStateBackendTest.java | 3 +-
.../runtime/state/StateBackendLoadingTest.java | 340 +++++++++++++++++--
.../flink/streaming/api/graph/StreamConfig.java | 6 +-
.../streaming/runtime/tasks/StreamTask.java | 20 +-
.../tasks/TaskCheckpointingBehaviourTest.java | 24 +-
.../PojoSerializerUpgradeTest.java | 17 +-
20 files changed, 1569 insertions(+), 396 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index f0569b8..de5be9a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
@@ -37,33 +38,29 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBSt
protected static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackendFactory.class);
- private static final long serialVersionUID = 4906988360901930371L;
-
- /** The key under which the config stores the directory where checkpoints should be stored. */
- public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
- /** The key under which the config stores the directory where RocksDB should be stored. */
- public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir";
-
@Override
public RocksDBStateBackend createFromConfig(Configuration config)
throws IllegalConfigurationException, IOException {
- final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
- final String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+ final String checkpointDirURI = config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+ final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
+ final boolean incrementalCheckpoints = config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS);
if (checkpointDirURI == null) {
throw new IllegalConfigurationException(
"Cannot create the RocksDB state backend: The configuration does not specify the " +
- "checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
+ "checkpoint directory '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + '\'');
}
try {
- Path path = new Path(checkpointDirURI);
- RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri());
- if (rocksdbLocalPath != null) {
- String[] directories = rocksdbLocalPath.split(",|" + File.pathSeparator);
+ final Path path = new Path(checkpointDirURI);
+ final RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri(), incrementalCheckpoints);
+
+ if (rocksdbLocalPaths != null) {
+ String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
backend.setDbStoragePaths(directories);
}
+
LOG.info("State backend is set to RocksDB (configured DB storage paths {}, checkpoints to filesystem {} ) ",
backend.getDbStoragePaths(), path);
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java b/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
new file mode 100644
index 0000000..14ef24b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+/**
+ * A ternary boolean, which can have the values 'true', 'false', or 'undefined'.
+ *
+ * <p>A ternary boolean can, for example, be used for configuration switches that
+ * may not be configured (undefined), in which case a default value should be assumed.
+ */
+@PublicEvolving
+public enum TernaryBoolean {
+
+ /** The value for 'true'. */
+ TRUE,
+
+ /** The value for 'false'. */
+ FALSE,
+
+ /** The value for 'undefined'. In a configuration setting, this typically means that the
+ * default value will be used, or the value from a deployment-wide configuration. */
+ UNDEFINED;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the boolean value of this ternary boolean, or the given default if this is {@link #UNDEFINED}.
+ *
+ * @param defaultValue The value to be returned in case this ternary value is 'undefined'.
+ * @return {@code true} for {@link #TRUE}, {@code false} for {@link #FALSE}, otherwise {@code defaultValue}.
+ */
+ public boolean getOrDefault(boolean defaultValue) {
+ return this == UNDEFINED ? defaultValue : (this == TRUE);
+ }
+
+ /**
+ * Resolves the 'undefined' value: returns this value if it is defined, otherwise the ternary form of the given value.
+ *
+ * @param valueForUndefined The value to be used in case this ternary value is 'undefined'.
+ * @return This value if it is {@link #TRUE} or {@link #FALSE}, otherwise {@code fromBoolean(valueForUndefined)}.
+ */
+ public TernaryBoolean resolveUndefined(boolean valueForUndefined) {
+ return this != UNDEFINED ? this : fromBoolean(valueForUndefined);
+ }
+
+ /**
+ * Gets this ternary boolean as a boxed boolean: {@link Boolean#TRUE} or {@link Boolean#FALSE},
+ * or {@code null} for the value 'undefined'.
+ */
+ @Nullable
+ public Boolean getAsBoolean() {
+ return this == UNDEFINED ? null : (this == TRUE ? Boolean.TRUE : Boolean.FALSE);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Converts the given boolean to a TernaryBoolean: {@link #TRUE} or {@link #FALSE}, respectively.
+ */
+ public static TernaryBoolean fromBoolean(boolean bool) {
+ return bool ? TRUE : FALSE;
+ }
+
+ /**
+ * Converts the given boxed Boolean to a TernaryBoolean. A null value results in
+ * {@link #UNDEFINED}, while a non-null value results in {@link #TRUE} or {@link #FALSE}, respectively.
+ */
+ public static TernaryBoolean fromBoxedBoolean(@Nullable Boolean bool) {
+ return bool == null ? UNDEFINED : fromBoolean(bool);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-core/src/test/java/org/apache/flink/util/TernaryBooleanTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/TernaryBooleanTest.java b/flink-core/src/test/java/org/apache/flink/util/TernaryBooleanTest.java
new file mode 100644
index 0000000..866fafb
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/TernaryBooleanTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.apache.flink.util.TernaryBoolean.FALSE;
+import static org.apache.flink.util.TernaryBoolean.TRUE;
+import static org.apache.flink.util.TernaryBoolean.UNDEFINED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link TernaryBoolean} enum.
+ */
+public class TernaryBooleanTest {
+
+ @Test
+ public void testWithDefault() {
+ assertTrue(TRUE.getOrDefault(true)); // defined values ignore the default
+ assertTrue(TRUE.getOrDefault(false));
+
+ assertFalse(FALSE.getOrDefault(true));
+ assertFalse(FALSE.getOrDefault(false));
+
+ assertTrue(UNDEFINED.getOrDefault(true)); // only UNDEFINED falls back to the default
+ assertFalse(UNDEFINED.getOrDefault(false));
+ }
+
+ @Test
+ public void testResolveUndefined() {
+ assertEquals(TRUE, TRUE.resolveUndefined(true)); // defined values are returned unchanged
+ assertEquals(TRUE, TRUE.resolveUndefined(false));
+
+ assertEquals(FALSE, FALSE.resolveUndefined(true));
+ assertEquals(FALSE, FALSE.resolveUndefined(false));
+
+ assertEquals(TRUE, UNDEFINED.resolveUndefined(true)); // UNDEFINED resolves to the given value
+ assertEquals(FALSE, UNDEFINED.resolveUndefined(false));
+ }
+
+ @Test
+ public void testToBoolean() {
+ assertTrue(Boolean.TRUE == TRUE.getAsBoolean()); // reference comparison on purpose: the canonical Boolean constants are expected
+ assertTrue(Boolean.FALSE == FALSE.getAsBoolean());
+ assertNull(UNDEFINED.getAsBoolean());
+ }
+
+ @Test
+ public void testFromBoolean() {
+ assertEquals(TRUE, TernaryBoolean.fromBoolean(true));
+ assertEquals(FALSE, TernaryBoolean.fromBoolean(false));
+ }
+
+ @Test
+ public void testFromBoxedBoolean() {
+ assertEquals(TRUE, TernaryBoolean.fromBoxedBoolean(Boolean.TRUE));
+ assertEquals(FALSE, TernaryBoolean.fromBoxedBoolean(Boolean.FALSE));
+ assertEquals(UNDEFINED, TernaryBoolean.fromBoxedBoolean(null)); // null maps to UNDEFINED
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 1594e2e..c72f012 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -21,27 +21,16 @@ package org.apache.flink.runtime.state;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.DynamicCodeLoadingException;
-
-import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* An abstract base implementation of the {@link StateBackend} interface.
- *
- * <p>
+ *
+ * <p>This class has currently no contents and only kept to not break the prior class hierarchy for users.
*/
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
@@ -49,19 +38,6 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
private static final long serialVersionUID = 4620415814639230247L;
// ------------------------------------------------------------------------
- // Configuration shortcut names
- // ------------------------------------------------------------------------
-
- /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */
- public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
-
- /** The shortcut configuration name for the FileSystem State backend */
- public static final String FS_STATE_BACKEND_NAME = "filesystem";
-
- /** The shortcut configuration name for the RocksDB State Backend */
- public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
-
- // ------------------------------------------------------------------------
// State Backend - Persisting Byte Storage
// ------------------------------------------------------------------------
@@ -94,141 +70,4 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
public abstract OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier) throws Exception;
-
- // ------------------------------------------------------------------------
- // Loading the state backend from a configuration
- // ------------------------------------------------------------------------
-
- /**
- * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
- * in {@link CoreOptions#STATE_BACKEND}.
- *
- * <p>The state backends can be specified either via their shortcut name, or via the class name
- * of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory
- * is instantiated (via its zero-argument constructor) and its
- * {@link StateBackendFactory#createFromConfig(Configuration)} method is called.
- *
- * <p>Recognized shortcut names are '{@value AbstractStateBackend#MEMORY_STATE_BACKEND_NAME}',
- * '{@value AbstractStateBackend#FS_STATE_BACKEND_NAME}', and
- * '{@value AbstractStateBackend#ROCKSDB_STATE_BACKEND_NAME}'.
- *
- * @param config The configuration to load the state backend from
- * @param classLoader The class loader that should be used to load the state backend
- * @param logger Optionally, a logger to log actions to (may be null)
- *
- * @return The instantiated state backend.
- *
- * @throws DynamicCodeLoadingException
- * Thrown if a state backend factory is configured and the factory class was not
- * found or the factory could not be instantiated
- * @throws IllegalConfigurationException
- * May be thrown by the StateBackendFactory when creating / configuring the state
- * backend in the factory
- * @throws IOException
- * May be thrown by the StateBackendFactory when instantiating the state backend
- */
- public static StateBackend loadStateBackendFromConfig(
- Configuration config,
- ClassLoader classLoader,
- @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
-
- checkNotNull(config, "config");
- checkNotNull(classLoader, "classLoader");
-
- final String backendName = config.getString(CoreOptions.STATE_BACKEND);
- if (backendName == null) {
- return null;
- }
-
- // by default the factory class is the backend name
- String factoryClassName = backendName;
-
- switch (backendName.toLowerCase()) {
- case MEMORY_STATE_BACKEND_NAME:
- if (logger != null) {
- logger.info("State backend is set to heap memory (checkpoint to JobManager)");
- }
- return new MemoryStateBackend();
-
- case FS_STATE_BACKEND_NAME:
- FsStateBackend fsBackend = new FsStateBackendFactory().createFromConfig(config);
- if (logger != null) {
- logger.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
- fsBackend.getBasePath());
- }
- return fsBackend;
-
- case ROCKSDB_STATE_BACKEND_NAME:
- factoryClassName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
- // fall through to the 'default' case that uses reflection to load the backend
- // that way we can keep RocksDB in a separate module
-
- default:
- if (logger != null) {
- logger.info("Loading state backend via factory {}", factoryClassName);
- }
-
- StateBackendFactory<?> factory;
- try {
- @SuppressWarnings("rawtypes")
- Class<? extends StateBackendFactory> clazz =
- Class.forName(factoryClassName, false, classLoader)
- .asSubclass(StateBackendFactory.class);
-
- factory = clazz.newInstance();
- }
- catch (ClassNotFoundException e) {
- throw new DynamicCodeLoadingException(
- "Cannot find configured state backend factory class: " + backendName, e);
- }
- catch (ClassCastException | InstantiationException | IllegalAccessException e) {
- throw new DynamicCodeLoadingException("The class configured under '" +
- CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
- backendName + ')', e);
- }
-
- return factory.createFromConfig(config);
- }
- }
-
- /**
- * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
- * in {@link CoreOptions#STATE_BACKEND}. If no state backend is configures, this instantiates the
- * default state backend (the {@link MemoryStateBackend}).
- *
- * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on
- * how the state backend is loaded from the configuration.
- *
- * @param config The configuration to load the state backend from
- * @param classLoader The class loader that should be used to load the state backend
- * @param logger Optionally, a logger to log actions to (may be null)
- *
- * @return The instantiated state backend.
- *
- * @throws DynamicCodeLoadingException
- * Thrown if a state backend factory is configured and the factory class was not
- * found or the factory could not be instantiated
- * @throws IllegalConfigurationException
- * May be thrown by the StateBackendFactory when creating / configuring the state
- * backend in the factory
- * @throws IOException
- * May be thrown by the StateBackendFactory when instantiating the state backend
- */
- public static StateBackend loadStateBackendFromConfigOrCreateDefault(
- Configuration config,
- ClassLoader classLoader,
- @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
-
- final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
-
- if (fromConfig != null) {
- return fromConfig;
- }
- else {
- if (logger != null) {
- logger.info("No state backend has been configured, using default state backend (Memory / JobManager)");
- }
- return new MemoryStateBackend();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
new file mode 100644
index 0000000..f509e8d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+/**
+ * An interface for state backends that pick up additional parameters from a configuration.
+ */
+public interface ConfigurableStateBackend {
+
+ /**
+ * Creates a variant of the state backend that applies additional configuration parameters.
+ *
+ * <p>Settings that were made directly on the original state backend object in the application
+ * program typically take precedence over settings picked up from the configuration.
+ *
+ * <p>If no configuration is applied, or if the method directly applies configuration values to
+ * the (mutable) state backend object, this method may return the original state backend object.
+ * Otherwise, it typically returns a modified copy.
+ *
+ * @param config The configuration to pick the values from.
+ * @return A reconfigured state backend.
+ *
+ * @throws IllegalConfigurationException Thrown if the configuration contained invalid entries.
+ */
+ StateBackend configure(Configuration config) throws IllegalConfigurationException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
new file mode 100644
index 0000000..857dfc1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from configurations.
+ */
+public class StateBackendLoader {
+
+ // ------------------------------------------------------------------------
+ // Configuration shortcut names
+ // ------------------------------------------------------------------------
+
+ /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager. */
+ public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+ /** The shortcut configuration name for the FileSystem State backend. */
+ public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+ /** The shortcut configuration name for the RocksDB State Backend. */
+ public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+ // ------------------------------------------------------------------------
+ // Loading the state backend from a configuration
+ // ------------------------------------------------------------------------
+
+ /**
+ * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
+ * in {@link CheckpointingOptions#STATE_BACKEND}.
+ *
+ * <p>The state backends can be specified either via their shortcut name, or via the class name
+ * of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory
+ * is instantiated (via its zero-argument constructor) and its
+ * {@link StateBackendFactory#createFromConfig(Configuration)} method is called.
+ *
+ * <p>Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+ * '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+ * '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+ *
+ * @param config The configuration to load the state backend from
+ * @param classLoader The class loader that should be used to load the state backend
+ * @param logger Optionally, a logger to log actions to (may be null)
+ *
+ * @return The instantiated state backend, or {@code null} if the configuration does not specify a state backend.
+ *
+ * @throws DynamicCodeLoadingException
+ * Thrown if a state backend factory is configured and the factory class was not
+ * found or the factory could not be instantiated
+ * @throws IllegalConfigurationException
+ * May be thrown by the StateBackendFactory when creating / configuring the state
+ * backend in the factory
+ * @throws IOException
+ * May be thrown by the StateBackendFactory when instantiating the state backend
+ */
+ public static StateBackend loadStateBackendFromConfig(
+ Configuration config,
+ ClassLoader classLoader,
+ @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+ checkNotNull(config, "config");
+ checkNotNull(classLoader, "classLoader");
+
+ final String backendName = config.getString(CheckpointingOptions.STATE_BACKEND);
+ if (backendName == null) {
+ return null;
+ }
+
+ // by default the factory class is the backend name
+ String factoryClassName = backendName;
+
+ switch (backendName.toLowerCase()) { // NOTE(review): default-locale lower-casing; toLowerCase(Locale.ROOT) would be locale-independent
+ case MEMORY_STATE_BACKEND_NAME:
+ MemoryStateBackend memBackend = new MemoryStateBackendFactory().createFromConfig(config);
+
+ if (logger != null) {
+ Path memExternalized = memBackend.getCheckpointPath();
+ String extern = memExternalized == null ? "" :
+ " (externalized to " + memExternalized + ')';
+ logger.info("State backend is set to heap memory (checkpoint to JobManager) {}", extern);
+ }
+ return memBackend;
+
+ case FS_STATE_BACKEND_NAME:
+ FsStateBackend fsBackend = new FsStateBackendFactory().createFromConfig(config);
+ if (logger != null) {
+ logger.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
+ fsBackend.getCheckpointPath());
+ }
+ return fsBackend;
+
+ case ROCKSDB_STATE_BACKEND_NAME:
+ factoryClassName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
+ // fall through to the 'default' case that uses reflection to load the backend
+ // that way we can keep RocksDB in a separate module
+
+ default:
+ if (logger != null) {
+ logger.info("Loading state backend via factory {}", factoryClassName);
+ }
+
+ StateBackendFactory<?> factory;
+ try {
+ @SuppressWarnings("rawtypes")
+ Class<? extends StateBackendFactory> clazz =
+ Class.forName(factoryClassName, false, classLoader)
+ .asSubclass(StateBackendFactory.class);
+
+ factory = clazz.newInstance();
+ }
+ catch (ClassNotFoundException e) {
+ throw new DynamicCodeLoadingException(
+ "Cannot find configured state backend factory class: " + backendName, e); // NOTE(review): reports the configured name (e.g. 'rocksdb'), not the resolved factoryClassName
+ }
+ catch (ClassCastException | InstantiationException | IllegalAccessException e) {
+ throw new DynamicCodeLoadingException("The class configured under '" +
+ CheckpointingOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
+ backendName + ')', e);
+ }
+
+ return factory.createFromConfig(config);
+ }
+ }
+
+ /**
+ * Checks if an application-defined state backend is given, and if not, loads the state
+ * backend from the configuration, from the parameter 'state.backend', as defined
+ * in {@link CheckpointingOptions#STATE_BACKEND}. If no state backend is configured, this instantiates the
+ * default state backend (the {@link MemoryStateBackend}).
+ *
+ * <p>If an application-defined state backend is found, and the state backend is a
+ * {@link ConfigurableStateBackend}, this method calls {@link ConfigurableStateBackend#configure(Configuration)}
+ * on the state backend.
+ *
+ * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on
+ * how the state backend is loaded from the configuration.
+ *
+ * @param fromApplication The state backend defined by the application program, or null if none was set
+ * @param config The configuration to load the state backend from
+ * @param classLoader The class loader that should be used to load the state backend
+ * @param logger Optionally, a logger to log actions to (may be null)
+ *
+ * @return The instantiated (and possibly reconfigured) state backend.
+ * @throws DynamicCodeLoadingException
+ * Thrown if a state backend factory is configured and the factory class was not
+ * found or the factory could not be instantiated
+ * @throws IllegalConfigurationException
+ * May be thrown by the StateBackendFactory when creating / configuring the state
+ * backend in the factory
+ * @throws IOException
+ * May be thrown by the StateBackendFactory when instantiating the state backend
+ */
+ public static StateBackend fromApplicationOrConfigOrDefault(
+ @Nullable StateBackend fromApplication,
+ Configuration config,
+ ClassLoader classLoader,
+ @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+ checkNotNull(config, "config");
+ checkNotNull(classLoader, "classLoader");
+
+ final StateBackend backend;
+
+ // (1) the application-defined state backend has precedence
+ if (fromApplication != null) {
+ if (logger != null) {
+ logger.info("Using application-defined state backend: {}", fromApplication);
+ }
+
+ // see if this is supposed to pick up additional configuration parameters
+ if (fromApplication instanceof ConfigurableStateBackend) {
+ // needs to pick up configuration
+ if (logger != null) {
+ logger.info("Configuring application-defined state backend with job/cluster config");
+ }
+
+ backend = ((ConfigurableStateBackend) fromApplication).configure(config);
+ }
+ else {
+ // keep as is!
+ backend = fromApplication;
+ }
+ }
+ else {
+ // (2) check if the config defines a state backend
+ final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
+ if (fromConfig != null) {
+ backend = fromConfig;
+ }
+ else {
+ // (3) use the default
+ backend = new MemoryStateBackendFactory().createFromConfig(config);
+ if (logger != null) {
+ logger.info("No state backend has been configured, using default (Memory / JobManager) {}", backend);
+ }
+ }
+ }
+
+ // To keep supporting the old behavior (default JobManager backend + HA mode = checkpoints in the HA store),
+ // a MemoryStateBackend without a checkpoint path gets a random sub-directory of the HA storage path
+ // as its checkpoint directory (applied via configure() on a copy of the config, 'tempConfig').
+ if (backend instanceof MemoryStateBackend) {
+ final MemoryStateBackend memBackend = (MemoryStateBackend) backend;
+
+ if (memBackend.getCheckpointPath() == null && HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {
+ final String haStoragePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
+
+ if (haStoragePath != null) {
+ try {
+ Path checkpointDirPath = new Path(haStoragePath, UUID.randomUUID().toString());
+ if (checkpointDirPath.toUri().getScheme() == null) {
+ checkpointDirPath = checkpointDirPath.makeQualified(checkpointDirPath.getFileSystem());
+ }
+ Configuration tempConfig = new Configuration(config);
+ tempConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDirPath.toString());
+ return memBackend.configure(tempConfig);
+ } catch (Exception ignored) {} // NOTE(review): any failure is silently swallowed and the backend is returned without a checkpoint path; consider logging it
+ }
+ }
+ }
+
+ return backend;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** This class is not meant to be instantiated. */
+ private StateBackendLoader() {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
new file mode 100644
index 0000000..6ec6f24
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the
+ * {@code RocksDBStateBackend}.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but also accepts null
+ * for both of then, in which case creating externalized checkpoint is not possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to enable implementations
+ * that only optionally support default savepoints and externalized checkpoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * The state backend is configured with a base directory and persists the checkpoint data of specific
+ * checkpoints in specific subdirectories. For example, if the base directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory with
+ * the job's ID that will contain the actual checkpoints:
+ * ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/}, will create
+ * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint data.
+ * The random digits are added as "entropy" to avoid directory collisions.
+ */
+@PublicEvolving
+public abstract class AbstractFileStateBackend extends AbstractStateBackend {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStateBackend.class);
+
+ // ------------------------------------------------------------------------
+ // State Backend Properties
+ // ------------------------------------------------------------------------
+
+ /** The path where checkpoints will be stored, or null, if none has been configured. */
+ @Nullable
+ private final Path baseCheckpointPath;
+
+ /** The path where savepoints will be stored, or null, if none has been configured. */
+ @Nullable
+ private final Path baseSavepointPath;
+
+ /**
+ * Creates a backend with the given optional checkpoint- and savepoint base directories.
+ *
+ * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
+ * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
+ */
+ protected AbstractFileStateBackend(
+ @Nullable URI baseCheckpointPath,
+ @Nullable URI baseSavepointPath) {
+
+ this(baseCheckpointPath == null ? null : new Path(baseCheckpointPath),
+ baseSavepointPath == null ? null : new Path(baseSavepointPath));
+ }
+
+ /**
+ * Creates a backend with the given optional checkpoint- and savepoint base directories.
+ *
+ * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
+ * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
+ */
+ protected AbstractFileStateBackend(
+ @Nullable Path baseCheckpointPath,
+ @Nullable Path baseSavepointPath) {
+
+ this.baseCheckpointPath = baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
+ this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
+ }
+
+ /**
+ * Creates a new backend using the given checkpoint-/savepoint directories, or the values defined in
+ * the given configuration. If a checkpoint-/savepoint parameter is not null, that value takes precedence
+ * over the value in the configuration. If the configuration does not specify a value, it is possible
+ * that the checkpoint-/savepoint directories in the backend will be null.
+ *
+ * <p>This constructor can be used to create a backend that is based partially on a given backend
+ * and partially on a configuration.
+ *
+ * @param baseCheckpointPath The checkpoint base directory to use (or null).
+ * @param baseSavepointPath The default savepoint directory to use (or null).
+ * @param configuration The configuration to read values from
+ */
+ protected AbstractFileStateBackend(
+ @Nullable Path baseCheckpointPath,
+ @Nullable Path baseSavepointPath,
+ Configuration configuration) {
+
+ this(parameterOrConfigured(baseCheckpointPath, configuration, CheckpointingOptions.CHECKPOINTS_DIRECTORY),
+ parameterOrConfigured(baseSavepointPath, configuration, CheckpointingOptions.SAVEPOINT_DIRECTORY));
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the checkpoint base directory. Jobs will create job-specific subdirectories
+ * for checkpoints within this directory. May be null, if not configured.
+ *
+ * @return The checkpoint base directory
+ */
+ @Nullable
+ public Path getCheckpointPath() {
+ return baseCheckpointPath;
+ }
+
+ /**
+ * Gets the directory where savepoints are stored by default (when no custom path is given
+ * to the savepoint trigger command).
+ *
+ * @return The default directory for savepoints, or null, if no default directory has been configured.
+ */
+ @Nullable
+ public Path getSavepointPath() {
+ return baseSavepointPath;
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ //
+ /**
+ * Checks the validity of the path's scheme and path.
+ *
+ * @param path The path to check.
+ * @return The URI as a Path.
+ *
+ * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
+ */
+ private static Path validatePath(Path path) {
+ final URI uri = path.toUri();
+ final String scheme = uri.getScheme();
+ final String pathPart = uri.getPath();
+
+ // some validity checks
+ if (scheme == null) {
+ throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+ "Please specify the file system scheme explicitly in the URI.");
+ }
+ if (pathPart == null) {
+ throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
+ "Please specify a directory path for the checkpoint data.");
+ }
+ if (pathPart.length() == 0 || pathPart.equals("/")) {
+ throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+ }
+
+ return path;
+ }
+
+ @Nullable
+ private static Path parameterOrConfigured(@Nullable Path path, Configuration config, ConfigOption<String> option) {
+ if (path != null) {
+ return path;
+ }
+ else {
+ String configValue = config.getString(option);
+ try {
+ return configValue == null ? null : new Path(configValue);
+ }
+ catch (IllegalArgumentException e) {
+ throw new IllegalConfigurationException("Cannot parse value for " + option.key() +
+ " : " + configValue + " . Not a valid path.");
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 952988f..2fff45a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -18,54 +18,91 @@
package org.apache.flink.runtime.state.filesystem;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.util.TernaryBoolean;
+
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * The file state backend is a state backend that stores the state of streaming jobs in a file system.
+ * This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
+ * The state backend checkpoints state as files to a file system (hence the backend's name).
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}.
+ *
+ * <h1>State Size Considerations</h1>
+ *
+ * <p>Working state is kept on the TaskManager heap. If a TaskManager executes multiple
+ * tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used)
+ * then the aggregate state of all tasks needs to fit into that TaskManager's memory.
+ *
+ * <p>This state backend stores small state chunks directly with the metadata, to avoid creating
+ * many small files. The threshold for that is configurable. When increasing this threshold, the
+ * size of the checkpoint metadata increases. The checkpoint metadata of all retained completed
+ * checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
+ * unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly.
+ *
+ * <h1>Persistence Guarantees</h1>
*
- * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
- * files for each state, for example:
+ * <p>Checkpoints from this state backend are as persistent and available as the file system they are written to.
+ * If the file system is a persistent distributed file system, this state backend supports
+ * highly available setups. The backend additionally supports savepoints and externalized checkpoints.
*
- * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
+ * <h1>Configuration</h1>
+ *
+ * <p>As for all state backends, this backend can either be configured within the application (by creating
+ * the backend with the respective constructor parameters and setting it on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * <p>If the state backend was specified in the application, it may pick up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is configured in the application
+ * without a default savepoint directory, it will pick up a default savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is implemented via the
+ * {@link #configure(Configuration)} method.
*/
-public class FsStateBackend extends AbstractStateBackend {
+@PublicEvolving
+public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {
private static final long serialVersionUID = -8191916350224044011L;
- /** By default, state smaller than 1024 bytes will not be written to files, but
- * will be stored directly with the metadata */
- public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
+ /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
+ public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
- /** Maximum size of state that is stored with the metadata, rather than in files */
- private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
-
- /** The path to the directory for the checkpoint data, including the file system
- * description via scheme and optional authority */
- private final Path basePath;
+ // ------------------------------------------------------------------------
- /** State below this size will be stored as part of the metadata, rather than in files */
+ /** State below this size will be stored as part of the metadata, rather than in files.
+ * A value of '-1' means not yet configured, in which case the default will be used. */
private final int fileStateThreshold;
- /** Switch to chose between synchronous and asynchronous snapshots */
- private final boolean asynchronousSnapshots;
+ /** Switch to choose between synchronous and asynchronous snapshots.
+ * A value of 'undefined' means not yet configured, in which case the default will be used. */
+ private final TernaryBoolean asynchronousSnapshots;
+
+ // ------------------------------------------------------------------------
/**
* Creates a new state backend that stores its checkpoint data in the file system and location
@@ -80,9 +117,8 @@ public class FsStateBackend extends AbstractStateBackend {
*
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
- public FsStateBackend(String checkpointDataUri) throws IOException {
+ public FsStateBackend(String checkpointDataUri) {
this(new Path(checkpointDataUri));
}
@@ -100,10 +136,8 @@ public class FsStateBackend extends AbstractStateBackend {
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
* @param asynchronousSnapshots Switch to enable asynchronous snapshots.
- *
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
- public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+ public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) {
this(new Path(checkpointDataUri), asynchronousSnapshots);
}
@@ -120,9 +154,8 @@ public class FsStateBackend extends AbstractStateBackend {
*
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
- public FsStateBackend(Path checkpointDataUri) throws IOException {
+ public FsStateBackend(Path checkpointDataUri) {
this(checkpointDataUri.toUri());
}
@@ -140,10 +173,8 @@ public class FsStateBackend extends AbstractStateBackend {
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
* @param asynchronousSnapshots Switch to enable asynchronous snapshots.
- *
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
- public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
+ public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) {
this(checkpointDataUri.toUri(), asynchronousSnapshots);
}
@@ -160,10 +191,30 @@ public class FsStateBackend extends AbstractStateBackend {
*
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
- public FsStateBackend(URI checkpointDataUri) throws IOException {
- this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, true);
+ public FsStateBackend(URI checkpointDataUri) {
+ this(checkpointDataUri, null, -1, TernaryBoolean.UNDEFINED);
+ }
+
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI. Optionally, this constructor accepts a default savepoint storage
+ * directory to which savepoints are stored when no custom target path is given to the savepoint
+ * command.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
+ * @param defaultSavepointDirectory The default directory to store savepoints to. May be null.
+ */
+ public FsStateBackend(URI checkpointDataUri, @Nullable URI defaultSavepointDirectory) {
+ this(checkpointDataUri, defaultSavepointDirectory, -1, TernaryBoolean.UNDEFINED);
}
/**
@@ -180,11 +231,10 @@ public class FsStateBackend extends AbstractStateBackend {
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
* @param asynchronousSnapshots Switch to enable asynchronous snapshots.
- *
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
- public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
- this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, asynchronousSnapshots);
+ public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) {
+ this(checkpointDataUri, null, -1,
+ TernaryBoolean.fromBoolean(asynchronousSnapshots));
}
/**
@@ -202,13 +252,9 @@ public class FsStateBackend extends AbstractStateBackend {
* and the path to the checkpoint data directory.
* @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
* rather than in files
- *
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
- * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
*/
- public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
-
- this(checkpointDataUri, fileStateSizeThreshold, true);
+ public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) {
+ this(checkpointDataUri, null, fileStateSizeThreshold, TernaryBoolean.UNDEFINED);
}
/**
@@ -225,34 +271,120 @@ public class FsStateBackend extends AbstractStateBackend {
* @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
* @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
- * rather than in files
+ * rather than in files (-1 for default value).
* @param asynchronousSnapshots Switch to enable asynchronous snapshots.
- *
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
public FsStateBackend(
URI checkpointDataUri,
int fileStateSizeThreshold,
- boolean asynchronousSnapshots) throws IOException {
+ boolean asynchronousSnapshots) {
- checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
- checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
- "The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD);
+ this(checkpointDataUri, null, fileStateSizeThreshold,
+ TernaryBoolean.fromBoolean(asynchronousSnapshots));
+ }
- this.fileStateThreshold = fileStateSizeThreshold;
- this.basePath = validateAndNormalizeUri(checkpointDataUri);
+ /**
+ * Creates a new state backend that stores its checkpoint data in the file system and location
+ * defined by the given URI.
+ *
+ * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+ * must be accessible via {@link FileSystem#get(URI)}.
+ *
+ * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+ * (host and port), or that the Hadoop configuration that describes that information must be in the
+ * classpath.
+ *
+ * @param checkpointDirectory The path to write checkpoint metadata to.
+ * @param defaultSavepointDirectory The path to write savepoints to. If null, the value from
+ * the runtime configuration will be used, or savepoint
+ * target locations need to be passed when triggering a savepoint.
+ * @param fileStateSizeThreshold State below this size will be stored as part of the metadata,
+ * rather than in files. If -1, the value configured in the
+ * runtime configuration will be used, or the default value (1KB)
+ * if nothing is configured.
+ * @param asynchronousSnapshots Flag to switch between synchronous and asynchronous
+ * snapshot mode. If UNDEFINED, the value configured in the
+ * runtime configuration will be used.
+ */
+ public FsStateBackend(
+ URI checkpointDirectory,
+ @Nullable URI defaultSavepointDirectory,
+ int fileStateSizeThreshold,
+ TernaryBoolean asynchronousSnapshots) {
+
+ super(checkNotNull(checkpointDirectory, "checkpoint directory is null"), defaultSavepointDirectory);
+ checkNotNull(asynchronousSnapshots, "asynchronousSnapshots");
+ checkArgument(fileStateSizeThreshold >= -1 && fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
+ "The threshold for file state size must be in [-1, %s], where '-1' means to use " +
+ "the value from the deployment's configuration.", MAX_FILE_STATE_THRESHOLD);
+
+ this.fileStateThreshold = fileStateSizeThreshold;
this.asynchronousSnapshots = asynchronousSnapshots;
}
/**
- * Gets the base directory where all state-containing files are stored.
- * The job specific directory is created inside this directory.
+ * Private constructor that creates a re-configured copy of the state backend.
*
- * @return The base directory.
+ * @param original The state backend to re-configure
+ * @param configuration The configuration
*/
+ private FsStateBackend(FsStateBackend original, Configuration configuration) {
+ super(original.getCheckpointPath(), original.getSavepointPath(), configuration);
+
+ // if asynchronous snapshots were configured, use that setting,
+ // else check the configuration
+ this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
+ configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));
+
+ // a non-negative threshold was set explicitly on the original backend and wins;
+ // '-1' (not configured) falls back to the value from the configuration
+ final int sizeThreshold = original.fileStateThreshold >= 0 ?
+ original.fileStateThreshold :
+ configuration.getInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);
+
+ if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
+ this.fileStateThreshold = sizeThreshold;
+ }
+ else {
+ this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+
+ // because this is the only place we (unlikely) ever log, we lazily
+ // create the logger here; log under this class so the warning is attributed correctly
+ LoggerFactory.getLogger(FsStateBackend.class).warn(
+ "Ignoring invalid file size threshold value ({}): {} - using default value {} instead.",
+ CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
+ CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the base directory where all the checkpoints are stored.
+ * The job-specific checkpoint directory is created inside this directory.
+ *
+ * @return The base directory for checkpoints.
+ *
+ * @deprecated Deprecated in favor of {@link #getCheckpointPath()}.
+ */
+ @Deprecated
public Path getBasePath() {
- return basePath;
+ return getCheckpointPath();
+ }
+
+ /**
+ * Gets the base directory where all the checkpoints are stored.
+ * The job-specific checkpoint directory is created inside this directory.
+ *
+ * @return The base directory for checkpoints.
+ */
+ @Nonnull
+ @Override
+ public Path getCheckpointPath() {
+ // we know that this can never be null by the way of constructor checks
+ //noinspection ConstantConditions
+ return super.getCheckpointPath();
}
/**
@@ -260,12 +392,41 @@ public class FsStateBackend extends AbstractStateBackend {
* This threshold ensures that the backend does not create a large amount of very small files,
* where potentially the file pointers are larger than the state itself.
*
- * <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}.
+ * <p>If not explicitly configured, this is the default value of
+ * {@link CheckpointingOptions#FS_SMALL_FILE_THRESHOLD}.
*
* @return The file size threshold, in bytes.
*/
public int getMinFileSizeThreshold() {
- return fileStateThreshold;
+ return fileStateThreshold >= 0 ?
+ fileStateThreshold :
+ CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+ }
+
+ /**
+ * Checks whether snapshots of the key/value data structures are taken asynchronously.
+ *
+ * <p>When this setting was left undefined, the default of
+ * {@link CheckpointingOptions#ASYNC_SNAPSHOTS} is returned instead.
+ *
+ * @return True, if snapshots are taken asynchronously, false for synchronous snapshots.
+ */
+ public boolean isUsingAsynchronousSnapshots() {
+ final boolean fallback = CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
+ return asynchronousSnapshots.getOrDefault(fallback);
+ }
+
+ // ------------------------------------------------------------------------
+ // Reconfiguration
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a copy of this state backend that uses the values defined in the configuration
+ * for fields that were not specified in this state backend.
+ *
+ * @param config the configuration
+ * @return The re-configured variant of the state backend
+ */
+ @Override
+ public FsStateBackend configure(Configuration config) {
+ return new FsStateBackend(this, config);
}
// ------------------------------------------------------------------------
@@ -274,7 +435,7 @@ public class FsStateBackend extends AbstractStateBackend {
@Override
public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
- return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold);
+ return new FsCheckpointStreamFactory(getCheckpointPath(), jobId, getMinFileSizeThreshold());
}
@Override
@@ -283,7 +444,7 @@ public class FsStateBackend extends AbstractStateBackend {
String operatorIdentifier,
String targetLocation) throws IOException {
- return new FsSavepointStreamFactory(new Path(targetLocation), jobId, fileStateThreshold);
+ return new FsSavepointStreamFactory(new Path(targetLocation), jobId, getMinFileSizeThreshold());
}
@Override
@@ -302,7 +463,7 @@ public class FsStateBackend extends AbstractStateBackend {
env.getUserClassLoader(),
numberOfKeyGroups,
keyGroupRange,
- asynchronousSnapshots,
+ isUsingAsynchronousSnapshots(),
env.getExecutionConfig());
}
@@ -314,45 +475,19 @@ public class FsStateBackend extends AbstractStateBackend {
return new DefaultOperatorStateBackend(
env.getUserClassLoader(),
env.getExecutionConfig(),
- asynchronousSnapshots);
+ isUsingAsynchronousSnapshots());
}
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
@Override
public String toString() {
- return "File State Backend @ " + basePath;
- }
-
- /**
- * Checks and normalizes the checkpoint data URI. This method first checks the validity of the
- * URI (scheme, path, availability of a matching file system) and then normalizes the URI
- * to a path.
- *
- * <p>If the URI does not include an authority, but the file system configured for the URI has an
- * authority, then the normalized path will include this authority.
- *
- * @param checkpointDataUri The URI to check and normalize.
- * @return A normalized URI as a Path.
- *
- * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
- * @throws IOException Thrown, if no file system can be found for the URI's scheme.
- */
- private static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
- final String scheme = checkpointDataUri.getScheme();
- final String path = checkpointDataUri.getPath();
-
- // some validity checks
- if (scheme == null) {
- throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
- "Please specify the file system scheme explicitly in the URI.");
- }
- if (path == null) {
- throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
- "Please specify a directory path for the checkpoint data.");
- }
- if (path.length() == 0 || path.equals("/")) {
- throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
- }
-
- return new Path(checkpointDataUri);
+ return "File State Backend (" +
+ "checkpoints: '" + getCheckpointPath() +
+ "', savepoints: '" + getSavepointPath() +
+ "', asynchronous: " + asynchronousSnapshots +
+ ", fileStateThreshold: " + fileStateThreshold + ")";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
index 4c933ef..2640683 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
@@ -18,45 +18,34 @@
package org.apache.flink.runtime.state.filesystem;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackendFactory;
-import java.io.IOException;
-
/**
- * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
- * from a configuration.
+ * A factory that creates an {@link FsStateBackend} from a configuration.
*/
+@PublicEvolving
public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
-
- /** The key under which the config stores the directory where checkpoints should be stored */
- public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir";
-
- /** The key under which the config stores the threshold for state to be store in memory,
- * rather than in files */
- public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold";
-
@Override
public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException {
- final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
- final int memoryThreshold = config.getInteger(
- MEMORY_THRESHOLD_CONF_KEY, FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD);
-
- if (checkpointDirURI == null) {
+ // we need to explicitly read the checkpoint directory here, because that
+ // is a required constructor parameter
+ final String checkpointDir = config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+ if (checkpointDir == null) {
throw new IllegalConfigurationException(
"Cannot create the file system state backend: The configuration does not specify the " +
- "checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
+ "checkpoint directory '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + '\'');
}
try {
- Path path = new Path(checkpointDirURI);
- return new FsStateBackend(path.toUri(), memoryThreshold);
+ return new FsStateBackend(checkpointDir).configure(config);
}
- catch (IOException | IllegalArgumentException e) {
+ catch (IllegalArgumentException e) {
throw new IllegalConfigurationException("Invalid configuration for the state backend", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index b8ebedf..2079a97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -18,78 +18,265 @@
package org.apache.flink.runtime.state.memory;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.util.TernaryBoolean;
+
+import javax.annotation.Nullable;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
- * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the serialized data is
- * transferred
+ * This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
+ * The state backend checkpoints state directly to the JobManager's memory (hence the backend's name),
+ * but the checkpoints will be persisted to a file system for high-availability setups and savepoints.
+ * The MemoryStateBackend is consequently a FileSystem-based backend that can work without a
+ * file system dependency in simple setups.
+ *
+ * <p>This state backend should be used only for experimentation, quick local setups,
+ * or for streaming applications that have very small state: Because it requires checkpoints to
+ * go through the JobManager's memory, larger state will occupy larger portions of the JobManager's
+ * main memory, reducing operational stability.
+ * For any other setup, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend}
+ * should be used. The {@code FsStateBackend} holds the working state on the TaskManagers in the same way, but
+ * checkpoints state directly to files rather than to the JobManager's memory, thus supporting
+ * large state sizes.
+ *
+ * <h1>State Size Considerations</h1>
+ *
+ * <p>State checkpointing with this state backend is subject to the following conditions:
+ * <ul>
+ * <li>Each individual state must not exceed the configured maximum state size
+ * (see {@link #getMaxStateSize()}).</li>
+ *
+ * <li>All state from one task (i.e., the sum of all operator states and keyed states from all
+ * chained operators of the task) must not exceed what the RPC system supports, which is
+ * by default less than 10 MB. That limit can be configured higher, but that is typically not advised.</li>
+ *
+ * <li>The sum of all states in the application times all retained checkpoints must comfortably
+ * fit into the JobManager's JVM heap space.</li>
+ * </ul>
+ *
+ * <h1>Persistence Guarantees</h1>
+ *
+ * <p>For the use cases where the state sizes can be handled by this backend, the backend does guarantee
+ * persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+ * (when high-availability is configured).
+ *
+ * <h1>Configuration</h1>
+ *
+ * <p>As for all state backends, this backend can either be configured within the application (by creating
+ * the backend with the respective constructor parameters and setting it on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * <p>If the state backend was specified in the application, it may pick up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is configured in the application
+ * without a default savepoint directory, it will pick up a default savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is implemented via the
+ * {@link #configure(Configuration)} method.
*/
-public class MemoryStateBackend extends AbstractStateBackend {
+@PublicEvolving
+public class MemoryStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {
private static final long serialVersionUID = 4109305377809414635L;
- /** The default maximal size that the snapshotted memory state may have (5 MiBytes) */
- private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
+ /** The default maximal size that the snapshotted memory state may have (5 MiBytes). */
+ public static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
- /** The maximal size that the snapshotted memory state may have */
+ /** The maximal size that the snapshotted memory state may have. */
private final int maxStateSize;
- /** Switch to chose between synchronous and asynchronous snapshots */
- private final boolean asynchronousSnapshots;
+ /** Switch to choose between synchronous and asynchronous snapshots.
+ * A value of 'UNDEFINED' means not yet configured, in which case the default will be used. */
+ private final TernaryBoolean asynchronousSnapshots;
+
+ // ------------------------------------------------------------------------
/**
* Creates a new memory state backend that accepts states whose serialized forms are
* up to the default state size (5 MB).
+ *
+ * <p>Checkpoint and default savepoint locations are used as specified in the
+ * runtime configuration.
*/
public MemoryStateBackend() {
- this(DEFAULT_MAX_STATE_SIZE);
+ this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
}
/**
* Creates a new memory state backend that accepts states whose serialized forms are
- * up to the given number of bytes.
+ * up to the default state size (5 MB). The state backend uses asynchronous snapshots
+ * or synchronous snapshots as configured.
*
- * @param maxStateSize The maximal size of the serialized state
+ * <p>Checkpoint and default savepoint locations are used as specified in the
+ * runtime configuration.
+ *
+ * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
*/
- public MemoryStateBackend(int maxStateSize) {
- this(maxStateSize, true);
+ public MemoryStateBackend(boolean asynchronousSnapshots) {
+ this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.fromBoolean(asynchronousSnapshots));
}
/**
* Creates a new memory state backend that accepts states whose serialized forms are
- * up to the default state size (5 MB).
+ * up to the given number of bytes.
*
- * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
+ * <p>Checkpoint and default savepoint locations are used as specified in the
+ * runtime configuration.
+ *
+ * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
+ * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
+ * The checkpointed state needs to be sent to the JobManager via limited-size RPC messages, and
+ * the JobManager needs to be able to hold all aggregated state in its memory.
+ *
+ * @param maxStateSize The maximal size of the serialized state
*/
- public MemoryStateBackend(boolean asynchronousSnapshots) {
- this(DEFAULT_MAX_STATE_SIZE, asynchronousSnapshots);
+ public MemoryStateBackend(int maxStateSize) {
+ this(null, null, maxStateSize, TernaryBoolean.UNDEFINED);
}
/**
* Creates a new memory state backend that accepts states whose serialized forms are
- * up to the given number of bytes.
+ * up to the given number of bytes and that uses asynchronous snapshots as configured.
+ *
+ * <p>Checkpoint and default savepoint locations are used as specified in the
+ * runtime configuration.
+ *
+ * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
+ * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
+ * The checkpointed state needs to be sent to the JobManager via limited-size RPC messages, and
+ * the JobManager needs to be able to hold all aggregated state in its memory.
*
* @param maxStateSize The maximal size of the serialized state
* @param asynchronousSnapshots Switch to enable asynchronous snapshots.
*/
public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) {
+ this(null, null, maxStateSize, TernaryBoolean.fromBoolean(asynchronousSnapshots));
+ }
+
+ /**
+ * Creates a new MemoryStateBackend, setting optionally the path to persist checkpoint metadata
+ * to, and to persist savepoints to.
+ *
+ * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
+ * the runtime configuration will be used.
+ * @param savepointPath The path to write savepoints to. If null, the value from
+ * the runtime configuration will be used.
+ */
+ public MemoryStateBackend(@Nullable String checkpointPath, @Nullable String savepointPath) {
+ this(checkpointPath, savepointPath, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
+ }
+
+ /**
+ * Creates a new MemoryStateBackend, setting optionally the paths to persist checkpoint metadata
+ * and savepoints to, as well as configuring state thresholds and asynchronous operations.
+ *
+ * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
+ * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
+ * The checkpointed state needs to be sent to the JobManager via limited-size RPC messages, and
+ * the JobManager needs to be able to hold all aggregated state in its memory.
+ *
+ * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
+ * the runtime configuration will be used.
+ * @param savepointPath The path to write savepoints to. If null, the value from
+ * the runtime configuration will be used.
+ * @param maxStateSize The maximal size of the serialized state.
+ * @param asynchronousSnapshots Flag to switch between synchronous and asynchronous
+ * snapshot mode. If UNDEFINED, the value configured in the
+ * runtime configuration will be used.
+ */
+ public MemoryStateBackend(
+ @Nullable String checkpointPath,
+ @Nullable String savepointPath,
+ int maxStateSize,
+ TernaryBoolean asynchronousSnapshots) {
+
+ super(checkpointPath == null ? null : new Path(checkpointPath),
+ savepointPath == null ? null : new Path(savepointPath));
+
+ checkArgument(maxStateSize > 0, "maxStateSize must be > 0");
this.maxStateSize = maxStateSize;
+
this.asynchronousSnapshots = asynchronousSnapshots;
}
+ /**
+ * Private constructor that creates a re-configured copy of the state backend.
+ *
+ * @param original The state backend to re-configure
+ * @param configuration The configuration
+ */
+ private MemoryStateBackend(MemoryStateBackend original, Configuration configuration) {
+ super(original.getCheckpointPath(), original.getSavepointPath(), configuration);
+
+ this.maxStateSize = original.maxStateSize;
+
+ // if asynchronous snapshots were configured, use that setting,
+ // else check the configuration
+ this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
+ configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the maximum size that an individual state can have, as configured in the
+ * constructor (by default {@value #DEFAULT_MAX_STATE_SIZE}).
+ *
+ * @return The maximum size that an individual state can have
+ */
+ public int getMaxStateSize() {
+ return maxStateSize;
+ }
+
+ /**
+ * Gets whether the key/value data structures are asynchronously snapshotted.
+ *
+ * <p>If not explicitly configured, this is the default value of
+ * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
+ */
+ public boolean isUsingAsynchronousSnapshots() {
+ return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
+ }
+
+ // ------------------------------------------------------------------------
+ // Reconfiguration
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a copy of this state backend that uses the values defined in the configuration
+ * for fields that were not specified in this state backend.
+ *
+ * @param config the configuration
+ * @return The re-configured variant of the state backend
+ */
+ @Override
+ public MemoryStateBackend configure(Configuration config) {
+ return new MemoryStateBackend(this, config);
+ }
+
+ // ------------------------------------------------------------------------
+ // checkpoint state persistence
+ // ------------------------------------------------------------------------
+
@Override
public OperatorStateBackend createOperatorStateBackend(
Environment env,
@@ -98,12 +285,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
return new DefaultOperatorStateBackend(
env.getUserClassLoader(),
env.getExecutionConfig(),
- asynchronousSnapshots);
- }
-
- @Override
- public String toString() {
- return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)";
+ isUsingAsynchronousSnapshots());
}
@Override
@@ -121,6 +303,10 @@ public class MemoryStateBackend extends AbstractStateBackend {
return new MemCheckpointStreamFactory(maxStateSize);
}
+ // ------------------------------------------------------------------------
+ // state holding structures
+ // ------------------------------------------------------------------------
+
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID,
@@ -136,7 +322,20 @@ public class MemoryStateBackend extends AbstractStateBackend {
env.getUserClassLoader(),
numberOfKeyGroups,
keyGroupRange,
- asynchronousSnapshots,
+ isUsingAsynchronousSnapshots(),
env.getExecutionConfig());
}
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "MemoryStateBackend (data in heap memory / checkpoints to JobManager) " +
+ "(checkpoints: '" + getCheckpointPath() +
+ "', savepoints: '" + getSavepointPath() +
+ "', asynchronous: " + asynchronousSnapshots +
+ ", maxStateSize: " + maxStateSize + ")";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackendFactory.java
new file mode 100644
index 0000000..10b6c20
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackendFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateBackendFactory;
+
+/**
+ * A factory that creates a {@link MemoryStateBackend} from a configuration.
+ */
+@PublicEvolving
+public class MemoryStateBackendFactory implements StateBackendFactory<MemoryStateBackend> {
+
+ @Override
+ public MemoryStateBackend createFromConfig(Configuration config) {
+ return new MemoryStateBackend().configure(config);
+ }
+}