You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:24 UTC
[07/17] flink git commit: [FLINK-5823] [checkpoints] State backends
define checkpoint and savepoint directories, improved configuration
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 5893d1d..6a84a11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
@@ -102,7 +103,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
counter,
store,
null,
- null,
+ new MemoryStateBackend(),
CheckpointStatsTrackerTest.createTestTracker());
JobVertex jobVertex = new JobVertex("MockVertex");
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 7b9d9aa..3fe8613 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -66,8 +67,6 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class ArchivedExecutionGraphTest extends TestLogger {
- private static JobVertexID v1ID = new JobVertexID();
- private static JobVertexID v2ID = new JobVertexID();
private static ExecutionGraph runtimeGraph;
@@ -77,8 +76,8 @@ public class ArchivedExecutionGraphTest extends TestLogger {
// Setup
// -------------------------------------------------------------------------------------------------------------
- v1ID = new JobVertexID();
- v2ID = new JobVertexID();
+ JobVertexID v1ID = new JobVertexID();
+ JobVertexID v2ID = new JobVertexID();
JobVertex v1 = new JobVertex("v1", v1ID);
JobVertex v2 = new JobVertex("v2", v2ID);
@@ -89,7 +88,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
- List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
+ List<JobVertex> vertices = new ArrayList<>(Arrays.asList(v1, v2));
ExecutionConfig config = new ExecutionConfig();
@@ -135,7 +134,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- null,
+ new MemoryStateBackend(),
statsTracker);
Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 01d2346..452afd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -149,16 +150,16 @@ import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_AS
import static org.apache.flink.runtime.testingUtils.TestingUtils.TESTING_TIMEOUT;
import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
import static org.mockito.Mockito.mock;
public class JobManagerTest extends TestLogger {
@Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
private static ActorSystem system;
@@ -830,7 +831,7 @@ public class JobManagerTest extends TestLogger {
FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
Configuration config = new Configuration();
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.toURI().toString());
ActorSystem actorSystem = null;
ActorGateway jobManager = null;
@@ -932,13 +933,13 @@ public class JobManagerTest extends TestLogger {
}
// Verify savepoint path
- assertNotEquals("Savepoint not triggered", null, savepointPath);
+ assertNotNull("Savepoint not triggered", savepointPath);
// Wait for job status change
Await.ready(cancelled, timeout);
- File savepointFile = new File(savepointPath);
- assertEquals(true, savepointFile.exists());
+ File savepointFile = new File(new Path(savepointPath).getPath());
+ assertTrue(savepointFile.exists());
} finally {
if (actorSystem != null) {
actorSystem.shutdown();
@@ -1157,7 +1158,7 @@ public class JobManagerTest extends TestLogger {
FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
Configuration config = new Configuration();
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.toURI().toString());
ActorSystem actorSystem = null;
ActorGateway jobManager = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 8ad368b..3d43cd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.TernaryBoolean;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -93,7 +94,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
@Test
public void testOversizedState() {
try {
- MemoryStateBackend backend = new MemoryStateBackend(10);
+ MemoryStateBackend backend = new MemoryStateBackend(null, null, 10, TernaryBoolean.TRUE);
CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op");
HashMap<String, Integer> state = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
index a64faf1..cd4e355 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -18,22 +18,27 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
import java.io.IOException;
+import java.net.URI;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -48,61 +53,199 @@ public class StateBackendLoadingTest {
private final ClassLoader cl = getClass().getClassLoader();
- private final String backendKey = CoreOptions.STATE_BACKEND.key();
+ private final String backendKey = CheckpointingOptions.STATE_BACKEND.key();
// ------------------------------------------------------------------------
+ // defaults
+ // ------------------------------------------------------------------------
@Test
public void testNoStateBackendDefined() throws Exception {
- assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null));
+ assertNull(StateBackendLoader.loadStateBackendFromConfig(new Configuration(), cl, null));
}
@Test
public void testInstantiateMemoryBackendByDefault() throws Exception {
- StateBackend backend = AbstractStateBackend
- .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null);
+ StateBackend backend =
+ StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), cl, null);
assertTrue(backend instanceof MemoryStateBackend);
}
@Test
- public void testLoadMemoryStateBackend() throws Exception {
- // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
- // to guard against config-breaking changes of the name
+ public void testApplicationDefinedHasPrecedence() throws Exception {
+ final StateBackend appBackend = Mockito.mock(StateBackend.class);
+
final Configuration config = new Configuration();
config.setString(backendKey, "jobmanager");
- StateBackend backend = AbstractStateBackend
- .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null);
+ StateBackend backend = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, null);
+ assertEquals(appBackend, backend);
+ }
- assertTrue(backend instanceof MemoryStateBackend);
+ // ------------------------------------------------------------------------
+ // Memory State Backend
+ // ------------------------------------------------------------------------
+
+ /**
+ * Validates loading a memory state backend from the cluster configuration.
+ */
+ @Test
+ public void testLoadMemoryStateBackendNoParameters() throws Exception {
+ // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
+ // to guard against config-breaking changes of the name
+
+ final Configuration config1 = new Configuration();
+ config1.setString(backendKey, "jobmanager");
+
+ final Configuration config2 = new Configuration();
+ config2.setString(backendKey, MemoryStateBackendFactory.class.getName());
+
+ StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+ StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+ assertTrue(backend1 instanceof MemoryStateBackend);
+ assertTrue(backend2 instanceof MemoryStateBackend);
}
+ /**
+ * Validates loading a memory state backend with additional parameters from the cluster configuration.
+ */
+ @Test
+ public void testLoadMemoryStateWithParameters() throws Exception {
+ final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+ final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+ final Path expectedCheckpointPath = new Path(checkpointDir);
+ final Path expectedSavepointPath = new Path(savepointDir);
+
+ final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
+
+ // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
+ // to guard against config-breaking changes of the name
+
+ final Configuration config1 = new Configuration();
+ config1.setString(backendKey, "jobmanager");
+ config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+ config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
+
+ final Configuration config2 = new Configuration();
+ config2.setString(backendKey, MemoryStateBackendFactory.class.getName());
+ config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+ config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
+
+ MemoryStateBackend backend1 = (MemoryStateBackend)
+ StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+ MemoryStateBackend backend2 = (MemoryStateBackend)
+ StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+ assertNotNull(backend1);
+ assertNotNull(backend2);
+
+ assertEquals(expectedCheckpointPath, backend1.getCheckpointPath());
+ assertEquals(expectedCheckpointPath, backend2.getCheckpointPath());
+ assertEquals(expectedSavepointPath, backend1.getSavepointPath());
+ assertEquals(expectedSavepointPath, backend2.getSavepointPath());
+ assertEquals(async, backend1.isUsingAsynchronousSnapshots());
+ assertEquals(async, backend2.isUsingAsynchronousSnapshots());
+ }
+
+ /**
+ * Validates taking the application-defined memory state backend and adding additional
+ * parameters from the cluster configuration.
+ */
+ @Test
+ public void testConfigureMemoryStateBackend() throws Exception {
+ final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+ final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+ final Path expectedCheckpointPath = new Path(checkpointDir);
+ final Path expectedSavepointPath = new Path(savepointDir);
+
+ final int maxSize = 100;
+ final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
+
+ final MemoryStateBackend backend = new MemoryStateBackend(maxSize, async);
+
+ final Configuration config = new Configuration();
+ config.setString(backendKey, "filesystem"); // check that this is not accidentally picked up
+ config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+ config.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, !async);
+
+ StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
+ assertTrue(loadedBackend instanceof MemoryStateBackend);
+
+ final MemoryStateBackend memBackend = (MemoryStateBackend) loadedBackend;
+ assertEquals(expectedCheckpointPath, memBackend.getCheckpointPath());
+ assertEquals(expectedSavepointPath, memBackend.getSavepointPath());
+ assertEquals(maxSize, memBackend.getMaxStateSize());
+ assertEquals(async, memBackend.isUsingAsynchronousSnapshots());
+ }
+
+ /**
+ * Validates taking the application-defined memory state backend and adding additional
+ * parameters from the cluster configuration, but giving precedence to application-defined
+ * parameters over configuration-defined parameters.
+ */
+ @Test
+ public void testConfigureMemoryStateBackendMixed() throws Exception {
+ final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
+ final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+ final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+
+ final Path expectedCheckpointPath = new Path(appCheckpointDir);
+ final Path expectedSavepointPath = new Path(savepointDir);
+
+ final MemoryStateBackend backend = new MemoryStateBackend(appCheckpointDir, null);
+
+ final Configuration config = new Configuration();
+ config.setString(backendKey, "filesystem"); // check that this is not accidentally picked up
+ config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this parameter should not be picked up
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+
+ StateBackend loadedBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
+ assertTrue(loadedBackend instanceof MemoryStateBackend);
+
+ final MemoryStateBackend memBackend = (MemoryStateBackend) loadedBackend;
+ assertEquals(expectedCheckpointPath, memBackend.getCheckpointPath());
+ assertEquals(expectedSavepointPath, memBackend.getSavepointPath());
+ }
+
+ // ------------------------------------------------------------------------
+ // File System State Backend
+ // ------------------------------------------------------------------------
+
+ /**
+ * Validates loading a file system state backend with additional parameters from the cluster configuration.
+ */
@Test
public void testLoadFileSystemStateBackend() throws Exception {
- final String checkpointDir = new Path(tmp.getRoot().toURI()).toString();
- final Path expectedPath = new Path(checkpointDir);
+ final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+ final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+ final Path expectedCheckpointsPath = new Path(checkpointDir);
+ final Path expectedSavepointsPath = new Path(savepointDir);
final int threshold = 1000000;
+ final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
// we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
// to guard against config-breaking changes of the name
final Configuration config1 = new Configuration();
config1.setString(backendKey, "filesystem");
- config1.setString("state.checkpoints.dir", checkpointDir);
- config1.setString("state.backend.fs.checkpointdir", checkpointDir);
- config1.setInteger("state.backend.fs.memory-threshold", threshold);
+ config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+ config1.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+ config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
final Configuration config2 = new Configuration();
config2.setString(backendKey, FsStateBackendFactory.class.getName());
- config2.setString("state.checkpoints.dir", checkpointDir);
- config2.setString("state.backend.fs.checkpointdir", checkpointDir);
- config2.setInteger("state.backend.fs.memory-threshold", threshold);
-
- StateBackend backend1 = AbstractStateBackend
- .loadStateBackendFromConfigOrCreateDefault(config1, cl, null);
+ config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+ config2.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+ config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
- StateBackend backend2 = AbstractStateBackend
- .loadStateBackendFromConfigOrCreateDefault(config2, cl, null);
+ StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+ StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
assertTrue(backend1 instanceof FsStateBackend);
assertTrue(backend2 instanceof FsStateBackend);
@@ -110,13 +253,55 @@ public class StateBackendLoadingTest {
FsStateBackend fs1 = (FsStateBackend) backend1;
FsStateBackend fs2 = (FsStateBackend) backend2;
- assertEquals(expectedPath, fs1.getBasePath());
- assertEquals(expectedPath, fs2.getBasePath());
+ assertEquals(expectedCheckpointsPath, fs1.getCheckpointPath());
+ assertEquals(expectedCheckpointsPath, fs2.getCheckpointPath());
+ assertEquals(expectedSavepointsPath, fs1.getSavepointPath());
+ assertEquals(expectedSavepointsPath, fs2.getSavepointPath());
assertEquals(threshold, fs1.getMinFileSizeThreshold());
assertEquals(threshold, fs2.getMinFileSizeThreshold());
+ assertEquals(async, fs1.isUsingAsynchronousSnapshots());
+ assertEquals(async, fs2.isUsingAsynchronousSnapshots());
}
/**
+ * Validates taking the application-defined file system state backend and adding with additional
+ * parameters from the cluster configuration, but giving precedence to application-defined
+ * parameters over configuration-defined parameters.
+ */
+ @Test
+ public void testLoadFileSystemStateBackendMixed() throws Exception {
+ final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
+ final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+ final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+
+ final Path expectedCheckpointsPath = new Path(new URI(appCheckpointDir));
+ final Path expectedSavepointsPath = new Path(savepointDir);
+
+ final int threshold = 1000000;
+
+ final FsStateBackend backend = new FsStateBackend(new URI(appCheckpointDir), threshold);
+
+ final Configuration config = new Configuration();
+ config.setString(backendKey, "jobmanager"); // this should not be picked up
+ config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+ config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 20); // this should not be picked up
+
+ final StateBackend loadedBackend =
+ StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
+ assertTrue(loadedBackend instanceof FsStateBackend);
+
+ final FsStateBackend fs = (FsStateBackend) loadedBackend;
+ assertEquals(expectedCheckpointsPath, fs.getCheckpointPath());
+ assertEquals(expectedSavepointsPath, fs.getSavepointPath());
+ assertEquals(threshold, fs.getMinFileSizeThreshold());
+ }
+
+ // ------------------------------------------------------------------------
+ // Failures
+ // ------------------------------------------------------------------------
+
+ /**
* This test makes sure that failures properly manifest when the state backend could not be loaded.
*/
@Test
@@ -126,7 +311,7 @@ public class StateBackendLoadingTest {
// try a value that is neither recognized as a name, nor corresponds to a class
config.setString(backendKey, "does.not.exist");
try {
- AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+ StateBackendLoader.fromApplicationOrConfigOrDefault(null, config, cl, null);
fail("should fail with an exception");
} catch (DynamicCodeLoadingException ignored) {
// expected
@@ -135,7 +320,7 @@ public class StateBackendLoadingTest {
// try a class that is not a factory
config.setString(backendKey, java.io.File.class.getName());
try {
- AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+ StateBackendLoader.fromApplicationOrConfigOrDefault(null, config, cl, null);
fail("should fail with an exception");
} catch (DynamicCodeLoadingException ignored) {
// expected
@@ -144,7 +329,7 @@ public class StateBackendLoadingTest {
// a factory that fails
config.setString(backendKey, FailingFactory.class.getName());
try {
- AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+ StateBackendLoader.fromApplicationOrConfigOrDefault(null, config, cl, null);
fail("should fail with an exception");
} catch (IOException ignored) {
// expected
@@ -152,12 +337,103 @@ public class StateBackendLoadingTest {
}
// ------------------------------------------------------------------------
+ // High-availability default
+ // ------------------------------------------------------------------------
+
+ /**
+ * This tests that in the case of configured high-availability, the memory state backend
+ * automatically grabs the HA persistence directory.
+ */
+ @Test
+ public void testHighAvailabilityDefaultFallback() throws Exception {
+ final String haPersistenceDir = new Path(tmp.newFolder().toURI()).toString();
+ final Path expectedCheckpointPath = new Path(haPersistenceDir);
+
+ final Configuration config1 = new Configuration();
+ config1.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config1.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
+ config1.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
+
+ final Configuration config2 = new Configuration();
+ config2.setString(backendKey, "jobmanager");
+ config2.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config2.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
+ config2.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
+
+ final MemoryStateBackend appBackend = new MemoryStateBackend();
+
+ final StateBackend loaded1 = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config1, cl, null);
+ final StateBackend loaded2 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, config1, cl, null);
+ final StateBackend loaded3 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, config2, cl, null);
+
+ assertTrue(loaded1 instanceof MemoryStateBackend);
+ assertTrue(loaded2 instanceof MemoryStateBackend);
+ assertTrue(loaded3 instanceof MemoryStateBackend);
+
+ final MemoryStateBackend memBackend1 = (MemoryStateBackend) loaded1;
+ final MemoryStateBackend memBackend2 = (MemoryStateBackend) loaded2;
+ final MemoryStateBackend memBackend3 = (MemoryStateBackend) loaded3;
+
+ assertNotNull(memBackend1.getCheckpointPath());
+ assertNotNull(memBackend2.getCheckpointPath());
+ assertNotNull(memBackend3.getCheckpointPath());
+ assertNull(memBackend1.getSavepointPath());
+ assertNull(memBackend2.getSavepointPath());
+ assertNull(memBackend3.getSavepointPath());
+
+ assertEquals(expectedCheckpointPath, memBackend1.getCheckpointPath().getParent());
+ assertEquals(expectedCheckpointPath, memBackend2.getCheckpointPath().getParent());
+ assertEquals(expectedCheckpointPath, memBackend3.getCheckpointPath().getParent());
+ }
+
+ @Test
+ public void testHighAvailabilityDefaultFallbackLocalPaths() throws Exception {
+ final String haPersistenceDir = new Path(tmp.newFolder().getAbsolutePath()).toString();
+ final Path expectedCheckpointPath = new Path(haPersistenceDir).makeQualified(FileSystem.getLocalFileSystem());
+
+ final Configuration config1 = new Configuration();
+ config1.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config1.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
+ config1.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
+
+ final Configuration config2 = new Configuration();
+ config2.setString(backendKey, "jobmanager");
+ config2.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config2.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
+ config2.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);
+
+ final MemoryStateBackend appBackend = new MemoryStateBackend();
+
+ final StateBackend loaded1 = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config1, cl, null);
+ final StateBackend loaded2 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, config1, cl, null);
+ final StateBackend loaded3 = StateBackendLoader.fromApplicationOrConfigOrDefault(null, config2, cl, null);
+
+ assertTrue(loaded1 instanceof MemoryStateBackend);
+ assertTrue(loaded2 instanceof MemoryStateBackend);
+ assertTrue(loaded3 instanceof MemoryStateBackend);
+
+ final MemoryStateBackend memBackend1 = (MemoryStateBackend) loaded1;
+ final MemoryStateBackend memBackend2 = (MemoryStateBackend) loaded2;
+ final MemoryStateBackend memBackend3 = (MemoryStateBackend) loaded3;
+
+ assertNotNull(memBackend1.getCheckpointPath());
+ assertNotNull(memBackend2.getCheckpointPath());
+ assertNotNull(memBackend3.getCheckpointPath());
+ assertNull(memBackend1.getSavepointPath());
+ assertNull(memBackend2.getSavepointPath());
+ assertNull(memBackend3.getSavepointPath());
+
+ assertEquals(expectedCheckpointPath, memBackend1.getCheckpointPath().getParent());
+ assertEquals(expectedCheckpointPath, memBackend2.getCheckpointPath().getParent());
+ assertEquals(expectedCheckpointPath, memBackend3.getCheckpointPath().getParent());
+ }
+
+ // ------------------------------------------------------------------------
static final class FailingFactory implements StateBackendFactory<StateBackend> {
- private static final long serialVersionUID = 1L;
@Override
- public StateBackend createFromConfig(Configuration config) throws IllegalConfigurationException, IOException {
+ public StateBackend createFromConfig(Configuration config) throws IOException {
throw new IOException("fail!");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 63563f3..ccf6baf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.util.CorruptConfigurationException;
-import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.util.ClassLoaderUtil;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -442,7 +442,7 @@ public class StreamConfig implements Serializable {
// State backend
// ------------------------------------------------------------------------
- public void setStateBackend(AbstractStateBackend backend) {
+ public void setStateBackend(StateBackend backend) {
if (backend != null) {
try {
InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
@@ -452,7 +452,7 @@ public class StreamConfig implements Serializable {
}
}
- public AbstractStateBackend getStateBackend(ClassLoader cl) {
+ public StateBackend getStateBackend(ClassLoader cl) {
try {
return InstantiationUtil.readObjectFromConfig(this.config, STATE_BACKEND, cl);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a1bd355..f89b916 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -36,13 +36,13 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -734,19 +734,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// ------------------------------------------------------------------------
private StateBackend createStateBackend() throws Exception {
- final StateBackend fromJob = configuration.getStateBackend(getUserCodeClassLoader());
+ final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());
- if (fromJob != null) {
- // backend has been configured on the environment
- LOG.info("Using user-defined state backend: {}.", fromJob);
- return fromJob;
- }
- else {
- return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
- getEnvironment().getTaskManagerInfo().getConfiguration(),
- getUserCodeClassLoader(),
- LOG);
- }
+ return StateBackendLoader.fromApplicationOrConfigOrDefault(
+ fromApplication,
+ getEnvironment().getTaskManagerInfo().getConfiguration(),
+ getUserCodeClassLoader(),
+ LOG);
}
public OperatorStateBackend createOperatorStateBackend(
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index bc69e1c..2051771 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -291,6 +291,8 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
private static class SyncFailureInducingStateBackend extends MemoryStateBackend {
+ private static final long serialVersionUID = -1915780414440060539L;
+
@Override
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
return new DefaultOperatorStateBackend(
@@ -308,10 +310,18 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
}
};
}
+
+ @Override
+ public SyncFailureInducingStateBackend configure(Configuration config) {
+ // retain this instance, no re-configuration!
+ return this;
+ }
}
private static class AsyncFailureInducingStateBackend extends MemoryStateBackend {
+ private static final long serialVersionUID = -7613628662587098470L;
+
@Override
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
return new DefaultOperatorStateBackend(
@@ -334,6 +344,12 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
}
};
}
+
+ @Override
+ public AsyncFailureInducingStateBackend configure(Configuration config) {
+ // retain this instance, no re-configuration!
+ return this;
+ }
}
// ------------------------------------------------------------------------
@@ -356,6 +372,12 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
new ExecutionConfig(),
true);
}
+
+ @Override
+ public LockingStreamStateBackend configure(Configuration config) {
+ // retain this instance, no re-configuration!
+ return this;
+ }
}
private static final class LockingOutputStreamFactory implements CheckpointStreamFactory {
@@ -373,7 +395,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
@Override
public StreamStateHandle closeAndGetHandle() throws IOException {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fa03e78d/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index 99a9d1f..ffe220e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -32,15 +32,15 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
@@ -70,7 +70,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
-import static org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -85,9 +84,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
@Parameterized.Parameters(name = "StateBackend: {0}")
public static Collection<String> parameters () {
return Arrays.asList(
- AbstractStateBackend.MEMORY_STATE_BACKEND_NAME,
- AbstractStateBackend.FS_STATE_BACKEND_NAME,
- AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME);
+ StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+ StateBackendLoader.FS_STATE_BACKEND_NAME,
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME);
}
@ClassRule
@@ -97,9 +96,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
public PojoSerializerUpgradeTest(String backendType) throws IOException, DynamicCodeLoadingException {
Configuration config = new Configuration();
- config.setString(CoreOptions.STATE_BACKEND, backendType);
- config.setString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, temporaryFolder.newFolder().toURI().toString());
- stateBackend = AbstractStateBackend.loadStateBackendFromConfig(config, Thread.currentThread().getContextClassLoader(), null);
+ config.setString(CheckpointingOptions.STATE_BACKEND, backendType);
+ config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toURI().toString());
+ stateBackend = StateBackendLoader.loadStateBackendFromConfig(config, Thread.currentThread().getContextClassLoader(), null);
}
private static final String POJO_NAME = "Pojo";