You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:01:18 UTC
[5/9] flink git commit: [FLINK-8960][tests] Port SavepointITCase to flip6
[FLINK-8960][tests] Port SavepointITCase to flip6
This closes #5806.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1a82d5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1a82d5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1a82d5f
Branch: refs/heads/release-1.5
Commit: f1a82d5fb333af12f2ac9005e34bb7abfa1fcd66
Parents: 390db6f
Author: zentol <ch...@apache.org>
Authored: Tue Mar 27 14:45:03 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 10:01:00 2018 +0200
----------------------------------------------------------------------
.../test/checkpointing/SavepointITCase.java | 510 +++++++------------
1 file changed, 170 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a82d5f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 888c418..9549dc7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -26,41 +26,20 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -69,15 +48,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
-import org.apache.flink.shaded.guava18.com.google.common.collect.HashMultimap;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -87,27 +62,18 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
-import java.util.ArrayList;
+import java.net.URI;
+import java.time.Duration;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -142,264 +108,108 @@ public class SavepointITCase extends TestLogger {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
final int parallelism = numTaskManagers * numSlotsPerTaskManager;
- final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
- final File testRoot = folder.getRoot();
-
- TestingCluster flink = null;
-
- try {
- // Create a test actor system
- ActorSystem testActorSystem = AkkaUtils.createDefaultActorSystem();
-
- // Flink configuration
- final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-
- final File checkpointDir = new File(testRoot, "checkpoints");
- final File savepointRootDir = new File(testRoot, "savepoints");
+ final File testRoot = folder.newFolder();
- if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
- fail("Test setup failed: failed to create temporary directories.");
- }
-
- // Use file based checkpoints
- config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
- config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
- config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
-
- // Start Flink
- flink = new TestingCluster(config);
- flink.start(true);
-
- // Submit the job
- final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
- final JobID jobId = jobGraph.getJobID();
-
- // Reset the static test job helpers
- StatefulCounter.resetForTest(parallelism);
+ Configuration config = new Configuration();
- // Retrieve the job manager
- ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
+ final File checkpointDir = new File(testRoot, "checkpoints");
+ final File savepointRootDir = new File(testRoot, "savepoints");
- LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
+ if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
+ fail("Test setup failed: failed to create temporary directories.");
+ }
- flink.submitJobDetached(jobGraph);
+ // Use file based checkpoints
+ config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+ config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+ config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
- LOG.info("Waiting for some progress.");
+ MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(numTaskManagers, numSlotsPerTaskManager, config);
- // wait for the JobManager to be ready
- Future<Object> allRunning = jobManager.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
- Await.ready(allRunning, deadline.timeLeft());
+ String savepointPath = submitJobAndGetVerifiedSavepoint(clusterFactory, parallelism);
- // wait for the Tasks to be ready
- StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
+ }
- LOG.info("Triggering a savepoint.");
- Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
- final String savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
- LOG.info("Retrieved savepoint path: " + savepointPath + ".");
+ private String submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+ final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
+ final JobID jobId = jobGraph.getJobID();
+ StatefulCounter.resetForTest(parallelism);
- // Retrieve the savepoint from the testing job manager
- LOG.info("Requesting the savepoint.");
- Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
+ MiniClusterResource cluster = clusterFactory.get();
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
- SavepointV2 savepoint = (SavepointV2) ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
- LOG.info("Retrieved savepoint: " + savepointPath + ".");
+ try {
+ client.setDetached(true);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
- // Shut down the Flink cluster (thereby canceling the job)
- LOG.info("Shutting down Flink cluster.");
- flink.stop();
- flink = null;
+ StatefulCounter.getProgressLatch().await();
- // - Verification START -------------------------------------------
+ String savepointPath = client.triggerSavepoint(jobId, null).get();
// Only one savepoint should exist
- File[] files = savepointRootDir.listFiles();
-
- if (files != null) {
- assertEquals("Savepoint not created in expected directory", 1, files.length);
- assertTrue("Savepoint did not create self-contained directory", files[0].isDirectory());
+ File savepointDir = new File(new URI(savepointPath));
+ assertTrue("Savepoint directory does not exist.", savepointDir.exists());
+ assertTrue("Savepoint did not create self-contained directory.", savepointDir.isDirectory());
- File savepointDir = files[0];
- File[] savepointFiles = savepointDir.listFiles();
- assertNotNull(savepointFiles);
+ File[] savepointFiles = savepointDir.listFiles();
+ if (savepointFiles != null) {
// Expect one metadata file and one checkpoint file per stateful
// parallel subtask
String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: "
+ Arrays.toString(savepointFiles);
assertEquals(errMsg, 1 + parallelism, savepointFiles.length);
} else {
- fail("Savepoint not created in expected directory");
+ fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath));
}
- // - Verification END ---------------------------------------------
-
- // Restart the cluster
- LOG.info("Restarting Flink cluster.");
- flink = new TestingCluster(config);
- flink.start();
-
- // Retrieve the job manager
- LOG.info("Retrieving JobManager.");
- jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
- LOG.info("JobManager: " + jobManager + ".");
-
- // Reset static test helpers
+ return savepointPath;
+ } finally {
+ cluster.after();
StatefulCounter.resetForTest(parallelism);
+ }
+ }
- // Gather all task deployment descriptors
- final Throwable[] error = new Throwable[1];
- final TestingCluster finalFlink = flink;
- final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
-
- new JavaTestKit(testActorSystem) {{
-
- new Within(deadline.timeLeft()) {
- @Override
- protected void run() {
- try {
- // Register to all submit task messages for job
- for (ActorRef taskManager : finalFlink.getTaskManagersAsJava()) {
- taskManager.tell(new TestingTaskManagerMessages
- .RegisterSubmitTaskListener(jobId), getTestActor());
- }
-
- // Set the savepoint path
- jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
-
- LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
- "savepoint path " + savepointPath + " in detached mode.");
-
- // Submit the job
- finalFlink.submitJobDetached(jobGraph);
-
- int numTasks = 0;
- for (JobVertex jobVertex : jobGraph.getVertices()) {
- numTasks += jobVertex.getParallelism();
- }
-
- // Gather the task deployment descriptors
- LOG.info("Gathering " + numTasks + " submitted " +
- "TaskDeploymentDescriptor instances.");
-
- for (int i = 0; i < numTasks; i++) {
- ResponseSubmitTaskListener resp = (ResponseSubmitTaskListener)
- expectMsgAnyClassOf(getRemainingTime(),
- ResponseSubmitTaskListener.class);
-
- TaskDeploymentDescriptor tdd = resp.tdd();
-
- LOG.info("Received: " + tdd.toString() + ".");
-
- TaskInformation taskInformation = tdd
- .getSerializedTaskInformation()
- .deserializeValue(getClass().getClassLoader());
-
- tdds.put(taskInformation.getJobVertexId(), tdd);
- }
- } catch (Throwable t) {
- error[0] = t;
- }
- }
- };
- }};
-
- ExecutionGraph graph = (ExecutionGraph) ((JobManagerMessages.JobFound) Await.result(jobManager.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()), deadline.timeLeft())).executionGraph();
-
- // - Verification START -------------------------------------------
-
- String errMsg = "Error during gathering of TaskDeploymentDescriptors";
- if (error[0] != null) {
- throw new RuntimeException(error[0]);
- }
-
- Map<OperatorID, Tuple2<Integer, ExecutionJobVertex>> operatorToJobVertexMapping = new HashMap<>();
- for (ExecutionJobVertex task : graph.getVerticesTopologically()) {
- List<OperatorID> operatorIDs = task.getOperatorIDs();
- for (int x = 0; x < operatorIDs.size(); x++) {
- operatorToJobVertexMapping.put(operatorIDs.get(x), new Tuple2<>(x, task));
- }
- }
-
- // Verify that all tasks, which are part of the savepoint
- // have a matching task deployment descriptor.
- for (OperatorState operatorState : savepoint.getOperatorStates()) {
- Tuple2<Integer, ExecutionJobVertex> chainIndexAndJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
- Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(chainIndexAndJobVertex.f1.getJobVertexId());
-
- errMsg = "Missing task for savepoint state for operator "
- + operatorState.getOperatorID() + ".";
- assertTrue(errMsg, taskTdds.size() > 0);
-
- assertEquals(operatorState.getNumberCollectedStates(), taskTdds.size());
+ private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+ final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+ final JobID jobId = jobGraph.getJobID();
+ StatefulCounter.resetForTest(parallelism);
- for (TaskDeploymentDescriptor tdd : taskTdds) {
- OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex());
+ MiniClusterResource cluster = clusterFactory.get();
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
- assertNotNull(subtaskState);
- }
- }
+ try {
+ client.setDetached(true);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
// Await state is restored
- StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ StatefulCounter.getRestoreLatch().await();
// Await some progress after restore
- StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
- // - Verification END ---------------------------------------------
+ StatefulCounter.getProgressLatch().await();
- LOG.info("Cancelling job " + jobId + ".");
- jobManager.tell(new CancelJob(jobId));
+ client.cancel(jobId);
- LOG.info("Disposing savepoint " + savepointPath + ".");
- Future<Object> disposeFuture = jobManager.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
+ FutureUtils.retrySuccesfulWithDelay(
+ () -> client.getJobStatus(jobId),
+ Time.milliseconds(50),
+ Deadline.now().plus(Duration.ofSeconds(30)),
+ status -> status == JobStatus.CANCELED,
+ TestingUtils.defaultScheduledExecutor()
+ );
- errMsg = "Failed to dispose savepoint " + savepointPath + ".";
- Object resp = Await.result(disposeFuture, deadline.timeLeft());
- assertTrue(errMsg, resp.getClass() == getDisposeSavepointSuccess().getClass());
+ client.disposeSavepoint(savepointPath)
+ .get();
- // - Verification START -------------------------------------------
- // The checkpoint files
- List<File> checkpointFiles = new ArrayList<>();
-
- for (OperatorState stateForTaskGroup : savepoint.getOperatorStates()) {
- for (OperatorSubtaskState subtaskState : stateForTaskGroup.getStates()) {
- Collection<OperatorStateHandle> streamTaskState = subtaskState.getManagedOperatorState();
-
- if (streamTaskState != null && !streamTaskState.isEmpty()) {
- for (OperatorStateHandle osh : streamTaskState) {
- FileStateHandle fileStateHandle = (FileStateHandle) osh.getDelegateStateHandle();
- checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
- }
- }
- }
- }
-
- // The checkpoint files of the savepoint should have been discarded
- for (File f : checkpointFiles) {
- errMsg = "Checkpoint file " + f + " not cleaned up properly.";
- assertFalse(errMsg, f.exists());
- }
-
- if (checkpointFiles.size() > 0) {
- File parent = checkpointFiles.get(0).getParentFile();
- errMsg = "Checkpoint parent directory " + parent + " not cleaned up properly.";
- assertFalse(errMsg, parent.exists());
- }
-
- // All savepoints should have been cleaned up
- errMsg = "Savepoints directory not cleaned up properly: " +
- Arrays.toString(savepointRootDir.listFiles()) + ".";
- assertEquals(errMsg, 0, savepointRootDir.listFiles().length);
-
- // - Verification END ---------------------------------------------
+ assertFalse("Savepoint not properly cleaned up.", new File(savepointPath).exists());
} finally {
- if (flink != null) {
- flink.stop();
- }
+ cluster.after();
+ StatefulCounter.resetForTest(parallelism);
}
}
@@ -410,34 +220,23 @@ public class SavepointITCase extends TestLogger {
int numSlotsPerTaskManager = 1;
int parallelism = numTaskManagers * numSlotsPerTaskManager;
- // Test deadline
- final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-
- final File tmpDir = folder.getRoot();
+ final File tmpDir = folder.newFolder();
final File savepointDir = new File(tmpDir, "savepoints");
- TestingCluster flink = null;
+ final Configuration config = new Configuration();
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+ MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true);
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
try {
- // Flink configuration
- final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
-
- LOG.info("Flink configuration: " + config + ".");
-
- // Start Flink
- flink = new TestingCluster(config);
- LOG.info("Starting Flink cluster.");
- flink.start();
-
- // Retrieve the job manager
- LOG.info("Retrieving JobManager.");
- ActorGateway jobManager = Await.result(
- flink.leaderGateway().future(),
- deadline.timeLeft());
- LOG.info("JobManager: " + jobManager + ".");
// High value to ensure timeouts if restarted.
int numberOfRetries = 1000;
@@ -453,15 +252,17 @@ public class SavepointITCase extends TestLogger {
LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
try {
- flink.submitJobAndWait(jobGraph, false);
+ client.setDetached(false);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
} catch (Exception e) {
- assertEquals(JobExecutionException.class, e.getClass());
- assertEquals(FileNotFoundException.class, e.getCause().getClass());
+ Optional<JobExecutionException> expectedJobExecutionException = ExceptionUtils.findThrowable(e, JobExecutionException.class);
+ Optional<FileNotFoundException> expectedFileNotFoundException = ExceptionUtils.findThrowable(e, FileNotFoundException.class);
+ if (!(expectedJobExecutionException.isPresent() && expectedFileNotFoundException.isPresent())) {
+ throw e;
+ }
}
} finally {
- if (flink != null) {
- flink.stop();
- }
+ cluster.after();
}
}
@@ -480,15 +281,13 @@ public class SavepointITCase extends TestLogger {
int parallelism = 2;
// Test deadline
- final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+ final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5));
- final File tmpDir = folder.getRoot();
+ final File tmpDir = folder.newFolder();
final File savepointDir = new File(tmpDir, "savepoints");
// Flink configuration
final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
String savepointPath;
@@ -496,18 +295,18 @@ public class SavepointITCase extends TestLogger {
LOG.info("Flink configuration: " + config + ".");
// Start Flink
- TestingCluster flink = new TestingCluster(config);
+ MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true);
+
+ LOG.info("Shutting down Flink cluster.");
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
try {
- LOG.info("Starting Flink cluster.");
- flink.start(true);
-
- // Retrieve the job manager
- LOG.info("Retrieving JobManager.");
- ActorGateway jobManager = Await.result(
- flink.leaderGateway().future(),
- deadline.timeLeft());
- LOG.info("JobManager: " + jobManager + ".");
-
final StatefulCounter statefulCounter = new StatefulCounter();
StatefulCounter.resetForTest(parallelism);
@@ -536,38 +335,34 @@ public class SavepointITCase extends TestLogger {
JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
- JobSubmissionResult submissionResult = flink.submitJobDetached(originalJobGraph);
+ client.setDetached(true);
+ JobSubmissionResult submissionResult = client.submitJob(originalJobGraph, SavepointITCase.class.getClassLoader());
JobID jobID = submissionResult.getJobID();
// wait for the Tasks to be ready
StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
- savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
- Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
-
- ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
+ savepointPath = client.triggerSavepoint(jobID, null).get();
LOG.info("Retrieved savepoint: " + savepointPath + ".");
} finally {
// Shut down the Flink cluster (thereby canceling the job)
LOG.info("Shutting down Flink cluster.");
- flink.stop();
+ cluster.after();
}
// create a new TestingCluster to make sure we start with completely
// new resources
- flink = new TestingCluster(config);
+ cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true);
+ LOG.info("Restarting Flink cluster.");
+ cluster.before();
+ client = cluster.getClusterClient();
try {
- LOG.info("Restarting Flink cluster.");
- flink = new TestingCluster(config);
-
- flink.start(true);
-
- // Retrieve the job manager
- LOG.info("Retrieving JobManager.");
- ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
- LOG.info("JobManager: " + jobManager + ".");
-
// Reset static test helpers
StatefulCounter.resetForTest(parallelism);
@@ -598,14 +393,15 @@ public class SavepointITCase extends TestLogger {
"savepoint path " + savepointPath + " in detached mode.");
// Submit the job
- flink.submitJobDetached(modifiedJobGraph);
+ client.setDetached(true);
+ client.submitJob(modifiedJobGraph, SavepointITCase.class.getClassLoader());
// Await state is restored
StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
// Await some progress after restore
StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
} finally {
- flink.stop();
+ cluster.after();
}
}
@@ -787,7 +583,6 @@ public class SavepointITCase extends TestLogger {
Configuration config = new Configuration();
config.addAll(jobGraph.getJobConfiguration());
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2 * jobGraph.getMaximumParallelism());
final File checkpointDir = new File(tmpDir, "checkpoints");
final File savepointDir = new File(tmpDir, "savepoints");
@@ -800,31 +595,40 @@ public class SavepointITCase extends TestLogger {
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
- TestingCluster cluster = new TestingCluster(config, false);
+ MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ 1,
+ 2 * jobGraph.getMaximumParallelism()
+ ),
+ true);
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
+
String savepointPath = null;
try {
- cluster.start();
-
- cluster.submitJobDetached(jobGraph);
+ client.setDetached(true);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
for (OneShotLatch latch : iterTestSnapshotWait) {
latch.await();
}
- savepointPath = cluster.triggerSavepoint(jobGraph.getJobID());
+ savepointPath = client.triggerSavepoint(jobGraph.getJobID(), null).get();
source.cancel();
jobGraph = streamGraph.getJobGraph();
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- cluster.submitJobDetached(jobGraph);
+ client.setDetached(true);
+ client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
for (OneShotLatch latch : iterTestRestoreWait) {
latch.await();
}
source.cancel();
} finally {
if (null != savepointPath) {
- cluster.disposeSavepoint(savepointPath);
+ client.disposeSavepoint(savepointPath);
}
- cluster.stop();
+ cluster.after();
}
}
@@ -904,4 +708,30 @@ public class SavepointITCase extends TestLogger {
}
}
}
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class MiniClusterResourceFactory {
+ private final int numTaskManagers;
+ private final int numSlotsPerTaskManager;
+ private final Configuration config;
+
+ private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManager, Configuration config) {
+ this.numTaskManagers = numTaskManagers;
+ this.numSlotsPerTaskManager = numSlotsPerTaskManager;
+ this.config = config;
+ }
+
+ MiniClusterResource get() {
+ return new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ numTaskManagers,
+ numSlotsPerTaskManager
+ ),
+ true);
+ }
+ }
}