You are viewing a plain text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:49 UTC
[8/8] flink git commit: [FLINK-4754] [checkpoints] Small followups to the configuration of number of retained checkpoints.
[FLINK-4754] [checkpoints] Small followups to the configuration of number of retained checkpoints.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24408e19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24408e19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24408e19
Branch: refs/heads/master
Commit: 24408e19037c8761924ca66a557dfdd8236a7be4
Parents: b46f5e0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Mar 16 11:17:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100
----------------------------------------------------------------------
docs/setup/config.md | 2 +-
.../apache/flink/configuration/CoreOptions.java | 21 +++--
.../executiongraph/ExecutionGraphBuilder.java | 11 ++-
.../ExecutionGraphDeploymentTest.java | 80 +++++++++-----------
4 files changed, 58 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 048e012..c835882 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -182,7 +182,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.
- `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints.html#externalized-checkpoints).
-- `state.checkpoints.max-retained-checkpoints`: The maximum number of completed checkpoint instances to retain. This setting defines how many completed checkpoint instances can be stored in `CompletedCheckpointStore`. (Default: 1)
+- `state.checkpoints.num-retained`: The number of completed checkpoint instances to retain. Having more than one allows recovery fallback to an earlier checkpoints if the latest checkpoint is corrupt. (Default: 1)
- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 1e40569..8cb4123 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -23,9 +23,10 @@ import org.apache.flink.annotation.PublicEvolving;
@PublicEvolving
public class CoreOptions {
- /**
- *
- */
+ // ------------------------------------------------------------------------
+ // process parameters
+ // ------------------------------------------------------------------------
+
public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions
.key("env.java.opts")
.defaultValue("");
@@ -38,16 +39,24 @@ public class CoreOptions {
.key("env.java.opts.taskmanager")
.defaultValue("");
+ // ------------------------------------------------------------------------
+ // program
+ // ------------------------------------------------------------------------
+
public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
.key("parallelism.default")
.defaultValue(-1);
-
+
+ // ------------------------------------------------------------------------
+ // checkpoints / fault tolerance
+ // ------------------------------------------------------------------------
+
public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
.key("state.backend")
.noDefaultValue();
/** The maximum number of completed checkpoint instances to retain.*/
- public static final ConfigOption<Integer> STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions
- .key("state.checkpoints.max-retained-checkpoints")
+ public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+ .key("state.checkpoints.num-retained")
.defaultValue(1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 8a35773..8471178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -178,12 +178,17 @@ public class ExecutionGraphBuilder {
CheckpointIDCounter checkpointIdCounter;
try {
int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
- CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS);
+ CoreOptions.MAX_RETAINED_CHECKPOINTS);
+
if (maxNumberOfCheckpointsToRetain <= 0) {
// warning and use 1 as the default value if the setting in
// state.checkpoints.max-retained-checkpoints is not greater than 0.
- log.warn("The setting for max-retained-checkpoints is not a positive number.");
- maxNumberOfCheckpointsToRetain = CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue();
+ log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
+ CoreOptions.MAX_RETAINED_CHECKPOINTS.key(),
+ maxNumberOfCheckpointsToRetain,
+ CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
+
+ maxNumberOfCheckpointsToRetain = CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
}
completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 57b549b..7f5811a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -33,7 +33,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
@@ -446,61 +445,50 @@ public class ExecutionGraphDeploymentTest {
assertEquals(JobStatus.FAILED, eg.getState());
}
+ // ------------------------------------------------------------------------
+ // retained checkpoints config test
+ // ------------------------------------------------------------------------
+
@Test
- public void testSettingDefaultMaxNumberOfCheckpointsToRetain() {
- try {
- final Configuration jobManagerConfig = new Configuration();
+ public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
+ final Configuration jobManagerConfig = new Configuration();
- final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
- assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+ assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
}
@Test
- public void testSettingMaxNumberOfCheckpointsToRetain() {
- try {
- final int maxNumberOfCheckpointsToRetain = 10;
- final Configuration jobManagerConfig = new Configuration();
- jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
- maxNumberOfCheckpointsToRetain);
+ public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
- final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+ final int maxNumberOfCheckpointsToRetain = 10;
+ final Configuration jobManagerConfig = new Configuration();
+ jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+ maxNumberOfCheckpointsToRetain);
- assertEquals(maxNumberOfCheckpointsToRetain,
- eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+ assertEquals(maxNumberOfCheckpointsToRetain,
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
}
@Test
- public void testSettingIllegalMaxNumberOfCheckpointsToRetain() {
- try {
- final int negativeMaxNumberOfCheckpointsToRetain = -10;
+ public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
- final Configuration jobManagerConfig = new Configuration();
- jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
- negativeMaxNumberOfCheckpointsToRetain);
+ final int negativeMaxNumberOfCheckpointsToRetain = -10;
- final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+ final Configuration jobManagerConfig = new Configuration();
+ jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+ negativeMaxNumberOfCheckpointsToRetain);
- assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
- eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
- assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
- eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+ assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+
+ assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
}
private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
@@ -567,14 +555,14 @@ public class ExecutionGraphDeploymentTest {
}
private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
- final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ final ScheduledExecutorService executor = TestingUtils.defaultExecutor();
final JobID jobId = new JobID();
final JobGraph jobGraph = new JobGraph(jobId, "test");
jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
- new ArrayList<JobVertexID>(1),
- new ArrayList<JobVertexID>(1),
- new ArrayList<JobVertexID>(1),
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
100,
10 * 60 * 1000,
0,
@@ -592,7 +580,7 @@ public class ExecutionGraphDeploymentTest {
new ProgrammedSlotProvider(1),
getClass().getClassLoader(),
new StandaloneCheckpointRecoveryFactory(),
- Time.minutes(10),
+ Time.seconds(10),
new NoRestartStrategy(),
new UnregisteredMetricsGroup(),
1,