Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:01:18 UTC

[5/9] flink git commit: [FLINK-8960][tests] Port SavepointITCase to flip6

[FLINK-8960][tests] Port SavepointITCase to flip6

This closes #5806.
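
The port replaces the legacy TestingCluster/ActorGateway test harness with
the FLIP-6 MiniClusterResource and ClusterClient APIs. For orientation, here
is a minimal sketch of the lifecycle the ported test follows (all types and
calls as they appear in the diff below):

    MiniClusterResource cluster = new MiniClusterResource(
        new MiniClusterResource.MiniClusterResourceConfiguration(
            config, numTaskManagers, numSlotsPerTaskManager),
        true);
    cluster.before();                         // starts the mini cluster
    ClusterClient<?> client = cluster.getClusterClient();
    try {
        client.setDetached(true);             // do not block on job completion
        client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
        // ... trigger savepoints, verify state, cancel ...
    } finally {
        cluster.after();                      // tears the cluster down again
    }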


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1a82d5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1a82d5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1a82d5f

Branch: refs/heads/release-1.5
Commit: f1a82d5fb333af12f2ac9005e34bb7abfa1fcd66
Parents: 390db6f
Author: zentol <ch...@apache.org>
Authored: Tue Mar 27 14:45:03 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:00 2018 +0200

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 510 +++++++------------
 1 file changed, 170 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1a82d5f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 888c418..9549dc7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -26,41 +26,20 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -69,15 +48,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.HashMultimap;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -87,27 +62,18 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.util.ArrayList;
+import java.net.URI;
+import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
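
The import delta summarizes the API migration: JobManager actor messages
awaited through Scala futures give way to ClusterClient methods returning
Java CompletableFutures. Side by side, using the savepoint trigger as it
appears in the removed and added test bodies below:

    // Before (legacy): ask the JobManager actor and await a Scala future.
    Future<Object> savepointPathFuture =
        jobManager.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
    String savepointPath =
        ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft()))
            .savepointPath();

    // After (FLIP-6): a single client call returning CompletableFuture<String>.
    String savepointPath = client.triggerSavepoint(jobId, null).get();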
 
@@ -142,264 +108,108 @@ public class SavepointITCase extends TestLogger {
 		final int numTaskManagers = 2;
 		final int numSlotsPerTaskManager = 2;
 		final int parallelism = numTaskManagers * numSlotsPerTaskManager;
-		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-		final File testRoot = folder.getRoot();
-
-		TestingCluster flink = null;
-
-		try {
-			// Create a test actor system
-			ActorSystem testActorSystem = AkkaUtils.createDefaultActorSystem();
-
-			// Flink configuration
-			final Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-
-			final File checkpointDir = new File(testRoot, "checkpoints");
-			final File savepointRootDir = new File(testRoot, "savepoints");
+		final File testRoot = folder.newFolder();
 
-			if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
-				fail("Test setup failed: failed to create temporary directories.");
-			}
-
-			// Use file based checkpoints
-			config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
-			config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
-			config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
-			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
-
-			// Start Flink
-			flink = new TestingCluster(config);
-			flink.start(true);
-
-			// Submit the job
-			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
-			final JobID jobId = jobGraph.getJobID();
-
-			// Reset the static test job helpers
-			StatefulCounter.resetForTest(parallelism);
+		Configuration config = new Configuration();
 
-			// Retrieve the job manager
-			ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
+		final File checkpointDir = new File(testRoot, "checkpoints");
+		final File savepointRootDir = new File(testRoot, "savepoints");
 
-			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
+		if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
+			fail("Test setup failed: failed to create temporary directories.");
+		}
 
-			flink.submitJobDetached(jobGraph);
+		// Use file based checkpoints
+		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
 
-			LOG.info("Waiting for some progress.");
+		MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(numTaskManagers, numSlotsPerTaskManager, config);
 
-			// wait for the JobManager to be ready
-			Future<Object> allRunning = jobManager.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
-			Await.ready(allRunning, deadline.timeLeft());
+		String savepointPath = submitJobAndGetVerifiedSavepoint(clusterFactory, parallelism);
 
-			// wait for the Tasks to be ready
-			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
+	}
 
-			LOG.info("Triggering a savepoint.");
-			Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
-			final String savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
+	private String submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+		final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
+		final JobID jobId = jobGraph.getJobID();
+		StatefulCounter.resetForTest(parallelism);
 
-			// Retrieve the savepoint from the testing job manager
-			LOG.info("Requesting the savepoint.");
-			Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
+		MiniClusterResource cluster = clusterFactory.get();
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
 
-			SavepointV2 savepoint = (SavepointV2) ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
-			LOG.info("Retrieved savepoint: " + savepointPath + ".");
+		try {
+			client.setDetached(true);
+			client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 
-			// Shut down the Flink cluster (thereby canceling the job)
-			LOG.info("Shutting down Flink cluster.");
-			flink.stop();
-			flink = null;
+			StatefulCounter.getProgressLatch().await();
 
-			// - Verification START -------------------------------------------
+			String savepointPath = client.triggerSavepoint(jobId, null).get();
 
 			// Only one savepoint should exist
-			File[] files = savepointRootDir.listFiles();
-
-			if (files != null) {
-				assertEquals("Savepoint not created in expected directory", 1, files.length);
-				assertTrue("Savepoint did not create self-contained directory", files[0].isDirectory());
+			File savepointDir = new File(new URI(savepointPath));
+			assertTrue("Savepoint directory does not exist.", savepointDir.exists());
+			assertTrue("Savepoint did not create self-contained directory.", savepointDir.isDirectory());
 
-				File savepointDir = files[0];
-				File[] savepointFiles = savepointDir.listFiles();
-				assertNotNull(savepointFiles);
+			File[] savepointFiles = savepointDir.listFiles();
 
+			if (savepointFiles != null) {
 				// Expect one metadata file and one checkpoint file per stateful
 				// parallel subtask
 				String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: "
 					+ Arrays.toString(savepointFiles);
 				assertEquals(errMsg, 1 + parallelism, savepointFiles.length);
 			} else {
-				fail("Savepoint not created in expected directory");
+				fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath));
 			}
 
-			// - Verification END ---------------------------------------------
-
-			// Restart the cluster
-			LOG.info("Restarting Flink cluster.");
-			flink = new TestingCluster(config);
-			flink.start();
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
-
-			// Reset static test helpers
+			return savepointPath;
+		} finally {
+			cluster.after();
 			StatefulCounter.resetForTest(parallelism);
+		}
+	}
 
-			// Gather all task deployment descriptors
-			final Throwable[] error = new Throwable[1];
-			final TestingCluster finalFlink = flink;
-			final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
-
-			new JavaTestKit(testActorSystem) {{
-
-				new Within(deadline.timeLeft()) {
-					@Override
-					protected void run() {
-						try {
-							// Register to all submit task messages for job
-							for (ActorRef taskManager : finalFlink.getTaskManagersAsJava()) {
-								taskManager.tell(new TestingTaskManagerMessages
-									.RegisterSubmitTaskListener(jobId), getTestActor());
-							}
-
-							// Set the savepoint path
-							jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
-
-							LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
-								"savepoint path " + savepointPath + " in detached mode.");
-
-							// Submit the job
-							finalFlink.submitJobDetached(jobGraph);
-
-							int numTasks = 0;
-							for (JobVertex jobVertex : jobGraph.getVertices()) {
-								numTasks += jobVertex.getParallelism();
-							}
-
-							// Gather the task deployment descriptors
-							LOG.info("Gathering " + numTasks + " submitted " +
-								"TaskDeploymentDescriptor instances.");
-
-							for (int i = 0; i < numTasks; i++) {
-								ResponseSubmitTaskListener resp = (ResponseSubmitTaskListener)
-									expectMsgAnyClassOf(getRemainingTime(),
-										ResponseSubmitTaskListener.class);
-
-								TaskDeploymentDescriptor tdd = resp.tdd();
-
-								LOG.info("Received: " + tdd.toString() + ".");
-
-								TaskInformation taskInformation = tdd
-									.getSerializedTaskInformation()
-									.deserializeValue(getClass().getClassLoader());
-
-								tdds.put(taskInformation.getJobVertexId(), tdd);
-							}
-						} catch (Throwable t) {
-							error[0] = t;
-						}
-					}
-				};
-			}};
-
-			ExecutionGraph graph = (ExecutionGraph) ((JobManagerMessages.JobFound) Await.result(jobManager.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()), deadline.timeLeft())).executionGraph();
-
-			// - Verification START -------------------------------------------
-
-			String errMsg = "Error during gathering of TaskDeploymentDescriptors";
-			if (error[0] != null) {
-				throw new RuntimeException(error[0]);
-			}
-
-			Map<OperatorID, Tuple2<Integer, ExecutionJobVertex>> operatorToJobVertexMapping = new HashMap<>();
-			for (ExecutionJobVertex task : graph.getVerticesTopologically()) {
-				List<OperatorID> operatorIDs = task.getOperatorIDs();
-				for (int x = 0; x < operatorIDs.size(); x++) {
-					operatorToJobVertexMapping.put(operatorIDs.get(x), new Tuple2<>(x, task));
-				}
-			}
-
-			// Verify that all tasks, which are part of the savepoint
-			// have a matching task deployment descriptor.
-			for (OperatorState operatorState : savepoint.getOperatorStates()) {
-				Tuple2<Integer, ExecutionJobVertex> chainIndexAndJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
-				Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(chainIndexAndJobVertex.f1.getJobVertexId());
-
-				errMsg = "Missing task for savepoint state for operator "
-					+ operatorState.getOperatorID() + ".";
-				assertTrue(errMsg, taskTdds.size() > 0);
-
-				assertEquals(operatorState.getNumberCollectedStates(), taskTdds.size());
+	private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+		final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
+		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+		final JobID jobId = jobGraph.getJobID();
+		StatefulCounter.resetForTest(parallelism);
 
-				for (TaskDeploymentDescriptor tdd : taskTdds) {
-					OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex());
+		MiniClusterResource cluster = clusterFactory.get();
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
 
-					assertNotNull(subtaskState);
-				}
-			}
+		try {
+			client.setDetached(true);
+			client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 
 			// Await state is restored
-			StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			StatefulCounter.getRestoreLatch().await();
 
 			// Await some progress after restore
-			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-			// - Verification END ---------------------------------------------
+			StatefulCounter.getProgressLatch().await();
 
-			LOG.info("Cancelling job " + jobId + ".");
-			jobManager.tell(new CancelJob(jobId));
+			client.cancel(jobId);
 
-			LOG.info("Disposing savepoint " + savepointPath + ".");
-			Future<Object> disposeFuture = jobManager.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
+			FutureUtils.retrySuccesfulWithDelay(
+				() -> client.getJobStatus(jobId),
+				Time.milliseconds(50),
+				Deadline.now().plus(Duration.ofSeconds(30)),
+				status -> status == JobStatus.CANCELED,
+				TestingUtils.defaultScheduledExecutor()
+			);
 
-			errMsg = "Failed to dispose savepoint " + savepointPath + ".";
-			Object resp = Await.result(disposeFuture, deadline.timeLeft());
-			assertTrue(errMsg, resp.getClass() == getDisposeSavepointSuccess().getClass());
+			client.disposeSavepoint(savepointPath)
+				.get();
 
-			// - Verification START -------------------------------------------
-			// The checkpoint files
-			List<File> checkpointFiles = new ArrayList<>();
-
-			for (OperatorState stateForTaskGroup : savepoint.getOperatorStates()) {
-				for (OperatorSubtaskState subtaskState : stateForTaskGroup.getStates()) {
-					Collection<OperatorStateHandle> streamTaskState = subtaskState.getManagedOperatorState();
-
-					if (streamTaskState != null && !streamTaskState.isEmpty()) {
-						for (OperatorStateHandle osh : streamTaskState) {
-							FileStateHandle fileStateHandle = (FileStateHandle) osh.getDelegateStateHandle();
-							checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
-						}
-					}
-				}
-			}
-
-			// The checkpoint files of the savepoint should have been discarded
-			for (File f : checkpointFiles) {
-				errMsg = "Checkpoint file " + f + " not cleaned up properly.";
-				assertFalse(errMsg, f.exists());
-			}
-
-			if (checkpointFiles.size() > 0) {
-				File parent = checkpointFiles.get(0).getParentFile();
-				errMsg = "Checkpoint parent directory " + parent + " not cleaned up properly.";
-				assertFalse(errMsg, parent.exists());
-			}
-
-			// All savepoints should have been cleaned up
-			errMsg = "Savepoints directory not cleaned up properly: " +
-				Arrays.toString(savepointRootDir.listFiles()) + ".";
-			assertEquals(errMsg, 0, savepointRootDir.listFiles().length);
-
-			// - Verification END ---------------------------------------------
+			assertFalse("Savepoint not properly cleaned up.", new File(savepointPath).exists());
 		} finally {
-			if (flink != null) {
-				flink.stop();
-			}
+			cluster.after();
+			StatefulCounter.resetForTest(parallelism);
 		}
 	}
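
FutureUtils.retrySuccesfulWithDelay is a Flink-internal helper; the
following is a dependency-free sketch of what the call above does, polling
the job status until it reports CANCELED or the deadline expires (assuming
ClusterClient#getJobStatus returns a CompletableFuture<JobStatus>, as its
use in the lambda above suggests):

    Deadline deadline = Deadline.now().plus(Duration.ofSeconds(30));
    while (deadline.hasTimeLeft()) {
        JobStatus status = client.getJobStatus(jobId).get();
        if (status == JobStatus.CANCELED) {
            break;                // job reached the expected terminal state
        }
        Thread.sleep(50);         // matches the 50 ms retry delay above
    }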
 
@@ -410,34 +220,23 @@ public class SavepointITCase extends TestLogger {
 		int numSlotsPerTaskManager = 1;
 		int parallelism = numTaskManagers * numSlotsPerTaskManager;
 
-		// Test deadline
-		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-
-		final File tmpDir = folder.getRoot();
+		final File tmpDir = folder.newFolder();
 		final File savepointDir = new File(tmpDir, "savepoints");
 
-		TestingCluster flink = null;
+		final Configuration config = new Configuration();
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+		MiniClusterResource cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				numTaskManagers,
+				numSlotsPerTaskManager
+			),
+			true);
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
 
 		try {
-			// Flink configuration
-			final Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
-
-			LOG.info("Flink configuration: " + config + ".");
-
-			// Start Flink
-			flink = new TestingCluster(config);
-			LOG.info("Starting Flink cluster.");
-			flink.start();
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			ActorGateway jobManager = Await.result(
-				flink.leaderGateway().future(),
-				deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
 
 			// High value to ensure timeouts if restarted.
 			int numberOfRetries = 1000;
@@ -453,15 +252,17 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
 			try {
-				flink.submitJobAndWait(jobGraph, false);
+				client.setDetached(false);
+				client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 			} catch (Exception e) {
-				assertEquals(JobExecutionException.class, e.getClass());
-				assertEquals(FileNotFoundException.class, e.getCause().getClass());
+				Optional<JobExecutionException> expectedJobExecutionException = ExceptionUtils.findThrowable(e, JobExecutionException.class);
+				Optional<FileNotFoundException> expectedFileNotFoundException = ExceptionUtils.findThrowable(e, FileNotFoundException.class);
+				if (!(expectedJobExecutionException.isPresent() && expectedFileNotFoundException.isPresent())) {
+					throw e;
+				}
 			}
 		} finally {
-			if (flink != null) {
-				flink.stop();
-			}
+			cluster.after();
 		}
 	}
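
Where the old assertions pinned exact exception classes at fixed nesting
depths, the ported test searches the whole cause chain, since the FLIP-6
client may wrap the failure differently than the legacy code path did. The
pattern, isolated:

    // findThrowable walks the cause chain, so the check no longer depends
    // on how deeply the client wraps the original FileNotFoundException.
    Optional<FileNotFoundException> expected =
        ExceptionUtils.findThrowable(e, FileNotFoundException.class);
    if (!expected.isPresent()) {
        throw e;    // not the failure we were waiting for: rethrow
    }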
 
@@ -480,15 +281,13 @@ public class SavepointITCase extends TestLogger {
 		int parallelism = 2;
 
 		// Test deadline
-		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+		final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5));
 
-		final File tmpDir = folder.getRoot();
+		final File tmpDir = folder.newFolder();
 		final File savepointDir = new File(tmpDir, "savepoints");
 
 		// Flink configuration
 		final Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 		String savepointPath;
@@ -496,18 +295,18 @@ public class SavepointITCase extends TestLogger {
 		LOG.info("Flink configuration: " + config + ".");
 
 		// Start Flink
-		TestingCluster flink = new TestingCluster(config);
+		MiniClusterResource cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				numTaskManagers,
+				numSlotsPerTaskManager
+			),
+			true);
+
+		LOG.info("Starting Flink cluster.");
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
 		try {
-			LOG.info("Starting Flink cluster.");
-			flink.start(true);
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			ActorGateway jobManager = Await.result(
-					flink.leaderGateway().future(),
-					deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
-
 			final StatefulCounter statefulCounter = new StatefulCounter();
 			StatefulCounter.resetForTest(parallelism);
 
@@ -536,38 +335,34 @@ public class SavepointITCase extends TestLogger {
 
 			JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
 
-			JobSubmissionResult submissionResult = flink.submitJobDetached(originalJobGraph);
+			client.setDetached(true);
+			JobSubmissionResult submissionResult = client.submitJob(originalJobGraph, SavepointITCase.class.getClassLoader());
 			JobID jobID = submissionResult.getJobID();
 
 			// wait for the Tasks to be ready
 			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-			Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
-			savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-			Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
-
-			((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
+			savepointPath = client.triggerSavepoint(jobID, null).get();
 			LOG.info("Retrieved savepoint: " + savepointPath + ".");
 		} finally {
 			// Shut down the Flink cluster (thereby canceling the job)
 			LOG.info("Shutting down Flink cluster.");
-			flink.stop();
+			cluster.after();
 		}
 
 		// create a new MiniClusterResource to make sure we start with
 		// completely new resources
-		flink = new TestingCluster(config);
+		cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				numTaskManagers,
+				numSlotsPerTaskManager
+			),
+			true);
+		LOG.info("Restarting Flink cluster.");
+		cluster.before();
+		client = cluster.getClusterClient();
 		try {
-			LOG.info("Restarting Flink cluster.");
-			flink = new TestingCluster(config);
-
-			flink.start(true);
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
-
 			// Reset static test helpers
 			StatefulCounter.resetForTest(parallelism);
 
@@ -598,14 +393,15 @@ public class SavepointITCase extends TestLogger {
 					"savepoint path " + savepointPath + " in detached mode.");
 
 			// Submit the job
-			flink.submitJobDetached(modifiedJobGraph);
+			client.setDetached(true);
+			client.submitJob(modifiedJobGraph, SavepointITCase.class.getClassLoader());
 			// Await state is restored
 			StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			// Await some progress after restore
 			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 		} finally {
-			flink.stop();
+			cluster.after();
 		}
 	}
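
This hunk also swaps the Scala deadline for Flink's own: the removed
scala.concurrent.duration shape and the added
org.apache.flink.api.common.time.Deadline line up as follows (old shape
taken from the removed line above; illustrative):

    // old: final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
    Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5));

    // The remaining time still feeds the latch waits unchanged:
    StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);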
 
@@ -787,7 +583,6 @@ public class SavepointITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.addAll(jobGraph.getJobConfiguration());
 		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2 * jobGraph.getMaximumParallelism());
 		final File checkpointDir = new File(tmpDir, "checkpoints");
 		final File savepointDir = new File(tmpDir, "savepoints");
 
@@ -800,31 +595,40 @@ public class SavepointITCase extends TestLogger {
 		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
-		TestingCluster cluster = new TestingCluster(config, false);
+		MiniClusterResource cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				1,
+				2 * jobGraph.getMaximumParallelism()
+			),
+			true);
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
+
 		String savepointPath = null;
 		try {
-			cluster.start();
-
-			cluster.submitJobDetached(jobGraph);
+			client.setDetached(true);
+			client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 			for (OneShotLatch latch : iterTestSnapshotWait) {
 				latch.await();
 			}
-			savepointPath = cluster.triggerSavepoint(jobGraph.getJobID());
+			savepointPath = client.triggerSavepoint(jobGraph.getJobID(), null).get();
 			source.cancel();
 
 			jobGraph = streamGraph.getJobGraph();
 			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			cluster.submitJobDetached(jobGraph);
+			client.setDetached(true);
+			client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 			for (OneShotLatch latch : iterTestRestoreWait) {
 				latch.await();
 			}
 			source.cancel();
 		} finally {
 			if (null != savepointPath) {
-				cluster.disposeSavepoint(savepointPath);
+				client.disposeSavepoint(savepointPath);
 			}
-			cluster.stop();
+			cluster.after();
 		}
 	}
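
ClusterClient#disposeSavepoint returns a CompletableFuture. The first test
awaits it with .get(); the finally block above does not, so a disposal
failure would be dropped silently there. The awaited form, for reference:

    client.disposeSavepoint(savepointPath).get();   // surfaces disposal errors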
 
@@ -904,4 +708,30 @@ public class SavepointITCase extends TestLogger {
 			}
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	// Utilities
+	// ------------------------------------------------------------------------
+
+	private static class MiniClusterResourceFactory {
+		private final int numTaskManagers;
+		private final int numSlotsPerTaskManager;
+		private final Configuration config;
+
+		private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManager, Configuration config) {
+			this.numTaskManagers = numTaskManagers;
+			this.numSlotsPerTaskManager = numSlotsPerTaskManager;
+			this.config = config;
+		}
+
+		MiniClusterResource get() {
+			return new MiniClusterResource(
+				new MiniClusterResource.MiniClusterResourceConfiguration(
+					config,
+					numTaskManagers,
+					numSlotsPerTaskManager
+				),
+				true);
+		}
+	}
 }
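
The factory exists so that both halves of the split test (savepoint
creation and restore) boot identically configured clusters against the same
checkpoint and savepoint directories. Its call sites in the diff all follow
the same shape:

    MiniClusterResource cluster = clusterFactory.get();
    cluster.before();
    try {
        // submit the job, then trigger or restore the savepoint
        // via cluster.getClusterClient() ...
    } finally {
        cluster.after();
        StatefulCounter.resetForTest(parallelism);
    }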