You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:49 UTC

[8/8] flink git commit: [FLINK-4754] [checkpoints] Small follow-ups to the configuration of the number of retained checkpoints.

[FLINK-4754] [checkpoints] Small follow-ups to the configuration of the number of retained checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24408e19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24408e19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24408e19

Branch: refs/heads/master
Commit: 24408e19037c8761924ca66a557dfdd8236a7be4
Parents: b46f5e0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Mar 16 11:17:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |  2 +-
 .../apache/flink/configuration/CoreOptions.java | 21 +++--
 .../executiongraph/ExecutionGraphBuilder.java   | 11 ++-
 .../ExecutionGraphDeploymentTest.java           | 80 +++++++++-----------
 4 files changed, 58 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 048e012..c835882 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -182,7 +182,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints.html#externalized-checkpoints).
 
-- `state.checkpoints.max-retained-checkpoints`: The maximum number of completed checkpoint instances to retain. This setting defines how many completed checkpoint instances can be stored in `CompletedCheckpointStore`. (Default: 1)
+- `state.checkpoints.num-retained`: The number of completed checkpoint instances to retain. Retaining more than one allows recovery to fall back to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1)
 
 - `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 1e40569..8cb4123 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -23,9 +23,10 @@ import org.apache.flink.annotation.PublicEvolving;
 @PublicEvolving
 public class CoreOptions {
 
-	/**
-	 * 
-	 */
+	// ------------------------------------------------------------------------
+	//  process parameters
+	// ------------------------------------------------------------------------
+
 	public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions
 		.key("env.java.opts")
 		.defaultValue("");
@@ -38,16 +39,24 @@ public class CoreOptions {
 		.key("env.java.opts.taskmanager")
 		.defaultValue("");
 
+	// ------------------------------------------------------------------------
+	//  program
+	// ------------------------------------------------------------------------
+
 	public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
 		.key("parallelism.default")
 		.defaultValue(-1);
-	
+
+	// ------------------------------------------------------------------------
+	//  checkpoints / fault tolerance
+	// ------------------------------------------------------------------------
+
 	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
 		.key("state.backend")
 		.noDefaultValue();
 
 	/** The maximum number of completed checkpoint instances to retain.*/
-	public static final ConfigOption<Integer> STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions
-		.key("state.checkpoints.max-retained-checkpoints")
+	public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+		.key("state.checkpoints.num-retained")
 		.defaultValue(1);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 8a35773..8471178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -178,12 +178,17 @@ public class ExecutionGraphBuilder {
 			CheckpointIDCounter checkpointIdCounter;
 			try {
 				int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
-					CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS);
+					CoreOptions.MAX_RETAINED_CHECKPOINTS);
+
 				if (maxNumberOfCheckpointsToRetain <= 0) {
 					// warning and use 1 as the default value if the setting in
 					// state.checkpoints.max-retained-checkpoints is not greater than 0.
-					log.warn("The setting for max-retained-checkpoints is not a positive number.");
-					maxNumberOfCheckpointsToRetain = CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue();
+					log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
+							CoreOptions.MAX_RETAINED_CHECKPOINTS.key(),
+							maxNumberOfCheckpointsToRetain,
+							CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
+
+					maxNumberOfCheckpointsToRetain = CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
 				}
 
 				completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 57b549b..7f5811a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -33,7 +33,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -446,61 +445,50 @@ public class ExecutionGraphDeploymentTest {
 		assertEquals(JobStatus.FAILED, eg.getState());
 	}
 
+	// ------------------------------------------------------------------------
+	//  retained checkpoints config test
+	// ------------------------------------------------------------------------
+
 	@Test
-	public void testSettingDefaultMaxNumberOfCheckpointsToRetain() {
-		try {
-			final Configuration jobManagerConfig = new Configuration();
+	public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
+		final Configuration jobManagerConfig = new Configuration();
 
-			final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
 
-			assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+		assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
 				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	@Test
-	public void testSettingMaxNumberOfCheckpointsToRetain() {
-		try {
-			final int maxNumberOfCheckpointsToRetain = 10;
-			final Configuration jobManagerConfig = new Configuration();
-			jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
-				maxNumberOfCheckpointsToRetain);
+	public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
 
-			final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+		final int maxNumberOfCheckpointsToRetain = 10;
+		final Configuration jobManagerConfig = new Configuration();
+		jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+			maxNumberOfCheckpointsToRetain);
 
-			assertEquals(maxNumberOfCheckpointsToRetain,
-				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+		assertEquals(maxNumberOfCheckpointsToRetain,
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 	}
 
 	@Test
-	public void testSettingIllegalMaxNumberOfCheckpointsToRetain() {
-		try {
-			final int negativeMaxNumberOfCheckpointsToRetain = -10;
+	public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
 
-			final Configuration jobManagerConfig = new Configuration();
-			jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
-				negativeMaxNumberOfCheckpointsToRetain);
+		final int negativeMaxNumberOfCheckpointsToRetain = -10;
 
-			final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+		final Configuration jobManagerConfig = new Configuration();
+		jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+			negativeMaxNumberOfCheckpointsToRetain);
 
-			assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
-				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-			assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
-				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+		assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+
+		assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 	}
 
 	private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
@@ -567,14 +555,14 @@ public class ExecutionGraphDeploymentTest {
 	}
 
 	private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
-		final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+		final ScheduledExecutorService executor = TestingUtils.defaultExecutor();
 
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test");
 		jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
-			new ArrayList<JobVertexID>(1),
-			new ArrayList<JobVertexID>(1),
-			new ArrayList<JobVertexID>(1),
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
 			100,
 			10 * 60 * 1000,
 			0,
@@ -592,7 +580,7 @@ public class ExecutionGraphDeploymentTest {
 			new ProgrammedSlotProvider(1),
 			getClass().getClassLoader(),
 			new StandaloneCheckpointRecoveryFactory(),
-			Time.minutes(10),
+			Time.seconds(10),
 			new NoRestartStrategy(),
 			new UnregisteredMetricsGroup(),
 			1,