You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:53 UTC
[10/11] flink git commit: [FLINK-5822] [state backends] Make
JobManager / Checkpoint Coordinator aware of the root state backend
[FLINK-5822] [state backends] Make JobManager / Checkpoint Coordinator aware of the root state backend
This closes #3411
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3446e66a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3446e66a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3446e66a
Branch: refs/heads/master
Commit: 3446e66aac63a3dfdaf8cfd4a73bd80a7f038379
Parents: 5b7f21d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 17 17:51:00 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 19:02:13 2017 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBStateBackend.java | 12 +-
.../state/RocksDBStateBackendFactory.java | 19 +-
.../jobmanager/JMXJobManagerMetricTest.java | 2 +-
.../CheckpointConfigHandlerTest.java | 3 +
.../checkpoint/CheckpointCoordinator.java | 21 ++-
.../runtime/executiongraph/ExecutionGraph.java | 4 +-
.../executiongraph/ExecutionGraphBuilder.java | 32 +++-
.../jobgraph/tasks/JobSnapshottingSettings.java | 15 +-
.../runtime/state/AbstractStateBackend.java | 173 ++++++++++++++++++-
.../runtime/state/StateBackendFactory.java | 16 +-
.../state/filesystem/FsStateBackend.java | 31 +++-
.../state/filesystem/FsStateBackendFactory.java | 22 +--
.../flink/runtime/state/heap/package-info.java | 23 +++
.../runtime/state/internal/package-info.java | 52 ++++++
.../state/memory/MemoryStateBackend.java | 2 +-
.../checkpoint/CheckpointStatsTrackerTest.java | 1 +
.../checkpoint/CoordinatorShutdownTest.java | 5 +-
...ExecutionGraphCheckpointCoordinatorTest.java | 3 +-
.../ArchivedExecutionGraphTest.java | 3 +-
.../tasks/JobSnapshottingSettingsTest.java | 6 +
.../jobmanager/JobManagerHARecoveryTest.java | 1 +
.../runtime/jobmanager/JobManagerTest.java | 5 +
.../flink/runtime/jobmanager/JobSubmitTest.java | 2 +-
.../runtime/state/StateBackendLoadingTest.java | 164 ++++++++++++++++++
.../runtime/jobmanager/JobManagerITCase.scala | 3 +
.../api/graph/StreamGraphGenerator.java | 2 +-
.../api/graph/StreamingJobGraphGenerator.java | 1 +
.../streaming/runtime/tasks/StreamTask.java | 72 ++------
.../runtime/tasks/BlockingCheckpointsTest.java | 2 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 56 +++---
.../streaming/runtime/StateBackendITCase.java | 2 +-
31 files changed, 609 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 3fd5d0f..dd0e2f7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -29,10 +29,12 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.util.AbstractID;
+
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,7 +162,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
private void lazyInitializeForJob(
Environment env,
- String operatorIdentifier) throws Exception {
+ String operatorIdentifier) throws IOException {
if (isInitialized) {
return;
@@ -193,7 +195,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
if (dirs.isEmpty()) {
- throw new Exception("No local storage directories available. " + errorMessage);
+ throw new IOException("No local storage directories available. " + errorMessage);
} else {
initializedDbBasePaths = dirs.toArray(new File[dirs.size()]);
}
@@ -235,7 +237,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws Exception {
+ TaskKvStateRegistry kvStateRegistry) throws IOException {
// first, make sure that the RocksDB JNI library is loaded
// we do this explicitly here to have better error handling
@@ -437,7 +439,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
// static library loading utilities
// ------------------------------------------------------------------------
- private void ensureRocksDBIsLoaded(String tempDirectory) throws Exception {
+ private void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
synchronized (RocksDBStateBackend.class) {
if (!rocksDbInitialized) {
@@ -488,7 +490,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
}
- throw new Exception("Could not load the native RocksDB library", lastException);
+ throw new IOException("Could not load the native RocksDB library", lastException);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index 5002272..bd9bcaa 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -15,24 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackendFactory;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
/**
* A factory that creates an {@link org.apache.flink.contrib.streaming.state.RocksDBStateBackend}
* from a configuration.
*/
-public class RocksDBStateBackendFactory implements StateBackendFactory<FsStateBackend> {
+public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBStateBackend> {
protected static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackendFactory.class);
@@ -44,9 +45,11 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<FsStateBa
public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir";
@Override
- public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
- String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
- String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+ public RocksDBStateBackend createFromConfig(Configuration config)
+ throws IllegalConfigurationException, IOException {
+
+ final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+ final String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
if (checkpointDirURI == null) {
throw new IllegalConfigurationException(
@@ -67,8 +70,8 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<FsStateBa
return backend;
}
catch (IllegalArgumentException e) {
- throw new Exception("Cannot initialize RocksDB State Backend with URI '"
- + checkpointDirURI + '.', e);
+ throw new IllegalConfigurationException(
+ "Cannot initialize RocksDB State Backend with URI '" + checkpointDirURI + '.', e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index b3b7dfc..1fdac65 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -74,7 +74,7 @@ public class JMXJobManagerMetricTest {
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
- 500, 500, 50, 5, ExternalizedCheckpointSettings.none(), true));
+ 500, 500, 50, 5, ExternalizedCheckpointSettings.none(), null, true));
flink.waitForActorsToBeAlive();
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index e517c3c..95ced0a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -56,6 +56,7 @@ public class CheckpointConfigHandlerTest {
minPause,
maxConcurrent,
externalized,
+ null,
true);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
@@ -92,6 +93,7 @@ public class CheckpointConfigHandlerTest {
1212L,
12,
ExternalizedCheckpointSettings.none(),
+ null,
false); // at least once
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
@@ -122,6 +124,7 @@ public class CheckpointConfigHandlerTest {
1212L,
12,
externalizedSettings,
+ null,
false); // at least once
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6da6f7d..0592e3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -85,6 +85,12 @@ public class CheckpointCoordinator {
/** The job whose checkpoint this coordinator coordinates */
private final JobID job;
+ /** Default checkpoint properties **/
+ private final CheckpointProperties checkpointProperties;
+
+ /** The executor used for asynchronous calls, like potentially blocking I/O */
+ private final Executor executor;
+
/** Tasks who need to be sent a message when a checkpoint is started */
private final ExecutionVertex[] tasksToTrigger;
@@ -101,7 +107,9 @@ public class CheckpointCoordinator {
* accessing this don't block the job manager actor and run asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
- /** Default directory for persistent checkpoints; <code>null</code> if none configured. */
+ /** Default directory for persistent checkpoints; <code>null</code> if none configured.
+ * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
+ @Nullable
private final String checkpointDirectory;
/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
@@ -154,11 +162,6 @@ public class CheckpointCoordinator {
@Nullable
private CheckpointStatsTracker statsTracker;
- /** Default checkpoint properties **/
- private final CheckpointProperties checkpointProperties;
-
- private final Executor executor;
-
// --------------------------------------------------------------------------------------------
public CheckpointCoordinator(
@@ -173,7 +176,7 @@ public class CheckpointCoordinator {
ExecutionVertex[] tasksToCommitTo,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
- String checkpointDirectory,
+ @Nullable String checkpointDirectory,
Executor executor) {
// sanity checks
@@ -211,6 +214,8 @@ public class CheckpointCoordinator {
this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory;
+ this.executor = checkNotNull(executor);
+
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.timer = new Timer("Checkpoint Timer", true);
@@ -229,8 +234,6 @@ public class CheckpointCoordinator {
} catch (Throwable t) {
throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t);
}
-
- this.executor = checkNotNull(executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ad4347d..a76a421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
@@ -348,7 +349,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return false;
}
- public void enableSnapshotCheckpointing(
+ public void enableCheckpointing(
long interval,
long checkpointTimeout,
long minPauseBetweenCheckpoints,
@@ -360,6 +361,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
String checkpointDir,
+ StateBackend metadataStore,
CheckpointStatsTracker statsTracker) {
// simple sanity checks
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index c558e43..2a79302 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -37,6 +38,9 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.util.DynamicCodeLoadingException;
import org.slf4j.Logger;
import javax.annotation.Nullable;
@@ -71,8 +75,8 @@ public class ExecutionGraphBuilder {
MetricGroup metrics,
int parallelismForAutoMax,
Logger log)
- throws JobExecutionException, JobException
- {
+ throws JobExecutionException, JobException {
+
checkNotNull(jobGraph, "job graph cannot be null");
final String jobName = jobGraph.getName();
@@ -191,7 +195,28 @@ public class ExecutionGraphBuilder {
String externalizedCheckpointsDir = jobManagerConfig.getString(
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
- executionGraph.enableSnapshotCheckpointing(
+ // load the state backend for checkpoint metadata.
+ // if specified in the application, use from there, otherwise load from configuration
+ final StateBackend metadataBackend;
+
+ final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
+ if (applicationConfiguredBackend != null) {
+ metadataBackend = applicationConfiguredBackend;
+
+ log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
+ applicationConfiguredBackend);
+ }
+ else {
+ try {
+ metadataBackend = AbstractStateBackend
+ .loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
+ }
+ catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
+ throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
+ }
+ }
+
+ executionGraph.enableCheckpointing(
snapshotSettings.getCheckpointInterval(),
snapshotSettings.getCheckpointTimeout(),
snapshotSettings.getMinPauseBetweenCheckpoints(),
@@ -203,6 +228,7 @@ public class ExecutionGraphBuilder {
checkpointIdCounter,
completedCheckpoints,
externalizedCheckpointsDir,
+ metadataBackend,
checkpointStatsTracker);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 561ba89..233aa88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -19,7 +19,9 @@
package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateBackend;
+import javax.annotation.Nullable;
import java.util.List;
import static java.util.Objects.requireNonNull;
@@ -50,6 +52,10 @@ public class JobSnapshottingSettings implements java.io.Serializable {
/** Settings for externalized checkpoints. */
private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
+ /** The default state backend, if configured by the user in the job */
+ @Nullable
+ private final StateBackend defaultStateBackend;
+
/**
* Flag indicating whether exactly once checkpoint mode has been configured.
* If <code>false</code>, at least once mode has been configured. This is
@@ -58,7 +64,7 @@ public class JobSnapshottingSettings implements java.io.Serializable {
* UI.
*/
private final boolean isExactlyOnce;
-
+
public JobSnapshottingSettings(
List<JobVertexID> verticesToTrigger,
List<JobVertexID> verticesToAcknowledge,
@@ -68,6 +74,7 @@ public class JobSnapshottingSettings implements java.io.Serializable {
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
ExternalizedCheckpointSettings externalizedCheckpointSettings,
+ @Nullable StateBackend defaultStateBackend,
boolean isExactlyOnce) {
// sanity checks
@@ -84,6 +91,7 @@ public class JobSnapshottingSettings implements java.io.Serializable {
this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
+ this.defaultStateBackend = defaultStateBackend;
this.isExactlyOnce = isExactlyOnce;
}
@@ -121,6 +129,11 @@ public class JobSnapshottingSettings implements java.io.Serializable {
return externalizedCheckpointSettings;
}
+ @Nullable
+ public StateBackend getDefaultStateBackend() {
+ return defaultStateBackend;
+ }
+
public boolean isExactlyOnce() {
return isExactlyOnce;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index a335e45..2cf20a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -21,20 +21,50 @@ package org.apache.flink.runtime.state;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* An abstract base implementation of the {@link StateBackend} interface.
+ *
+ * <p>
*/
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
private static final long serialVersionUID = 4620415814639230247L;
+ // ------------------------------------------------------------------------
+ // Configuration shortcut names
+ // ------------------------------------------------------------------------
+
+ /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */
+ public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+ /** The shortcut configuration name for the FileSystem State backend */
+ public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+ /** The shortcut configuration name for the RocksDB State Backend */
+ public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+ // ------------------------------------------------------------------------
+ // State Backend - Persisting Byte Storage
+ // ------------------------------------------------------------------------
+
@Override
public abstract CheckpointStreamFactory createStreamFactory(
JobID jobId,
@@ -46,6 +76,10 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
String operatorIdentifier,
@Nullable String targetLocation) throws IOException;
+ // ------------------------------------------------------------------------
+ // State Backend - State-Holding Backends
+ // ------------------------------------------------------------------------
+
@Override
public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
@@ -54,7 +88,7 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws Exception;
+ TaskKvStateRegistry kvStateRegistry) throws IOException;
@Override
public OperatorStateBackend createOperatorStateBackend(
@@ -63,4 +97,141 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
return new DefaultOperatorStateBackend(env.getUserClassLoader());
}
+
+ // ------------------------------------------------------------------------
+ // Loading the state backend from a configuration
+ // ------------------------------------------------------------------------
+
+ /**
+ * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
+ * in {@link CoreOptions#STATE_BACKEND}.
+ *
+ * <p>The state backends can be specified either via their shortcut name, or via the class name
+ * of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory
+ * is instantiated (via its zero-argument constructor) and its
+ * {@link StateBackendFactory#createFromConfig(Configuration)} method is called.
+ *
+ * <p>Recognized shortcut names are '{@value AbstractStateBackend#MEMORY_STATE_BACKEND_NAME}',
+ * '{@value AbstractStateBackend#FS_STATE_BACKEND_NAME}', and
+ * '{@value AbstractStateBackend#ROCKSDB_STATE_BACKEND_NAME}'.
+ *
+ * @param config The configuration to load the state backend from
+ * @param classLoader The class loader that should be used to load the state backend
+ * @param logger Optionally, a logger to log actions to (may be null)
+ *
+ * @return The instantiated state backend.
+ *
+ * @throws DynamicCodeLoadingException
+ * Thrown if a state backend factory is configured and the factory class was not
+ * found or the factory could not be instantiated
+ * @throws IllegalConfigurationException
+ * May be thrown by the StateBackendFactory when creating / configuring the state
+ * backend in the factory
+ * @throws IOException
+ * May be thrown by the StateBackendFactory when instantiating the state backend
+ */
+ public static StateBackend loadStateBackendFromConfig(
+ Configuration config,
+ ClassLoader classLoader,
+ @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+ checkNotNull(config, "config");
+ checkNotNull(classLoader, "classLoader");
+
+ final String backendName = config.getString(CoreOptions.STATE_BACKEND);
+ if (backendName == null) {
+ return null;
+ }
+
+ // by default the factory class is the backend name
+ String factoryClassName = backendName;
+
+ switch (backendName.toLowerCase()) {
+ case MEMORY_STATE_BACKEND_NAME:
+ if (logger != null) {
+ logger.info("State backend is set to heap memory (checkpoint to JobManager)");
+ }
+ return new MemoryStateBackend();
+
+ case FS_STATE_BACKEND_NAME:
+ FsStateBackend fsBackend = new FsStateBackendFactory().createFromConfig(config);
+ if (logger != null) {
+ logger.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
+ fsBackend.getBasePath());
+ }
+ return fsBackend;
+
+ case ROCKSDB_STATE_BACKEND_NAME:
+ factoryClassName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
+ // fall through to the 'default' case that uses reflection to load the backend
+ // that way we can keep RocksDB in a separate module
+
+ default:
+ if (logger != null) {
+ logger.info("Loading state backend via factory {}", factoryClassName);
+ }
+
+ StateBackendFactory<?> factory;
+ try {
+ @SuppressWarnings("rawtypes")
+ Class<? extends StateBackendFactory> clazz =
+ Class.forName(factoryClassName, false, classLoader)
+ .asSubclass(StateBackendFactory.class);
+
+ factory = clazz.newInstance();
+ }
+ catch (ClassNotFoundException e) {
+ throw new DynamicCodeLoadingException(
+ "Cannot find configured state backend factory class: " + backendName, e);
+ }
+ catch (ClassCastException | InstantiationException | IllegalAccessException e) {
+ throw new DynamicCodeLoadingException("The class configured under '" +
+ CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
+ backendName + ')', e);
+ }
+
+ return factory.createFromConfig(config);
+ }
+ }
+
+ /**
+ * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
+ * in {@link CoreOptions#STATE_BACKEND}. If no state backend is configured, this instantiates the
+ * default state backend (the {@link MemoryStateBackend}).
+ *
+ * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on
+ * how the state backend is loaded from the configuration.
+ *
+ * @param config The configuration to load the state backend from
+ * @param classLoader The class loader that should be used to load the state backend
+ * @param logger Optionally, a logger to log actions to (may be null)
+ *
+ * @return The instantiated state backend.
+ *
+ * @throws DynamicCodeLoadingException
+ * Thrown if a state backend factory is configured and the factory class was not
+ * found or the factory could not be instantiated
+ * @throws IllegalConfigurationException
+ * May be thrown by the StateBackendFactory when creating / configuring the state
+ * backend in the factory
+ * @throws IOException
+ * May be thrown by the StateBackendFactory when instantiating the state backend
+ */
+ public static StateBackend loadStateBackendFromConfigOrCreateDefault(
+ Configuration config,
+ ClassLoader classLoader,
+ @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+ final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
+
+ if (fromConfig != null) {
+ return fromConfig;
+ }
+ else {
+ if (logger != null) {
+ logger.info("No state backend has been configured, using default state backend (Memory / JobManager)");
+ }
+ return new MemoryStateBackend();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
index 39e7ed2..78c976a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
@@ -18,17 +18,24 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import java.io.IOException;
import java.io.Serializable;
/**
* A factory to create a specific state backend. The state backend creation gets a Configuration
* object that can be used to read further config values.
*
+ * <p>The state backend factory is typically specified in the configuration to produce a
+ * configured state backend.
+ *
* @param <T> The type of the state backend created.
*/
-public interface StateBackendFactory<T extends AbstractStateBackend> extends Serializable {
+@PublicEvolving
+public interface StateBackendFactory<T extends StateBackend> extends Serializable {
/**
* Creates the state backend, optionally using the given configuration.
@@ -36,7 +43,10 @@ public interface StateBackendFactory<T extends AbstractStateBackend> extends Ser
* @param config The Flink configuration (loaded by the TaskManager).
* @return The created state backend.
*
- * @throws Exception Exceptions during instantiation can be forwarded.
+ * @throws IllegalConfigurationException
+ * If the configuration misses critical values, or specifies invalid values
+ * @throws IOException
+ * If the state backend initialization failed due to an I/O exception
*/
- AbstractStateBackend createFromConfig(Configuration config) throws Exception;
+ T createFromConfig(Configuration config) throws IllegalConfigurationException, IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index b614d98..5e8a15d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -36,6 +36,8 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* The file state backend is a state backend that stores the state of streaming jobs in a file system.
*
@@ -139,17 +141,14 @@ public class FsStateBackend extends AbstractStateBackend {
* rather than in files
*
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
*/
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
- if (fileStateSizeThreshold < 0) {
- throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
- }
- if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
- throw new IllegalArgumentException("The threshold for file state size cannot be larger than " +
- MAX_FILE_STATE_THRESHOLD);
- }
+ checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
+ checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
+ "The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD);
+
this.fileStateThreshold = fileStateSizeThreshold;
-
this.basePath = validateAndNormalizeUri(checkpointDataUri);
}
@@ -163,6 +162,19 @@ public class FsStateBackend extends AbstractStateBackend {
return basePath;
}
+ /**
+ * Gets the threshold below which state is stored as part of the metadata, rather than in files.
+ * This threshold ensures that the backend does not create a large amount of very small files,
+ * where potentially the file pointers are larger than the state itself.
+ *
+ * <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}.
+ *
+ * @return The file size threshold, in bytes.
+ */
+ public int getMinFileSizeThreshold() {
+ return fileStateThreshold;
+ }
+
// ------------------------------------------------------------------------
// initialization and cleanup
// ------------------------------------------------------------------------
@@ -189,7 +201,8 @@ public class FsStateBackend extends AbstractStateBackend {
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws Exception {
+ TaskKvStateRegistry kvStateRegistry) throws IOException {
+
return new HeapKeyedStateBackend<>(
kvStateRegistry,
keySerializer,
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
index 042700c..4c933ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackendFactory;
+import java.io.IOException;
+
/**
* A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
* from a configuration.
@@ -35,28 +37,26 @@ public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend
/** The key under which the config stores the threshold for state to be store in memory,
* rather than in files */
public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold";
-
-
+
+
@Override
- public FsStateBackend createFromConfig(Configuration config) throws Exception {
- String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
- int memoryThreshold = config.getInteger(
+ public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException {
+ final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+ final int memoryThreshold = config.getInteger(
MEMORY_THRESHOLD_CONF_KEY, FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD);
-
+
if (checkpointDirURI == null) {
throw new IllegalConfigurationException(
"Cannot create the file system state backend: The configuration does not specify the " +
"checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
}
-
+
try {
Path path = new Path(checkpointDirURI);
return new FsStateBackend(path.toUri(), memoryThreshold);
}
- catch (IllegalArgumentException e) {
- throw new Exception("Cannot initialize File System State Backend with URI '"
- + checkpointDirURI + '.', e);
+ catch (IOException | IllegalArgumentException e) {
+ throw new IllegalConfigurationException("Invalid configuration for the state backend", e);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
new file mode 100644
index 0000000..2f34ed8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the classes for key/value state backends that store the state
+ * on the JVM heap as objects.
+ */
+package org.apache.flink.runtime.state.heap;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
new file mode 100644
index 0000000..fcc4df9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package holds the classes of the <b>internal state type hierarchy</b>.
+ *
+ * <p>The internal state classes give access to the namespace getters and setters and access to
+ * additional functionality, like raw value access or state merging.
+ *
+ * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
+ * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not
+ * intended to be used by user applications. These internal methods are of limited use to users, would
+ * mostly be confusing in the public API, and are usually not regarded as stable across releases.
+ *
+ * <p>Each specific type in the internal state hierarchy extends the type from the public
+ * state hierarchy. The following illustrates the relationship between the public and the internal
+ * hierarchy, using a subset of the classes as an example:
+ *
+ * <pre>
+ *             State
+ *               |
+ *               +-------------------InternalKvState
+ *               |                         |
+ *         MergingState                    |
+ *               |                         |
+ *               +-----------------InternalMergingState
+ *               |                         |
+ *      +--------+------+                  |
+ *      |               |                  |
+ * ReducingState    ListState        +-----+-----------------+
+ *      |               |            |                       |
+ *      |               +-----------   -----------------InternalListState
+ *      |                            |
+ *      +------------------InternalReducingState
+ * </pre>
+ */
+package org.apache.flink.runtime.state.internal;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 2cc1164..6e6b034 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -90,7 +90,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws IOException {
+ TaskKvStateRegistry kvStateRegistry) {
return new HeapKeyedStateBackend<>(
kvStateRegistry,
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 9a39182..7ab71cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -62,6 +62,7 @@ public class CheckpointStatsTrackerTest {
191929L,
123,
ExternalizedCheckpointSettings.none(),
+ null,
false);
CheckpointStatsTracker tracker = new CheckpointStatsTracker(
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 7949ef0..976da48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.junit.Test;
@@ -67,7 +66,7 @@ public class CoordinatorShutdownTest {
JobGraph testGraph = new JobGraph("test job", vertex);
testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
- 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), true));
+ 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true));
ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
@@ -126,7 +125,7 @@ public class CoordinatorShutdownTest {
JobGraph testGraph = new JobGraph("test job", vertex);
testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
- 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), true));
+ 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true));
ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 47e6826..8f565dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -106,7 +106,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
ClassLoader.getSystemClassLoader(),
new UnregisteredMetricsGroup());
- executionGraph.enableSnapshotCheckpointing(
+ executionGraph.enableCheckpointing(
100,
100,
100,
@@ -118,6 +118,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
counter,
store,
null,
+ null,
CheckpointStatsTrackerTest.createTestTracker());
JobVertex jobVertex = new JobVertex("MockVertex");
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 46ce3f4..3090172 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -112,7 +112,7 @@ public class ArchivedExecutionGraphTest {
mock(JobSnapshottingSettings.class),
new UnregisteredMetricsGroup());
- runtimeGraph.enableSnapshotCheckpointing(
+ runtimeGraph.enableCheckpointing(
100,
100,
100,
@@ -124,6 +124,7 @@ public class ArchivedExecutionGraphTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
+ null,
statsTracker);
Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
index 667dbca..2508d5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.junit.Test;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class JobSnapshottingSettingsTest {
@@ -42,6 +45,7 @@ public class JobSnapshottingSettingsTest {
112,
12,
ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+ new MemoryStateBackend(),
false);
JobSnapshottingSettings copy = CommonTestUtils.createCopySerializable(settings);
@@ -55,5 +59,7 @@ public class JobSnapshottingSettingsTest {
assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
+ assertNotNull(copy.getDefaultStateBackend());
+ assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index cbb077c..115b06c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -225,6 +225,7 @@ public class JobManagerHARecoveryTest {
0,
1,
ExternalizedCheckpointSettings.none(),
+ null,
true));
BlockingStatefulInvokable.initializeStaticHelpers(slots);
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index c5f6d99..727fc65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -829,6 +829,7 @@ public class JobManagerTest {
0,
Integer.MAX_VALUE,
ExternalizedCheckpointSettings.none(),
+ null,
true);
jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -954,6 +955,7 @@ public class JobManagerTest {
0,
Integer.MAX_VALUE,
ExternalizedCheckpointSettings.none(),
+ null,
true);
jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -1059,6 +1061,7 @@ public class JobManagerTest {
0,
Integer.MAX_VALUE,
ExternalizedCheckpointSettings.none(),
+ null,
true);
jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -1161,6 +1164,7 @@ public class JobManagerTest {
0,
Integer.MAX_VALUE,
ExternalizedCheckpointSettings.none(),
+ null,
true);
jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -1207,6 +1211,7 @@ public class JobManagerTest {
0,
Integer.MAX_VALUE,
ExternalizedCheckpointSettings.none(),
+ null,
true);
newJobGraph.setSnapshotSettings(newSnapshottingSettings);
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index feb3d4d..529c100 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -229,7 +229,7 @@ public class JobSubmitTest {
JobGraph jg = new JobGraph("test job", jobVertex);
jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
- 5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), true));
+ 5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), null, true));
return jg;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
new file mode 100644
index 0000000..a64faf1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test validates that state backends are properly loaded from configuration.
+ */
+public class StateBackendLoadingTest {
+
+ @Rule
+ public final TemporaryFolder tmp = new TemporaryFolder();
+
+ private final ClassLoader cl = getClass().getClassLoader();
+
+ private final String backendKey = CoreOptions.STATE_BACKEND.key();
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testNoStateBackendDefined() throws Exception {
+ assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null));
+ }
+
+ @Test
+ public void testInstantiateMemoryBackendByDefault() throws Exception {
+ StateBackend backend = AbstractStateBackend
+ .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null);
+
+ assertTrue(backend instanceof MemoryStateBackend);
+ }
+
+ @Test
+ public void testLoadMemoryStateBackend() throws Exception {
+ // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
+ // to guard against config-breaking changes of the name
+ final Configuration config = new Configuration();
+ config.setString(backendKey, "jobmanager");
+
+ StateBackend backend = AbstractStateBackend
+ .loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+
+ assertTrue(backend instanceof MemoryStateBackend);
+ }
+
+ @Test
+ public void testLoadFileSystemStateBackend() throws Exception {
+ final String checkpointDir = new Path(tmp.getRoot().toURI()).toString();
+ final Path expectedPath = new Path(checkpointDir);
+ final int threshold = 1000000;
+
+ // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
+ // to guard against config-breaking changes of the name
+ final Configuration config1 = new Configuration();
+ config1.setString(backendKey, "filesystem");
+ config1.setString("state.checkpoints.dir", checkpointDir);
+ config1.setString("state.backend.fs.checkpointdir", checkpointDir);
+ config1.setInteger("state.backend.fs.memory-threshold", threshold);
+
+ final Configuration config2 = new Configuration();
+ config2.setString(backendKey, FsStateBackendFactory.class.getName());
+ config2.setString("state.checkpoints.dir", checkpointDir);
+ config2.setString("state.backend.fs.checkpointdir", checkpointDir);
+ config2.setInteger("state.backend.fs.memory-threshold", threshold);
+
+ StateBackend backend1 = AbstractStateBackend
+ .loadStateBackendFromConfigOrCreateDefault(config1, cl, null);
+
+ StateBackend backend2 = AbstractStateBackend
+ .loadStateBackendFromConfigOrCreateDefault(config2, cl, null);
+
+ assertTrue(backend1 instanceof FsStateBackend);
+ assertTrue(backend2 instanceof FsStateBackend);
+
+ FsStateBackend fs1 = (FsStateBackend) backend1;
+ FsStateBackend fs2 = (FsStateBackend) backend2;
+
+ assertEquals(expectedPath, fs1.getBasePath());
+ assertEquals(expectedPath, fs2.getBasePath());
+ assertEquals(threshold, fs1.getMinFileSizeThreshold());
+ assertEquals(threshold, fs2.getMinFileSizeThreshold());
+ }
+
+ /**
+ * This test makes sure that failures properly manifest when the state backend could not be loaded.
+ */
+ @Test
+ public void testLoadingFails() throws Exception {
+ final Configuration config = new Configuration();
+
+ // try a value that is neither recognized as a name, nor corresponds to a class
+ config.setString(backendKey, "does.not.exist");
+ try {
+ AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+ fail("should fail with an exception");
+ } catch (DynamicCodeLoadingException ignored) {
+ // expected
+ }
+
+ // try a class that is not a factory
+ config.setString(backendKey, java.io.File.class.getName());
+ try {
+ AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+ fail("should fail with an exception");
+ } catch (DynamicCodeLoadingException ignored) {
+ // expected
+ }
+
+ // a factory that fails
+ config.setString(backendKey, FailingFactory.class.getName());
+ try {
+ AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+ fail("should fail with an exception");
+ } catch (IOException ignored) {
+ // expected
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ static final class FailingFactory implements StateBackendFactory<StateBackend> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public StateBackend createFromConfig(Configuration config) throws IllegalConfigurationException, IOException {
+ throw new IOException("fail!");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 75f1fd4..31e72dd 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -822,6 +822,7 @@ class JobManagerITCase(_system: ActorSystem)
60000,
1,
ExternalizedCheckpointSettings.none,
+ null,
true))
// Submit job...
@@ -881,6 +882,7 @@ class JobManagerITCase(_system: ActorSystem)
60000,
1,
ExternalizedCheckpointSettings.none,
+ null,
true))
// Submit job...
@@ -948,6 +950,7 @@ class JobManagerITCase(_system: ActorSystem)
60000,
1,
ExternalizedCheckpointSettings.none,
+ null,
true))
// Submit job...
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index f55ff47..bd018c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -82,7 +82,7 @@ public class StreamGraphGenerator {
public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
// The StreamGraph that is being built, this is initialized at the beginning.
- private StreamGraph streamGraph;
+ private final StreamGraph streamGraph;
private final StreamExecutionEnvironment env;
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a4bb165..003eff9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -539,6 +539,7 @@ public class StreamingJobGraphGenerator {
cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
cfg.getMaxConcurrentCheckpoints(),
externalizedCheckpointSettings,
+ streamGraph.getStateBackend(),
isExactlyOnce);
jobGraph.setSnapshotSettings(settings);
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 938ffd2..1e208ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -20,9 +20,6 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -43,13 +40,10 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -63,6 +57,7 @@ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,7 +142,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private StreamConfig configuration;
/** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
- private AbstractStateBackend stateBackend;
+ private StateBackend stateBackend;
/** Keyed state backend for the head operator, if it is keyed. There can only ever be one. */
private AbstractKeyedStateBackend<?> keyedStateBackend;
@@ -713,61 +708,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// State backend
// ------------------------------------------------------------------------
- private AbstractStateBackend createStateBackend() throws Exception {
- AbstractStateBackend stateBackend = configuration.getStateBackend(getUserCodeClassLoader());
+ private StateBackend createStateBackend() throws Exception {
+ final StateBackend fromJob = configuration.getStateBackend(getUserCodeClassLoader());
- if (stateBackend != null) {
+ if (fromJob != null) {
// backend has been configured on the environment
- LOG.info("Using user-defined state backend: {}.", stateBackend);
+ LOG.info("Using user-defined state backend: {}.", fromJob);
- } else {
- // see if we have a backend specified in the configuration
- Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
- String backendName = flinkConfig.getString(CoreOptions.STATE_BACKEND, null);
-
- if (backendName == null) {
- LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
- backendName = "jobmanager";
- }
-
- switch (backendName.toLowerCase()) {
- case "jobmanager":
- LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
- stateBackend = new MemoryStateBackend();
- break;
-
- case "filesystem":
- FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
- LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
- backend.getBasePath());
- stateBackend = backend;
- break;
-
- case "rocksdb":
- backendName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
- // fall through to the 'default' case that uses reflection to load the backend
- // that way we can keep RocksDB in a separate module
-
- default:
- try {
- @SuppressWarnings("rawtypes")
- Class<? extends StateBackendFactory> clazz =
- Class.forName(backendName, false, getUserCodeClassLoader()).
- asSubclass(StateBackendFactory.class);
-
- stateBackend = clazz.newInstance().createFromConfig(flinkConfig);
- } catch (ClassNotFoundException e) {
- throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
- } catch (ClassCastException e) {
- throw new IllegalConfigurationException("The class configured under '" +
- CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
- backendName + ')');
- } catch (Throwable t) {
- throw new IllegalConfigurationException("Cannot create configured state backend", t);
- }
- }
+ return fromJob;
+ }
+ else {
+ return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
+ getEnvironment().getTaskManagerInfo().getConfiguration(),
+ getUserCodeClassLoader(),
+ LOG);
}
-
- return stateBackend;
}
public OperatorStateBackend createOperatorStateBackend(
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 51294ce..e266ea1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -183,7 +183,7 @@ public class BlockingCheckpointsTest {
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID, String operatorIdentifier,
TypeSerializer<K> keySerializer, int numberOfKeyGroups,
- KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception {
+ KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 3d01fdd..3826051 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -807,33 +807,39 @@ public class StreamTaskTest extends TestLogger {
private static final long serialVersionUID = 1L;
@Override
- public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
+ public AbstractStateBackend createFromConfig(Configuration config) {
AbstractStateBackend stateBackendMock = mock(AbstractStateBackend.class);
- Mockito.when(stateBackendMock.createOperatorStateBackend(
- Mockito.any(Environment.class),
- Mockito.any(String.class)))
- .thenAnswer(new Answer<OperatorStateBackend>() {
- @Override
- public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
- return Mockito.mock(OperatorStateBackend.class);
- }
- });
-
- Mockito.when(stateBackendMock.createKeyedStateBackend(
- Mockito.any(Environment.class),
- Mockito.any(JobID.class),
- Mockito.any(String.class),
- Mockito.any(TypeSerializer.class),
- Mockito.any(int.class),
- Mockito.any(KeyGroupRange.class),
- Mockito.any(TaskKvStateRegistry.class)))
- .thenAnswer(new Answer<AbstractKeyedStateBackend>() {
- @Override
- public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
- return Mockito.mock(AbstractKeyedStateBackend.class);
- }
- });
+ try {
+ Mockito.when(stateBackendMock.createOperatorStateBackend(
+ Mockito.any(Environment.class),
+ Mockito.any(String.class)))
+ .thenAnswer(new Answer<OperatorStateBackend>() {
+ @Override
+ public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return Mockito.mock(OperatorStateBackend.class);
+ }
+ });
+
+ Mockito.when(stateBackendMock.createKeyedStateBackend(
+ Mockito.any(Environment.class),
+ Mockito.any(JobID.class),
+ Mockito.any(String.class),
+ Mockito.any(TypeSerializer.class),
+ Mockito.any(int.class),
+ Mockito.any(KeyGroupRange.class),
+ Mockito.any(TaskKvStateRegistry.class)))
+ .thenAnswer(new Answer<AbstractKeyedStateBackend>() {
+ @Override
+ public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return Mockito.mock(AbstractKeyedStateBackend.class);
+ }
+ });
+ }
+ catch (Exception e) {
+ // this is needed, because the signatures of the mocked methods throw 'Exception'
+ throw new RuntimeException(e);
+ }
return stateBackendMock;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 79665dd..4677242 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -109,7 +109,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws Exception {
+ TaskKvStateRegistry kvStateRegistry) throws IOException {
throw new SuccessException();
}
}