You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:48 UTC

[7/8] flink git commit: [FLINK-4754] [checkpoints] Make number of retained checkpoints user configurable

[FLINK-4754] [checkpoints] Make number of retained checkpoints user configurable

This closes #3374


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b46f5e05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b46f5e05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b46f5e05

Branch: refs/heads/master
Commit: b46f5e050bdd77fe6e501bad20466d8777218131
Parents: 6b5e1f6
Author: Tony Wei <to...@gmail.com>
Authored: Mon Feb 20 18:30:24 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 .../apache/flink/configuration/CoreOptions.java |   5 +
 .../checkpoint/CheckpointRecoveryFactory.java   |   3 +-
 .../checkpoint/CompletedCheckpointStore.java    |   5 +
 .../StandaloneCheckpointRecoveryFactory.java    |   5 +-
 .../StandaloneCompletedCheckpointStore.java     |   5 +
 .../ZooKeeperCheckpointRecoveryFactory.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |   5 +
 .../executiongraph/ExecutionGraphBuilder.java   |  12 ++-
 .../CheckpointCoordinatorFailureTest.java       |   5 +
 .../ExecutionGraphDeploymentTest.java           | 101 +++++++++++++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |   7 +-
 12 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 013e56a..048e012 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -182,6 +182,8 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints.html#externalized-checkpoints).
 
+- `state.checkpoints.max-retained-checkpoints`: The maximum number of completed checkpoint instances to retain. This setting defines how many completed checkpoint instances can be stored in `CompletedCheckpointStore`. (Default: 1)
+
 - `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user JARs) on the TaskManagers.

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 4e30ceb..1e40569 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -45,4 +45,9 @@ public class CoreOptions {
 	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
 		.key("state.backend")
 		.noDefaultValue();
+
+	/** The maximum number of completed checkpoint instances to retain.*/
+	public static final ConfigOption<Integer> STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions
+		.key("state.checkpoints.max-retained-checkpoints")
+		.defaultValue(1);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index 0c7dfa7..3fb1385 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -44,10 +44,11 @@ public interface CheckpointRecoveryFactory {
 	 * Creates a {@link CompletedCheckpointStore} instance for a job.
 	 *
 	 * @param jobId           Job ID to recover checkpoints for
+	 * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain
 	 * @param userClassLoader User code class loader of the job
 	 * @return {@link CompletedCheckpointStore} instance for the job
 	 */
-	CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
+	CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
 			throws Exception;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index e91e038..9c2b199 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -73,6 +73,11 @@ public interface CompletedCheckpointStore {
 	int getNumberOfRetainedCheckpoints();
 
 	/**
+	 * Returns the max number of retained checkpoints.
+	 */
+	int getMaxNumberOfRetainedCheckpoints();
+
+	/**
 	 * This method returns whether the completed checkpoint store requires checkpoints to be
 	 * externalized. Externalized checkpoints have their meta data persisted, which the checkpoint
 	 * store can exploit (for example by simply pointing the persisted metadata).

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index 57785ce..2d2cc2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -37,11 +37,10 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
 	}
 
 	@Override
-	public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
+	public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
 			throws Exception {
 
-		return new StandaloneCompletedCheckpointStore(
-				CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+		return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index a0248b2..6c752f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -84,6 +84,11 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 	}
 
 	@Override
+	public int getMaxNumberOfRetainedCheckpoints() {
+		return maxNumberOfCheckpointsToRetain;
+	}
+
+	@Override
 	public void shutdown(JobStatus jobStatus) throws Exception {
 		try {
 			LOG.info("Shutting down");

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 09bfa8c..481559b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -59,11 +59,11 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 	}
 
 	@Override
-	public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
+	public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
 			throws Exception {
 
 		return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId,
-				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, executor);
+				maxNumberOfCheckpointsToRetain, executor);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 7a167cb..1319c27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -252,6 +252,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	}
 
 	@Override
+	public int getMaxNumberOfRetainedCheckpoints() {
+		return maxNumberOfCheckpointsToRetain;
+	}
+
+	@Override
 	public void shutdown(JobStatus jobStatus) throws Exception {
 		if (jobStatus.isGloballyTerminalState()) {
 			LOG.info("Shutting down");

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ec7103c..8a35773 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -176,7 +177,16 @@ public class ExecutionGraphBuilder {
 			CompletedCheckpointStore completedCheckpoints;
 			CheckpointIDCounter checkpointIdCounter;
 			try {
-				completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, classLoader);
+				int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
+					CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS);
+				if (maxNumberOfCheckpointsToRetain <= 0) {
+					// warning and use 1 as the default value if the setting in
+					// state.checkpoints.max-retained-checkpoints is not greater than 0.
+					log.warn("The setting for max-retained-checkpoints is not a positive number.");
+					maxNumberOfCheckpointsToRetain = CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue();
+				}
+
+				completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
 				checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
 			}
 			catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 9517257..340e2a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -136,6 +136,11 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		}
 
 		@Override
+		public int getMaxNumberOfRetainedCheckpoints() {
+			return 1;
+		}
+
+		@Override
 		public boolean requiresExternalizedCheckpoints() {
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 30824e0..57b549b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -22,6 +22,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotEquals;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,14 +33,20 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -51,8 +58,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.operators.BatchTask;
@@ -62,6 +72,7 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 public class ExecutionGraphDeploymentTest {
 
@@ -435,6 +446,63 @@ public class ExecutionGraphDeploymentTest {
 		assertEquals(JobStatus.FAILED, eg.getState());
 	}
 
+	@Test
+	public void testSettingDefaultMaxNumberOfCheckpointsToRetain() {
+		try {
+			final Configuration jobManagerConfig = new Configuration();
+
+			final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+			assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSettingMaxNumberOfCheckpointsToRetain() {
+		try {
+			final int maxNumberOfCheckpointsToRetain = 10;
+			final Configuration jobManagerConfig = new Configuration();
+			jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
+				maxNumberOfCheckpointsToRetain);
+
+			final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+			assertEquals(maxNumberOfCheckpointsToRetain,
+				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSettingIllegalMaxNumberOfCheckpointsToRetain() {
+		try {
+			final int negativeMaxNumberOfCheckpointsToRetain = -10;
+
+			final Configuration jobManagerConfig = new Configuration();
+			jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
+				negativeMaxNumberOfCheckpointsToRetain);
+
+			final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+			assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+			assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
 	private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
 		final JobID jobId = new JobID();
 
@@ -497,4 +565,37 @@ public class ExecutionGraphDeploymentTest {
 			throw new Exception();
 		}
 	}
+
+	private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
+		final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+		final JobID jobId = new JobID();
+		final JobGraph jobGraph = new JobGraph(jobId, "test");
+		jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+			new ArrayList<JobVertexID>(1),
+			new ArrayList<JobVertexID>(1),
+			new ArrayList<JobVertexID>(1),
+			100,
+			10 * 60 * 1000,
+			0,
+			1,
+			ExternalizedCheckpointSettings.none(),
+			null,
+			false));
+
+		return ExecutionGraphBuilder.buildGraph(
+			null,
+			jobGraph,
+			configuration,
+			executor,
+			executor,
+			new ProgrammedSlotProvider(1),
+			getClass().getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.minutes(10),
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			1,
+			LoggerFactory.getLogger(getClass()));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 115b06c..32358c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -485,6 +485,11 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
+		public int getMaxNumberOfRetainedCheckpoints() {
+			return 1;
+		}
+
+		@Override
 		public boolean requiresExternalizedCheckpoints() {
 			return false;
 		}
@@ -509,7 +514,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception {
+		public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
 			return store;
 		}