Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/05 13:28:01 UTC
flink git commit: [FLINK-5248] [tests] Catch restore failures in SavepointITCase
Repository: flink
Updated Branches:
refs/heads/master e288617f9 -> 4dee8feaf
[FLINK-5248] [tests] Catch restore failures in SavepointITCase
- Minor test cleanup
- The test did not catch task restore failures, because only the
submitted TaskDeploymentDescriptors (TDDs) were inspected. Now, we
test that restore is actually called and that some checkpoints
complete after restoring from a savepoint.
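For context, the new helpers replace the old CountDownLatch on the source with
static wait/notify methods on the StatefulCounter map function, so the test can
block until every parallel subtask has restored its state and enough checkpoint
complete notifications have arrived. The following is a minimal, self-contained
sketch of that synchronization pattern (class and method names here are
illustrative, not the ones from the diff); main() simulates two subtasks
completing two checkpoints each:

import java.util.concurrent.TimeoutException;

public class CheckpointAwaitSketch {

    private static final Object lock = new Object();
    private static int numCompleteCalls; // incremented once per subtask per completed checkpoint

    // Would be called from notifyCheckpointComplete() on each parallel subtask.
    static void notifyOneCheckpointComplete() {
        synchronized (lock) {
            numCompleteCalls++;
            lock.notifyAll();
        }
    }

    // Blocks until parallelism * expectedCheckpoints notifications arrive or the timeout expires.
    static void awaitCompletedCheckpoints(int parallelism, int expectedCheckpoints, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
        int target = parallelism * expectedCheckpoints;
        synchronized (lock) {
            while (numCompleteCalls < target) {
                long remainingMillis = (deadlineNanos - System.nanoTime()) / 1_000_000L;
                if (remainingMillis <= 0) {
                    throw new TimeoutException("Saw " + numCompleteCalls + " of " + target
                            + " checkpoint complete notifications within " + timeoutMillis + " ms.");
                }
                lock.wait(remainingMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int parallelism = 2;
        int checkpoints = 2;
        // Simulate parallelism * checkpoints asynchronous completion notifications.
        for (int i = 0; i < parallelism * checkpoints; i++) {
            long delayMillis = 50L * (i + 1);
            new Thread(() -> {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ignored) {
                }
                notifyOneCheckpointComplete();
            }).start();
        }
        awaitCompletedCheckpoints(parallelism, checkpoints, 5_000);
        System.out.println("All " + (parallelism * checkpoints) + " notifications received.");
    }
}

Note one deliberate difference from the diff below: the sketch waits with
lock.wait(remainingMillis) rather than an unbounded lock.wait(), so the deadline
is re-checked even if no further notification ever arrives.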
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4dee8fea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4dee8fea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4dee8fea
Branch: refs/heads/master
Commit: 4dee8feaf7638bb043d7f3334cc8974456893a3d
Parents: e288617
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Dec 5 11:22:19 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Dec 5 11:53:56 2016 +0100
----------------------------------------------------------------------
.../test/checkpointing/SavepointITCase.java | 247 +++++++++----------
1 file changed, 119 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4dee8fea/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index ab9c1fa..d52f115 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -62,7 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
@@ -76,14 +75,13 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
import static org.junit.Assert.assertEquals;
@@ -101,10 +99,7 @@ public class SavepointITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
@Rule
- public RetryRule retryRule = new RetryRule();
-
- @Rule
- public TemporaryFolder folder= new TemporaryFolder();
+ public TemporaryFolder folder = new TemporaryFolder();
/**
* Tests that it is possible to submit a job, trigger a savepoint, and
@@ -114,8 +109,9 @@ public class SavepointITCase extends TestLogger {
* <ol>
* <li>Submit job, wait for some checkpoints to complete</li>
* <li>Trigger savepoint and verify that savepoint has been created</li>
- * <li>Shut down the cluster, re-submit the job from the savepoint, and
- * verify that the initial state has been reset</li>
+ * <li>Shut down the cluster, re-submit the job from the savepoint,
+ * verify that the initial state has been reset, and
+ * all tasks are running again</li>
* <li>Cancel job, dispose the savepoint, and verify that everything
* has been cleaned up</li>
* </ol>
@@ -131,10 +127,11 @@ public class SavepointITCase extends TestLogger {
final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
// The number of checkpoints to complete before triggering the savepoint
- final int numberOfCompletedCheckpoints = 10;
+ final int numberOfCompletedCheckpoints = 2;
+ final int checkpointingInterval = 100;
// Temporary directory for file state backend
- final File tmpDir = CommonTestUtils.createTempDirectory();
+ final File tmpDir = folder.newFolder();
LOG.info("Created temporary directory: " + tmpDir + ".");
@@ -161,55 +158,50 @@ public class SavepointITCase extends TestLogger {
config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
- checkpointDir.toURI().toString());
+ checkpointDir.toURI().toString());
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
- savepointDir.toURI().toString());
+ savepointDir.toURI().toString());
LOG.info("Flink configuration: " + config + ".");
// Start Flink
flink = new TestingCluster(config);
- LOG.info("Starting Flink cluster.");
flink.start();
// Retrieve the job manager
- LOG.info("Retrieving JobManager.");
ActorGateway jobManager = Await.result(
- flink.leaderGateway().future(),
- deadline.timeLeft());
- LOG.info("JobManager: " + jobManager + ".");
+ flink.leaderGateway().future(),
+ deadline.timeLeft());
// Submit the job
- final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000);
+ final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, checkpointingInterval);
final JobID jobId = jobGraph.getJobID();
// Wait for the source to be notified about the expected number
// of completed checkpoints
- InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(
- numberOfCompletedCheckpoints);
+ StatefulCounter.resetForTest();
LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
flink.submitJobDetached(jobGraph);
- LOG.info("Waiting for " + numberOfCompletedCheckpoints +
- " checkpoint complete notifications.");
+ LOG.info("Waiting for " + numberOfCompletedCheckpoints + " checkpoint complete notifications.");
// Wait...
- InfiniteTestSource.CheckpointCompleteLatch.await();
+ StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
LOG.info("Received all " + numberOfCompletedCheckpoints +
- " checkpoint complete notifications.");
+ " checkpoint complete notifications.");
// ...and then trigger the savepoint
LOG.info("Triggering a savepoint.");
Future<Object> savepointPathFuture = jobManager.ask(
- new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+ new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
final String savepointPath = ((TriggerSavepointSuccess) Await
- .result(savepointPathFuture, deadline.timeLeft())).savepointPath();
+ .result(savepointPathFuture, deadline.timeLeft())).savepointPath();
LOG.info("Retrieved savepoint path: " + savepointPath + ".");
// Only one savepoint should exist
@@ -222,12 +214,10 @@ public class SavepointITCase extends TestLogger {
// Retrieve the savepoint from the testing job manager
LOG.info("Requesting the savepoint.");
- Future<Object> savepointFuture = jobManager.ask(
- new RequestSavepoint(savepointPath),
- deadline.timeLeft());
+ Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
- savepointFuture, deadline.timeLeft())).savepoint();
+ savepointFuture, deadline.timeLeft())).savepoint();
LOG.info("Retrieved savepoint: " + savepointPath + ".");
// Shut down the Flink cluster (thereby canceling the job)
@@ -238,24 +228,16 @@ public class SavepointITCase extends TestLogger {
// - Verification START -------------------------------------------
// Only one checkpoint of the savepoint should exist
- String errMsg = "Checkpoints directory not cleaned up properly.";
+ // We currently have the following directory layout: checkpointDir/jobId/chk-ID
files = checkpointDir.listFiles();
- if (files != null) {
- assertEquals(errMsg, 1, files.length);
- }
- else {
- fail(errMsg);
- }
+ assertNotNull("Checkpoint directory empty", files);
+ assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
+ assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
// Only one savepoint should exist
- errMsg = "Savepoints directory cleaned up.";
files = savepointDir.listFiles();
- if (files != null) {
- assertEquals(errMsg, 1, files.length);
- }
- else {
- fail(errMsg);
- }
+ assertNotNull("Savepoint directory empty", files);
+ assertEquals("No savepoint found in savepoint directory", 1, files.length);
// - Verification END ---------------------------------------------
@@ -265,11 +247,13 @@ public class SavepointITCase extends TestLogger {
// Retrieve the job manager
LOG.info("Retrieving JobManager.");
- jobManager = Await.result(
- flink.leaderGateway().future(),
- deadline.timeLeft());
+ jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
LOG.info("JobManager: " + jobManager + ".");
+ // Reset for restore
+ StatefulCounter.resetForTest();
+
+ // Gather all task deployment descriptors
final Throwable[] error = new Throwable[1];
final TestingCluster finalFlink = flink;
final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
@@ -282,15 +266,16 @@ public class SavepointITCase extends TestLogger {
// Register to all submit task messages for job
for (ActorRef taskManager : finalFlink.getTaskManagersAsJava()) {
taskManager.tell(new TestingTaskManagerMessages
- .RegisterSubmitTaskListener(jobId), getTestActor());
+ .RegisterSubmitTaskListener(jobId), getTestActor());
}
// Set the savepoint path
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
- "savepoint path " + savepointPath + " in detached mode.");
+ "savepoint path " + savepointPath + " in detached mode.");
+ // Submit the job
finalFlink.submitJobDetached(jobGraph);
int numTasks = 0;
@@ -300,12 +285,12 @@ public class SavepointITCase extends TestLogger {
// Gather the task deployment descriptors
LOG.info("Gathering " + numTasks + " submitted " +
- "TaskDeploymentDescriptor instances.");
+ "TaskDeploymentDescriptor instances.");
for (int i = 0; i < numTasks; i++) {
ResponseSubmitTaskListener resp = (ResponseSubmitTaskListener)
- expectMsgAnyClassOf(getRemainingTime(),
- ResponseSubmitTaskListener.class);
+ expectMsgAnyClassOf(getRemainingTime(),
+ ResponseSubmitTaskListener.class);
TaskDeploymentDescriptor tdd = resp.tdd();
@@ -317,8 +302,7 @@ public class SavepointITCase extends TestLogger {
tdds.put(taskInformation.getJobVertexId(), tdd);
}
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
error[0] = t;
}
}
@@ -327,7 +311,7 @@ public class SavepointITCase extends TestLogger {
// - Verification START -------------------------------------------
- errMsg = "Error during gathering of TaskDeploymentDescriptors";
+ String errMsg = "Error during gathering of TaskDeploymentDescriptors";
assertNull(errMsg, error[0]);
// Verify that all tasks, which are part of the savepoint
@@ -336,7 +320,7 @@ public class SavepointITCase extends TestLogger {
Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(taskState.getJobVertexID());
errMsg = "Missing task for savepoint state for operator "
- + taskState.getJobVertexID() + ".";
+ + taskState.getJobVertexID() + ".";
assertTrue(errMsg, taskTdds.size() > 0);
assertEquals(taskState.getNumberCollectedStates(), taskTdds.size());
@@ -348,24 +332,27 @@ public class SavepointITCase extends TestLogger {
errMsg = "Initial operator state mismatch.";
assertEquals(errMsg, subtaskState.getLegacyOperatorState(),
- tdd.getTaskStateHandles().getLegacyOperatorState());
+ tdd.getTaskStateHandles().getLegacyOperatorState());
}
}
+ // Await state is restored
+ StatefulCounter.awaitStateRestoredFromCheckpoint(deadline.timeLeft().toMillis());
+
+ // Await some progress after restore
+ StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+
// - Verification END ---------------------------------------------
LOG.info("Cancelling job " + jobId + ".");
jobManager.tell(new CancelJob(jobId));
LOG.info("Disposing savepoint " + savepointPath + ".");
- Future<Object> disposeFuture = jobManager.ask(
- new DisposeSavepoint(savepointPath),
- deadline.timeLeft());
+ Future<Object> disposeFuture = jobManager.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
errMsg = "Failed to dispose savepoint " + savepointPath + ".";
Object resp = Await.result(disposeFuture, deadline.timeLeft());
- assertTrue(errMsg, resp.getClass() ==
- getDisposeSavepointSuccess().getClass());
+ assertTrue(errMsg, resp.getClass() == getDisposeSavepointSuccess().getClass());
// - Verification START -------------------------------------------
@@ -399,12 +386,11 @@ public class SavepointITCase extends TestLogger {
// All savepoints should have been cleaned up
errMsg = "Savepoints directory not cleaned up properly: " +
- Arrays.toString(savepointDir.listFiles()) + ".";
+ Arrays.toString(savepointDir.listFiles()) + ".";
assertEquals(errMsg, 0, savepointDir.listFiles().length);
// - Verification END ---------------------------------------------
- }
- finally {
+ } finally {
if (flink != null) {
flink.shutdown();
}
@@ -436,7 +422,7 @@ public class SavepointITCase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
- savepointDir.toURI().toString());
+ savepointDir.toURI().toString());
LOG.info("Flink configuration: " + config + ".");
@@ -448,8 +434,8 @@ public class SavepointITCase extends TestLogger {
// Retrieve the job manager
LOG.info("Retrieving JobManager.");
ActorGateway jobManager = Await.result(
- flink.leaderGateway().future(),
- deadline.timeLeft());
+ flink.leaderGateway().future(),
+ deadline.timeLeft());
LOG.info("JobManager: " + jobManager + ".");
// High value to ensure timeouts if restarted.
@@ -467,13 +453,11 @@ public class SavepointITCase extends TestLogger {
try {
flink.submitJobAndWait(jobGraph, false);
- }
- catch (Exception e) {
+ } catch (Exception e) {
assertEquals(JobExecutionException.class, e.getClass());
assertEquals(IllegalArgumentException.class, e.getCause().getClass());
}
- }
- finally {
+ } finally {
if (flink != null) {
flink.shutdown();
}
@@ -488,10 +472,10 @@ public class SavepointITCase extends TestLogger {
* Creates a streaming JobGraph from the StreamEnvironment.
*/
private JobGraph createJobGraph(
- int parallelism,
- int numberOfRetries,
- long restartDelay,
- int checkpointingInterval) {
+ int parallelism,
+ int numberOfRetries,
+ long restartDelay,
+ int checkpointingInterval) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
@@ -501,24 +485,20 @@ public class SavepointITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
DataStream<Integer> stream = env
- .addSource(new InfiniteTestSource())
- .shuffle()
- .map(new StatefulCounter());
+ .addSource(new InfiniteTestSource())
+ .shuffle()
+ .map(new StatefulCounter());
stream.addSink(new DiscardingSink<Integer>());
return env.getStreamGraph().getJobGraph();
}
- private static class InfiniteTestSource
- implements SourceFunction<Integer>, CheckpointListener {
+ private static class InfiniteTestSource implements SourceFunction<Integer> {
private static final long serialVersionUID = 1L;
private volatile boolean running = true;
- // Test control
- private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(1);
-
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
@@ -530,16 +510,16 @@ public class SavepointITCase extends TestLogger {
public void cancel() {
running = false;
}
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- CheckpointCompleteLatch.countDown();
- }
}
private static class StatefulCounter
- extends RichMapFunction<Integer, Integer>
- implements Checkpointed<byte[]> {
+ extends RichMapFunction<Integer, Integer>
+ implements Checkpointed<byte[]>, CheckpointListener {
+
+ private static final Object checkpointLock = new Object();
+ private static int numCompleteCalls;
+ private static int numRestoreCalls;
+ private static boolean restoredFromCheckpoint;
private static final long serialVersionUID = 7317800376639115920L;
private byte[] data;
@@ -570,55 +550,66 @@ public class SavepointITCase extends TestLogger {
@Override
public void restoreState(byte[] data) throws Exception {
this.data = data;
+
+ synchronized (checkpointLock) {
+ if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {
+ restoredFromCheckpoint = true;
+ checkpointLock.notifyAll();
+ }
+ }
}
- }
- /**
- * Test source that counts calls to restoreState and that can be configured
- * to fail on restoreState calls.
- */
- private static class RestoreStateCountingAndFailingSource
- implements SourceFunction<Integer>, Checkpointed, CheckpointListener {
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ synchronized (checkpointLock) {
+ numCompleteCalls++;
+ checkpointLock.notifyAll();
+ }
+ }
- private static final long serialVersionUID = 1L;
+ // --------------------------------------------------------------------
- private static volatile int numRestoreStateCalls = 0;
- private static volatile boolean failOnRestoreStateCall = false;
- private static volatile CountDownLatch checkpointCompleteLatch = new CountDownLatch(1);
- private static volatile int emitted = 0;
+ static void resetForTest() {
+ synchronized (checkpointLock) {
+ numCompleteCalls = 0;
+ numRestoreCalls = 0;
+ restoredFromCheckpoint = false;
+ }
+ }
- private volatile boolean running = true;
+ static void awaitCompletedCheckpoints(
+ int parallelism,
+ int expectedNumberOfCompletedCheckpoints,
+ long timeoutMillis) throws InterruptedException, TimeoutException {
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- while (running) {
- ctx.collect(1);
- emitted++;
+ long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
- if (failOnRestoreStateCall) {
- throw new RuntimeException("Restore test failure");
+ synchronized (checkpointLock) {
+ // One completion notification per parallel subtask
+ int expectedNumber = parallelism * expectedNumberOfCompletedCheckpoints;
+ while (numCompleteCalls < expectedNumber && System.nanoTime() <= deadline) {
+ checkpointLock.wait();
}
- }
- }
- @Override
- public void cancel() {
- running = false;
+ if (numCompleteCalls < expectedNumber) {
+ throw new TimeoutException("Did not complete " + expectedNumberOfCompletedCheckpoints +
+ " checkpoints within timeout of " + timeoutMillis + " millis.");
+ }
+ }
}
- @Override
- public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return 1;
- }
+ static void awaitStateRestoredFromCheckpoint(long timeoutMillis) throws InterruptedException, TimeoutException {
+ long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
- @Override
- public void restoreState(Serializable state) throws Exception {
- numRestoreStateCalls++;
- }
+ synchronized (checkpointLock) {
+ while (!restoredFromCheckpoint && System.nanoTime() <= deadline) {
+ checkpointLock.wait();
+ }
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- checkpointCompleteLatch.countDown();
+ if (!restoredFromCheckpoint) {
+ throw new TimeoutException("Did not restore from checkpoint within timeout of " + timeoutMillis + " millis.");
+ }
+ }
}
}