Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/05 13:28:01 UTC

flink git commit: [FLINK-5248] [tests] Catch restore failures in SavepointITCase

Repository: flink
Updated Branches:
  refs/heads/master e288617f9 -> 4dee8feaf


[FLINK-5248] [tests] Catch restore failures in SavepointITCase

- Minor test cleanup
- The test did not catch task restore failures, since only the
  TaskDeploymentDescriptors (TDDs) were checked. Now we verify that
  restore is actually called and that some checkpoints complete
  after restoring from a savepoint.
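
For context, the new verification relies on a simple wait/notify pattern: the
test function counts restore and checkpoint-complete calls in static fields
guarded by a shared lock, and the test thread blocks until the expected counts
are reached or a timeout expires (the same idea is applied with a boolean flag
for the restore call). A minimal, self-contained sketch of that pattern follows;
class and member names here are illustrative only, the actual test code is in
the diff below.

    import java.util.concurrent.TimeoutException;

    // Sketch of the await-and-count pattern used by the reworked test:
    // subtasks report events by incrementing a static counter under a shared
    // lock, and the test thread waits until the expected count is reached.
    public class CheckpointAwaitSketch {

        private static final Object lock = new Object();
        private static int completedCheckpointNotifications;

        // Called by each parallel subtask when a checkpoint completes.
        static void reportCheckpointComplete() {
            synchronized (lock) {
                completedCheckpointNotifications++;
                lock.notifyAll();
            }
        }

        // Blocks until parallelism * expectedCheckpoints notifications arrive,
        // one per subtask and completed checkpoint, or the timeout expires.
        static void awaitCompletedCheckpoints(
                int parallelism,
                int expectedCheckpoints,
                long timeoutMillis) throws InterruptedException, TimeoutException {

            long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
            int expected = parallelism * expectedCheckpoints;

            synchronized (lock) {
                while (completedCheckpointNotifications < expected) {
                    long remainingMillis = (deadlineNanos - System.nanoTime()) / 1_000_000L;
                    if (remainingMillis <= 0) {
                        throw new TimeoutException("Expected " + expected
                                + " notifications, got " + completedCheckpointNotifications);
                    }
                    lock.wait(remainingMillis);
                }
            }
        }

        // Resets the counter between the initial run and the restore run.
        static void reset() {
            synchronized (lock) {
                completedCheckpointNotifications = 0;
            }
        }
    }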


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4dee8fea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4dee8fea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4dee8fea

Branch: refs/heads/master
Commit: 4dee8feaf7638bb043d7f3334cc8974456893a3d
Parents: e288617
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Dec 5 11:22:19 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Dec 5 11:53:56 2016 +0100

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 247 +++++++++----------
 1 file changed, 119 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4dee8fea/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index ab9c1fa..d52f115 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -62,7 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -76,14 +75,13 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
@@ -101,10 +99,7 @@ public class SavepointITCase extends TestLogger {
 	private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
 
 	@Rule
-	public RetryRule retryRule = new RetryRule();
-
-	@Rule
-	public TemporaryFolder folder= new TemporaryFolder();
+	public TemporaryFolder folder = new TemporaryFolder();
 
 	/**
 	 * Tests that it is possible to submit a job, trigger a savepoint, and
@@ -114,8 +109,9 @@ public class SavepointITCase extends TestLogger {
 	 * <ol>
 	 * <li>Submit job, wait for some checkpoints to complete</li>
 	 * <li>Trigger savepoint and verify that savepoint has been created</li>
-	 * <li>Shut down the cluster, re-submit the job from the savepoint, and
-	 * verify that the initial state has been reset</li>
+	 * <li>Shut down the cluster, re-submit the job from the savepoint,
+	 * verify that the initial state has been reset, and
+	 * all tasks are running again</li>
 	 * <li>Cancel job, dispose the savepoint, and verify that everything
 	 * has been cleaned up</li>
 	 * </ol>
@@ -131,10 +127,11 @@ public class SavepointITCase extends TestLogger {
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
 		// The number of checkpoints to complete before triggering the savepoint
-		final int numberOfCompletedCheckpoints = 10;
+		final int numberOfCompletedCheckpoints = 2;
+		final int checkpointingInterval = 100;
 
 		// Temporary directory for file state backend
-		final File tmpDir = CommonTestUtils.createTempDirectory();
+		final File tmpDir = folder.newFolder();
 
 		LOG.info("Created temporary directory: " + tmpDir + ".");
 
@@ -161,55 +158,50 @@ public class SavepointITCase extends TestLogger {
 
 			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
 			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
-					checkpointDir.toURI().toString());
+				checkpointDir.toURI().toString());
 			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
 			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-					savepointDir.toURI().toString());
+				savepointDir.toURI().toString());
 
 			LOG.info("Flink configuration: " + config + ".");
 
 			// Start Flink
 			flink = new TestingCluster(config);
-			LOG.info("Starting Flink cluster.");
 			flink.start();
 
 			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
 			ActorGateway jobManager = Await.result(
-					flink.leaderGateway().future(),
-					deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
+				flink.leaderGateway().future(),
+				deadline.timeLeft());
 
 			// Submit the job
-			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000);
+			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, checkpointingInterval);
 			final JobID jobId = jobGraph.getJobID();
 
 			// Wait for the source to be notified about the expected number
 			// of completed checkpoints
-			InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(
-					numberOfCompletedCheckpoints);
+			StatefulCounter.resetForTest();
 
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
 			flink.submitJobDetached(jobGraph);
 
-			LOG.info("Waiting for " + numberOfCompletedCheckpoints +
-					" checkpoint complete notifications.");
+			LOG.info("Waiting for " + numberOfCompletedCheckpoints + " checkpoint complete notifications.");
 
 			// Wait...
-			InfiniteTestSource.CheckpointCompleteLatch.await();
+			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
 
 			LOG.info("Received all " + numberOfCompletedCheckpoints +
-					" checkpoint complete notifications.");
+				" checkpoint complete notifications.");
 
 			// ...and then trigger the savepoint
 			LOG.info("Triggering a savepoint.");
 
 			Future<Object> savepointPathFuture = jobManager.ask(
-					new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+				new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
 
 			final String savepointPath = ((TriggerSavepointSuccess) Await
-					.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
+				.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
 			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
 
 			// Only one savepoint should exist
@@ -222,12 +214,10 @@ public class SavepointITCase extends TestLogger {
 
 			// Retrieve the savepoint from the testing job manager
 			LOG.info("Requesting the savepoint.");
-			Future<Object> savepointFuture = jobManager.ask(
-					new RequestSavepoint(savepointPath),
-					deadline.timeLeft());
+			Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
 
 			SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
-					savepointFuture, deadline.timeLeft())).savepoint();
+				savepointFuture, deadline.timeLeft())).savepoint();
 			LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
 			// Shut down the Flink cluster (thereby canceling the job)
@@ -238,24 +228,16 @@ public class SavepointITCase extends TestLogger {
 			// - Verification START -------------------------------------------
 
 			// Only one checkpoint of the savepoint should exist
-			String errMsg = "Checkpoints directory not cleaned up properly.";
+			// We currently have the following directory layout: checkpointDir/jobId/chk-ID
 			files = checkpointDir.listFiles();
-			if (files != null) {
-				assertEquals(errMsg, 1, files.length);
-			}
-			else {
-				fail(errMsg);
-			}
+			assertNotNull("Checkpoint directory empty", files);
+			assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
+			assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
 
 			// Only one savepoint should exist
-			errMsg = "Savepoints directory cleaned up.";
 			files = savepointDir.listFiles();
-			if (files != null) {
-				assertEquals(errMsg, 1, files.length);
-			}
-			else {
-				fail(errMsg);
-			}
+			assertNotNull("Savepoint directory empty", files);
+			assertEquals("No savepoint found in savepoint directory", 1, files.length);
 
 			// - Verification END ---------------------------------------------
 
@@ -265,11 +247,13 @@ public class SavepointITCase extends TestLogger {
 
 			// Retrieve the job manager
 			LOG.info("Retrieving JobManager.");
-			jobManager = Await.result(
-					flink.leaderGateway().future(),
-					deadline.timeLeft());
+			jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
 			LOG.info("JobManager: " + jobManager + ".");
 
+			// Reset for restore
+			StatefulCounter.resetForTest();
+
+			// Gather all task deployment descriptors
 			final Throwable[] error = new Throwable[1];
 			final TestingCluster finalFlink = flink;
 			final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
@@ -282,15 +266,16 @@ public class SavepointITCase extends TestLogger {
 							// Register to all submit task messages for job
 							for (ActorRef taskManager : finalFlink.getTaskManagersAsJava()) {
 								taskManager.tell(new TestingTaskManagerMessages
-										.RegisterSubmitTaskListener(jobId), getTestActor());
+									.RegisterSubmitTaskListener(jobId), getTestActor());
 							}
 
 							// Set the savepoint path
 							jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 							LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
-									"savepoint path " + savepointPath + " in detached mode.");
+								"savepoint path " + savepointPath + " in detached mode.");
 
+							// Submit the job
 							finalFlink.submitJobDetached(jobGraph);
 
 							int numTasks = 0;
@@ -300,12 +285,12 @@ public class SavepointITCase extends TestLogger {
 
 							// Gather the task deployment descriptors
 							LOG.info("Gathering " + numTasks + " submitted " +
-									"TaskDeploymentDescriptor instances.");
+								"TaskDeploymentDescriptor instances.");
 
 							for (int i = 0; i < numTasks; i++) {
 								ResponseSubmitTaskListener resp = (ResponseSubmitTaskListener)
-										expectMsgAnyClassOf(getRemainingTime(),
-												ResponseSubmitTaskListener.class);
+									expectMsgAnyClassOf(getRemainingTime(),
+										ResponseSubmitTaskListener.class);
 
 								TaskDeploymentDescriptor tdd = resp.tdd();
 
@@ -317,8 +302,7 @@ public class SavepointITCase extends TestLogger {
 
 								tdds.put(taskInformation.getJobVertexId(), tdd);
 							}
-						}
-						catch (Throwable t) {
+						} catch (Throwable t) {
 							error[0] = t;
 						}
 					}
@@ -327,7 +311,7 @@ public class SavepointITCase extends TestLogger {
 
 			// - Verification START -------------------------------------------
 
-			errMsg = "Error during gathering of TaskDeploymentDescriptors";
+			String errMsg = "Error during gathering of TaskDeploymentDescriptors";
 			assertNull(errMsg, error[0]);
 
 			// Verify that all tasks, which are part of the savepoint
@@ -336,7 +320,7 @@ public class SavepointITCase extends TestLogger {
 				Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(taskState.getJobVertexID());
 
 				errMsg = "Missing task for savepoint state for operator "
-						+ taskState.getJobVertexID() + ".";
+					+ taskState.getJobVertexID() + ".";
 				assertTrue(errMsg, taskTdds.size() > 0);
 
 				assertEquals(taskState.getNumberCollectedStates(), taskTdds.size());
@@ -348,24 +332,27 @@ public class SavepointITCase extends TestLogger {
 
 					errMsg = "Initial operator state mismatch.";
 					assertEquals(errMsg, subtaskState.getLegacyOperatorState(),
-							tdd.getTaskStateHandles().getLegacyOperatorState());
+						tdd.getTaskStateHandles().getLegacyOperatorState());
 				}
 			}
 
+			// Await state is restored
+			StatefulCounter.awaitStateRestoredFromCheckpoint(deadline.timeLeft().toMillis());
+
+			// Await some progress after restore
+			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+
 			// - Verification END ---------------------------------------------
 
 			LOG.info("Cancelling job " + jobId + ".");
 			jobManager.tell(new CancelJob(jobId));
 
 			LOG.info("Disposing savepoint " + savepointPath + ".");
-			Future<Object> disposeFuture = jobManager.ask(
-					new DisposeSavepoint(savepointPath),
-					deadline.timeLeft());
+			Future<Object> disposeFuture = jobManager.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
 
 			errMsg = "Failed to dispose savepoint " + savepointPath + ".";
 			Object resp = Await.result(disposeFuture, deadline.timeLeft());
-			assertTrue(errMsg, resp.getClass() ==
-					getDisposeSavepointSuccess().getClass());
+			assertTrue(errMsg, resp.getClass() == getDisposeSavepointSuccess().getClass());
 
 			// - Verification START -------------------------------------------
 
@@ -399,12 +386,11 @@ public class SavepointITCase extends TestLogger {
 
 			// All savepoints should have been cleaned up
 			errMsg = "Savepoints directory not cleaned up properly: " +
-					Arrays.toString(savepointDir.listFiles()) + ".";
+				Arrays.toString(savepointDir.listFiles()) + ".";
 			assertEquals(errMsg, 0, savepointDir.listFiles().length);
 
 			// - Verification END ---------------------------------------------
-		}
-		finally {
+		} finally {
 			if (flink != null) {
 				flink.shutdown();
 			}
@@ -436,7 +422,7 @@ public class SavepointITCase extends TestLogger {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-					savepointDir.toURI().toString());
+				savepointDir.toURI().toString());
 
 			LOG.info("Flink configuration: " + config + ".");
 
@@ -448,8 +434,8 @@ public class SavepointITCase extends TestLogger {
 			// Retrieve the job manager
 			LOG.info("Retrieving JobManager.");
 			ActorGateway jobManager = Await.result(
-					flink.leaderGateway().future(),
-					deadline.timeLeft());
+				flink.leaderGateway().future(),
+				deadline.timeLeft());
 			LOG.info("JobManager: " + jobManager + ".");
 
 			// High value to ensure timeouts if restarted.
@@ -467,13 +453,11 @@ public class SavepointITCase extends TestLogger {
 
 			try {
 				flink.submitJobAndWait(jobGraph, false);
-			}
-			catch (Exception e) {
+			} catch (Exception e) {
 				assertEquals(JobExecutionException.class, e.getClass());
 				assertEquals(IllegalArgumentException.class, e.getCause().getClass());
 			}
-		}
-		finally {
+		} finally {
 			if (flink != null) {
 				flink.shutdown();
 			}
@@ -488,10 +472,10 @@ public class SavepointITCase extends TestLogger {
 	 * Creates a streaming JobGraph from the StreamEnvironment.
 	 */
 	private JobGraph createJobGraph(
-			int parallelism,
-			int numberOfRetries,
-			long restartDelay,
-			int checkpointingInterval) {
+		int parallelism,
+		int numberOfRetries,
+		long restartDelay,
+		int checkpointingInterval) {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
@@ -501,24 +485,20 @@ public class SavepointITCase extends TestLogger {
 		env.getConfig().disableSysoutLogging();
 
 		DataStream<Integer> stream = env
-				.addSource(new InfiniteTestSource())
-				.shuffle()
-				.map(new StatefulCounter());
+			.addSource(new InfiniteTestSource())
+			.shuffle()
+			.map(new StatefulCounter());
 
 		stream.addSink(new DiscardingSink<Integer>());
 
 		return env.getStreamGraph().getJobGraph();
 	}
 
-	private static class InfiniteTestSource
-			implements SourceFunction<Integer>, CheckpointListener {
+	private static class InfiniteTestSource implements SourceFunction<Integer> {
 
 		private static final long serialVersionUID = 1L;
 		private volatile boolean running = true;
 
-		// Test control
-		private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(1);
-
 		@Override
 		public void run(SourceContext<Integer> ctx) throws Exception {
 			while (running) {
@@ -530,16 +510,16 @@ public class SavepointITCase extends TestLogger {
 		public void cancel() {
 			running = false;
 		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) throws Exception {
-			CheckpointCompleteLatch.countDown();
-		}
 	}
 
 	private static class StatefulCounter
-			extends RichMapFunction<Integer, Integer>
-			implements Checkpointed<byte[]> {
+		extends RichMapFunction<Integer, Integer>
+		implements Checkpointed<byte[]>, CheckpointListener {
+
+		private static final Object checkpointLock = new Object();
+		private static int numCompleteCalls;
+		private static int numRestoreCalls;
+		private static boolean restoredFromCheckpoint;
 
 		private static final long serialVersionUID = 7317800376639115920L;
 		private byte[] data;
@@ -570,55 +550,66 @@ public class SavepointITCase extends TestLogger {
 		@Override
 		public void restoreState(byte[] data) throws Exception {
 			this.data = data;
+
+			synchronized (checkpointLock) {
+				if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {
+					restoredFromCheckpoint = true;
+					checkpointLock.notifyAll();
+				}
+			}
 		}
-	}
 
-	/**
-	 * Test source that counts calls to restoreState and that can be configured
-	 * to fail on restoreState calls.
-	 */
-	private static class RestoreStateCountingAndFailingSource
-			implements SourceFunction<Integer>, Checkpointed, CheckpointListener {
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			synchronized (checkpointLock) {
+				numCompleteCalls++;
+				checkpointLock.notifyAll();
+			}
+		}
 
-		private static final long serialVersionUID = 1L;
+		// --------------------------------------------------------------------
 
-		private static volatile int numRestoreStateCalls = 0;
-		private static volatile boolean failOnRestoreStateCall = false;
-		private static volatile CountDownLatch checkpointCompleteLatch = new CountDownLatch(1);
-		private static volatile int emitted = 0;
+		static void resetForTest() {
+			synchronized (checkpointLock) {
+				numCompleteCalls = 0;
+				numRestoreCalls = 0;
+				restoredFromCheckpoint = false;
+			}
+		}
 
-		private volatile boolean running = true;
+		static void awaitCompletedCheckpoints(
+				int parallelism,
+				int expectedNumberOfCompletedCheckpoints,
+				long timeoutMillis) throws InterruptedException, TimeoutException {
 
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			while (running) {
-				ctx.collect(1);
-				emitted++;
+			long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
 
-				if (failOnRestoreStateCall) {
-					throw new RuntimeException("Restore test failure");
+			synchronized (checkpointLock) {
+				// One completion notification per parallel subtask
+				int expectedNumber = parallelism * expectedNumberOfCompletedCheckpoints;
+				while (numCompleteCalls < expectedNumber && System.nanoTime() <= deadline) {
+					checkpointLock.wait();
 				}
-			}
-		}
 
-		@Override
-		public void cancel() {
-			running = false;
+				if (numCompleteCalls < expectedNumber) {
+					throw new TimeoutException("Did not complete " + expectedNumberOfCompletedCheckpoints +
+						" within timeout of " + timeoutMillis + " millis.");
+				}
+			}
 		}
 
-		@Override
-		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return 1;
-		}
+		static void awaitStateRestoredFromCheckpoint(long timeoutMillis) throws InterruptedException, TimeoutException {
+			long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
 
-		@Override
-		public void restoreState(Serializable state) throws Exception {
-			numRestoreStateCalls++;
-		}
+			synchronized (checkpointLock) {
+				while (!restoredFromCheckpoint && System.currentTimeMillis() <= deadline) {
+					checkpointLock.wait();
+				}
 
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) throws Exception {
-			checkpointCompleteLatch.countDown();
+				if (!restoredFromCheckpoint) {
+					throw new TimeoutException("Did not restore from checkpoint within timeout of " + timeoutMillis + " millis.");
+				}
+			}
 		}
 	}