Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:47 UTC

[04/11] flink git commit: [FLINK-5929] [tests] Fix SavepointITCase instability

[FLINK-5929] [tests] Fix SavepointITCase instability

When shutting down the testing cluster, it can happen that checkpoint
files linger around (checkpoints independent of the savepoint).

This commit deactivates checkpointing for the test and uses count down
latches to track progress, which also reduces the test time.

This closes #3427
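
For context, here is a minimal, self-contained sketch of the latch-based
progress tracking the commit switches to. The class and method names used
here (LatchProgressSketch, ProgressTrackingMapper, onElementProcessed) are
illustrative only and are not part of the patch; the real implementation is
the StatefulCounter inside SavepointITCase, shown in the diff below.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchProgressSketch {

    static class ProgressTrackingMapper {
        // One latch shared by all parallel subtasks; re-created per test run.
        private static volatile CountDownLatch progressLatch = new CountDownLatch(0);

        static void resetForTest(int parallelism) {
            progressLatch = new CountDownLatch(parallelism);
        }

        static CountDownLatch getProgressLatch() {
            return progressLatch;
        }

        // Each subtask counts down once after it has processed enough elements.
        void onElementProcessed(int numCollectedElements) {
            if (numCollectedElements > 10) {
                progressLatch.countDown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int parallelism = 4;
        ProgressTrackingMapper.resetForTest(parallelism);

        // Simulate parallel subtasks making progress on separate threads.
        for (int i = 0; i < parallelism; i++) {
            new Thread(() -> new ProgressTrackingMapper().onElementProcessed(11)).start();
        }

        // The test thread blocks until every subtask has signalled progress,
        // bounded by a timeout, instead of counting checkpoint notifications.
        boolean done = ProgressTrackingMapper.getProgressLatch().await(30, TimeUnit.SECONDS);
        System.out.println("All subtasks made progress: " + done);
    }
}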


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c24c7ec3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c24c7ec3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c24c7ec3

Branch: refs/heads/master
Commit: c24c7ec3332d0eb6ebb24eb70c9aabd055cc129f
Parents: 7f244b8
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Feb 28 11:13:28 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 18:59:10 2017 +0100

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 200 ++++++-------------
 1 file changed, 62 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c24c7ec3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index ac37009..ee371dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -24,7 +24,7 @@ import akka.testkit.JavaTestKit;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import java.io.FileNotFoundException;
-import org.apache.commons.io.FileUtils;
+import java.util.concurrent.CountDownLatch;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -95,7 +94,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
@@ -116,12 +114,11 @@ public class SavepointITCase extends TestLogger {
 	public TemporaryFolder folder = new TemporaryFolder();
 
 	/**
-	 * Tests that it is possible to submit a job, trigger a savepoint, and
-	 * later restart the job on a new cluster. The savepoint is written to
-	 * a file.
+	 * Triggers a savepoint for a job that uses the FsStateBackend. We expect
+	 * that all checkpoint files are written to a new savepoint directory.
 	 *
 	 * <ol>
-	 * <li>Submit job, wait for some checkpoints to complete</li>
+	 * <li>Submit job, wait for some progress</li>
 	 * <li>Trigger savepoint and verify that savepoint has been created</li>
 	 * <li>Shut down the cluster, re-submit the job from the savepoint,
 	 * verify that the initial state has been reset, and
@@ -131,23 +128,13 @@ public class SavepointITCase extends TestLogger {
 	 * </ol>
 	 */
 	@Test
-	public void testTriggerSavepointAndResume() throws Exception {
+	public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception {
 		// Config
-		int numTaskManagers = 2;
-		int numSlotsPerTaskManager = 2;
-		int parallelism = numTaskManagers * numSlotsPerTaskManager;
-
-		// Test deadline
+		final int numTaskManagers = 2;
+		final int numSlotsPerTaskManager = 2;
+		final int parallelism = numTaskManagers * numSlotsPerTaskManager;
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-
-		// The number of checkpoints to complete before triggering the savepoint
-		final int numberOfCompletedCheckpoints = 2;
-		final int checkpointingInterval = 100;
-
-		// Temporary directory for file state backend
-		final File tmpDir = folder.newFolder();
-
-		LOG.info("Created temporary directory: " + tmpDir + ".");
+		final File testRoot = folder.newFolder();
 
 		TestingCluster flink = null;
 
@@ -160,70 +147,51 @@ public class SavepointITCase extends TestLogger {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 
-			final File checkpointDir = new File(tmpDir, "checkpoints");
-			final File savepointRootDir = new File(tmpDir, "savepoints");
+			final File checkpointDir = new File(testRoot, "checkpoints");
+			final File savepointRootDir = new File(testRoot, "savepoints");
 
 			if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
 				fail("Test setup failed: failed to create temporary directories.");
 			}
 
-			LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
-			LOG.info("Created temporary savepoint directory: " + savepointRootDir + ".");
-
+			// Use file based checkpoints
 			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
-				checkpointDir.toURI().toString());
+			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
 			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
-			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-				savepointRootDir.toURI().toString());
-
-			LOG.info("Flink configuration: " + config + ".");
+			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointRootDir.toURI().toString());
 
 			// Start Flink
 			flink = new TestingCluster(config);
-			flink.start();
-
-			// Retrieve the job manager
-			ActorGateway jobManager = Await.result(
-				flink.leaderGateway().future(),
-				deadline.timeLeft());
+			flink.start(true);
 
 			// Submit the job
-			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, checkpointingInterval);
+			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
 			final JobID jobId = jobGraph.getJobID();
 
-			// Wait for the source to be notified about the expected number
-			// of completed checkpoints
-			StatefulCounter.resetForTest();
+			// Reset the static test job helpers
+			StatefulCounter.resetForTest(parallelism);
+
+			// Retrieve the job manager
+			ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
 
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
 			flink.submitJobDetached(jobGraph);
 
-			LOG.info("Waiting for " + numberOfCompletedCheckpoints + " checkpoint complete notifications.");
+			LOG.info("Waiting for some progress.");
 
-			// Wait...
-			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-			LOG.info("Received all " + numberOfCompletedCheckpoints +
-				" checkpoint complete notifications.");
-
-			// ...and then trigger the savepoint
 			LOG.info("Triggering a savepoint.");
-
-			Future<Object> savepointPathFuture = jobManager.ask(
-				new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
-
-			final String savepointPath = ((TriggerSavepointSuccess) Await
-				.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
+			Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+			final String savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
 			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
 
 			// Retrieve the savepoint from the testing job manager
 			LOG.info("Requesting the savepoint.");
 			Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
 
-			SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
-				savepointFuture, deadline.timeLeft())).savepoint();
+			SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
 			LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
 			// Shut down the Flink cluster (thereby canceling the job)
@@ -243,26 +211,25 @@ public class SavepointITCase extends TestLogger {
 				File savepointDir = files[0];
 				File[] savepointFiles = savepointDir.listFiles();
 				assertNotNull(savepointFiles);
-				assertTrue("Did not write savepoint files to directory",savepointFiles.length > 1);
+
+				// Expect one metadata file and one checkpoint file per stateful
+				// parallel subtask
+				String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: "
+					+ Arrays.toString(savepointFiles);
+				assertEquals(errMsg, 1 + parallelism, savepointFiles.length);
 			} else {
 				fail("Savepoint not created in expected directory");
 			}
 
-			// Only one checkpoint of the savepoint should exist
 			// We currently have the following directory layout: checkpointDir/jobId/chk-ID
 			File jobCheckpoints = new File(checkpointDir, jobId.toString());
 
 			if (jobCheckpoints.exists()) {
 				files = jobCheckpoints.listFiles();
 				assertNotNull("Checkpoint directory empty", files);
-				assertEquals("Checkpoints directory not cleaned up: " + Arrays.toString(files), 0, files.length);
+				assertEquals("Checkpoints directory not clean: " + Arrays.toString(files), 0, files.length);
 			}
 
-			// Only one savepoint should exist
-			files = savepointRootDir.listFiles();
-			assertNotNull("Savepoint directory empty", files);
-			assertEquals("No savepoint found in savepoint directory", 1, files.length);
-
 			// - Verification END ---------------------------------------------
 
 			// Restart the cluster
@@ -274,13 +241,14 @@ public class SavepointITCase extends TestLogger {
 			jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
 			LOG.info("JobManager: " + jobManager + ".");
 
-			// Reset for restore
-			StatefulCounter.resetForTest();
+			// Reset static test helpers
+			StatefulCounter.resetForTest(parallelism);
 
 			// Gather all task deployment descriptors
 			final Throwable[] error = new Throwable[1];
 			final TestingCluster finalFlink = flink;
 			final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
+
 			new JavaTestKit(testActorSystem) {{
 
 				new Within(deadline.timeLeft()) {
@@ -361,10 +329,10 @@ public class SavepointITCase extends TestLogger {
 			}
 
 			// Await state is restored
-			StatefulCounter.awaitStateRestoredFromCheckpoint(deadline.timeLeft().toMillis());
+			StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			// Await some progress after restore
-			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			// - Verification END ---------------------------------------------
 
@@ -396,7 +364,7 @@ public class SavepointITCase extends TestLogger {
 				}
 			}
 
-			// The checkpoint of the savepoint should have been discarded
+			// The checkpoint files of the savepoint should have been discarded
 			for (File f : checkpointFiles) {
 				errMsg = "Checkpoint file " + f + " not cleaned up properly.";
 				assertFalse(errMsg, f.exists());
@@ -418,10 +386,6 @@ public class SavepointITCase extends TestLogger {
 			if (flink != null) {
 				flink.shutdown();
 			}
-
-			if (tmpDir != null) {
-				FileUtils.deleteDirectory(tmpDir);
-			}
 		}
 	}
 
@@ -467,7 +431,7 @@ public class SavepointITCase extends TestLogger {
 			// Submit the job
 			// Long delay to ensure that the test times out if the job
 			// manager tries to restart the job.
-			final JobGraph jobGraph = createJobGraph(parallelism, numberOfRetries, 3600000, 1000);
+			final JobGraph jobGraph = createJobGraph(parallelism, numberOfRetries, 3600000);
 
 			// Set non-existing savepoint path
 			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("unknown path"));
@@ -498,12 +462,10 @@ public class SavepointITCase extends TestLogger {
 	private JobGraph createJobGraph(
 		int parallelism,
 		int numberOfRetries,
-		long restartDelay,
-		int checkpointingInterval) {
+		long restartDelay) {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
-		env.enableCheckpointing(checkpointingInterval);
 		env.disableOperatorChaining();
 		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(numberOfRetries, restartDelay));
 		env.getConfig().disableSysoutLogging();
@@ -526,7 +488,9 @@ public class SavepointITCase extends TestLogger {
 		@Override
 		public void run(SourceContext<Integer> ctx) throws Exception {
 			while (running) {
-				ctx.collect(1);
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(1);
+				}
 			}
 		}
 
@@ -536,14 +500,12 @@ public class SavepointITCase extends TestLogger {
 		}
 	}
 
-	private static class StatefulCounter
-		extends RichMapFunction<Integer, Integer>
-		implements ListCheckpointed<byte[]>, CheckpointListener {
+	private static class StatefulCounter extends RichMapFunction<Integer, Integer> implements ListCheckpointed<byte[]>{
+
+		private static volatile CountDownLatch progressLatch = new CountDownLatch(0);
+		private static volatile CountDownLatch restoreLatch = new CountDownLatch(0);
 
-		private static final Object checkpointLock = new Object();
-		private static int numCompleteCalls;
-		private static int numRestoreCalls;
-		private static boolean restoredFromCheckpoint;
+		private int numCollectedElements = 0;
 
 		private static final long serialVersionUID = 7317800376639115920L;
 		private byte[] data;
@@ -563,6 +525,11 @@ public class SavepointITCase extends TestLogger {
 			for (int i = 0; i < data.length; i++) {
 				data[i] += 1;
 			}
+
+			if (numCollectedElements++ > 10) {
+				progressLatch.countDown();
+			}
+
 			return value;
 		}
 
@@ -578,65 +545,22 @@ public class SavepointITCase extends TestLogger {
 			}
 			this.data = state.get(0);
 
-			synchronized (checkpointLock) {
-				if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {
-					restoredFromCheckpoint = true;
-					checkpointLock.notifyAll();
-				}
-			}
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) throws Exception {
-			synchronized (checkpointLock) {
-				numCompleteCalls++;
-				checkpointLock.notifyAll();
-			}
+			restoreLatch.countDown();
 		}
 
 		// --------------------------------------------------------------------
 
-		static void resetForTest() {
-			synchronized (checkpointLock) {
-				numCompleteCalls = 0;
-				numRestoreCalls = 0;
-				restoredFromCheckpoint = false;
-			}
+		static CountDownLatch getProgressLatch() {
+			return progressLatch;
 		}
 
-		static void awaitCompletedCheckpoints(
-				int parallelism,
-				int expectedNumberOfCompletedCheckpoints,
-				long timeoutMillis) throws InterruptedException, TimeoutException {
-
-			long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
-
-			synchronized (checkpointLock) {
-				// One completion notification per parallel subtask
-				int expectedNumber = parallelism * expectedNumberOfCompletedCheckpoints;
-				while (numCompleteCalls < expectedNumber && System.nanoTime() <= deadline) {
-					checkpointLock.wait();
-				}
-
-				if (numCompleteCalls < expectedNumber) {
-					throw new TimeoutException("Did not complete " + expectedNumberOfCompletedCheckpoints +
-						" within timeout of " + timeoutMillis + " millis.");
-				}
-			}
+		static CountDownLatch getRestoreLatch() {
+			return restoreLatch;
 		}
 
-		static void awaitStateRestoredFromCheckpoint(long timeoutMillis) throws InterruptedException, TimeoutException {
-			long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
-
-			synchronized (checkpointLock) {
-				while (!restoredFromCheckpoint && System.currentTimeMillis() <= deadline) {
-					checkpointLock.wait();
-				}
-
-				if (!restoredFromCheckpoint) {
-					throw new TimeoutException("Did not restore from checkpoint within timeout of " + timeoutMillis + " millis.");
-				}
-			}
+		static void resetForTest(int parallelism) {
+			progressLatch = new CountDownLatch(parallelism);
+			restoreLatch = new CountDownLatch(parallelism);
 		}
 	}