You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 12:22:27 UTC

[09/18] flink git commit: [FLINK-9042][tests] Port ResumeCheckpointManuallyITCase to flip6

[FLINK-9042][tests] Port ResumeCheckpointManuallyITCase to flip6

This closes #5736.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c553ba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c553ba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c553ba4

Branch: refs/heads/master
Commit: 7c553ba45b44145ea09e4d9ccb0bdf64df7ee076
Parents: db366cd
Author: zentol <ch...@apache.org>
Authored: Wed Mar 21 13:31:56 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 .../ResumeCheckpointManuallyITCase.java         | 146 +++++++++++++------
 1 file changed, 104 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c553ba4/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 537f864..add4243 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -18,25 +18,26 @@
 
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.state.ManualWindowSpeedITCase;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
@@ -44,9 +45,17 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertNotNull;
 
 /**
  * IT case for resuming from checkpoints manually via their external pointer, rather than automatic
@@ -240,14 +249,10 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 
 		final Configuration config = new Configuration();
 
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
-
 		final File savepointDir = temporaryFolder.newFolder();
 
 		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
-		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
 
 		if (localRecovery) {
 			config.setString(
@@ -263,56 +268,113 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
 		}
 
-		TestingCluster cluster = new TestingCluster(config);
-		cluster.start();
+		MiniClusterResource cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				NUM_TASK_MANAGERS,
+				SLOTS_PER_TASK_MANAGER),
+			true);
+
+		cluster.before();
 
-		String externalCheckpoint = null;
+		ClusterClient<?> client = cluster.getClusterClient();
+		client.setDetached(true);
 
 		try {
+			// main test sequence:  start job -> eCP -> restore job -> eCP -> restore job -> eCP
+			String firstExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client);
+			assertNotNull(firstExternalCheckpoint);
+
+			String secondExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, firstExternalCheckpoint, client);
+			assertNotNull(secondExternalCheckpoint);
 
-			// main test sequence:  start job -> eCP -> restore job -> eCP -> restore job -> eCP
-			for (int i = 0; i < 3; ++i) {
-				final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			String thirdExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, secondExternalCheckpoint, client);
+			assertNotNull(thirdExternalCheckpoint);
+		} finally {
+			cluster.after();
+		}
+	}
 
-				env.setStateBackend(backend);
-				env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-				env.setParallelism(PARALLELISM);
+	private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient<?> client) throws Exception {
+		JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
+		NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
 
-				// initialize count down latch
-				NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
+		client.submitJob(initialJobGraph, ResumeCheckpointManuallyITCase.class.getClassLoader());
 
-				env.addSource(new NotifyingInfiniteTupleSource(10_000))
-					.keyBy(0)
-					.timeWindow(Time.seconds(3))
-					.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
-					.filter(value -> value.f0.startsWith("Tuple 0"));
+		// wait until all sources have been started
+		NotifyingInfiniteTupleSource.countDownLatch.await();
 
-				StreamGraph streamGraph = env.getStreamGraph();
-				streamGraph.setJobName("Test");
+		waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID());
+		client.cancel(initialJobGraph.getJobID());
+		waitUntilCanceled(initialJobGraph.getJobID(), client);
 
-				JobGraph jobGraph = streamGraph.getJobGraph();
+		return getExternalizedCheckpointCheckpointPath(checkpointDir, initialJobGraph.getJobID());
+	}
+
+	/**
+	 * Returns the string form of the checkpoint path that {@link #findExternalizedCheckpoint}
+	 * locates for the given job.
+	 *
+	 * @param checkpointDir root directory that is searched for the job's checkpoints
+	 * @param jobId job whose checkpoint is looked up
+	 * @return the located checkpoint path as a string
+	 * @throws IOException if the lookup fails
+	 * @throws AssertionError if the lookup yields no checkpoint
+	 */
+	private static String getExternalizedCheckpointCheckpointPath(File checkpointDir, JobID jobId) throws IOException {
+		return findExternalizedCheckpoint(checkpointDir, jobId)
+			.map(Path::toString)
+			.orElseThrow(() -> new AssertionError("No complete checkpoint could be found."));
+	}
 
-				// recover from previous iteration?
-				if (externalCheckpoint != null) {
-					jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
+	/**
+	 * Blocks until {@link #findExternalizedCheckpoint} reports a checkpoint for the given job.
+	 *
+	 * <p>The lookup is repeated every 50 ms; the first lookup happens only after one initial sleep.
+	 * There is no upper bound on the waiting time.
+	 *
+	 * @param checkpointDir root directory that is searched for the job's checkpoints
+	 * @param jobId job whose checkpoint is awaited
+	 * @throws InterruptedException if the thread is interrupted while sleeping
+	 * @throws IOException if a lookup fails
+	 */
+	private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException {
+		do {
+			Thread.sleep(50);
+		} while (!findExternalizedCheckpoint(checkpointDir, jobId).isPresent());
+	}
+
+	/**
+	 * Looks for a directory whose name starts with {@code chk-} below
+	 * {@code <checkpointDir>/<jobId>} and that contains at least one entry whose file name
+	 * contains {@code "meta"}.
+	 *
+	 * <p>Both directory listings are closed before this method returns: {@link Files#list} keeps
+	 * the directory open until the returned stream is closed, and this lookup is polled in a loop.
+	 *
+	 * @param checkpointDir root directory that is searched for the job's checkpoints
+	 * @param jobId job whose sub-directory is inspected
+	 * @return any matching checkpoint directory, or an empty {@code Optional} if there is none
+	 * @throws IOException if the job's checkpoint directory cannot be listed
+	 */
+	private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException {
+		try (java.util.stream.Stream<Path> candidates = Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
+			return candidates
+				.filter(path -> path.getFileName().toString().startsWith("chk-"))
+				.filter(path -> {
+					try (java.util.stream.Stream<Path> children = Files.list(path)) {
+						return children.anyMatch(child -> child.getFileName().toString().contains("meta"));
+					} catch (IOException ignored) {
+						// a candidate directory that cannot be listed is simply not a match
+						return false;
+					}
+				})
+				.findAny();
+		}
+	}
-				}
 
-				config.addAll(jobGraph.getJobConfiguration());
-				JobSubmissionResult submissionResult = cluster.submitJobDetached(jobGraph);
+	/**
+	 * Polls the job status every 50 ms until the job has reached the terminal
+	 * {@link JobStatus#CANCELED} state.
+	 *
+	 * <p>{@link JobStatus#CANCELLING} is only a transient state; waiting for it returns before the
+	 * job has actually stopped, and never returns if a poll happens to miss that state.
+	 *
+	 * @param jobId job to wait for
+	 * @param client client used to query the job status
+	 * @throws ExecutionException if a status request fails
+	 * @throws InterruptedException if the thread is interrupted while waiting
+	 */
+	private static void waitUntilCanceled(JobID jobId, ClusterClient<?> client) throws ExecutionException, InterruptedException {
+		while (client.getJobStatus(jobId).get() != JobStatus.CANCELED) {
+			Thread.sleep(50);
+		}
+	}
 
-				// wait until all sources have been started
-				NotifyingInfiniteTupleSource.countDownLatch.await();
+	private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint) {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-				externalCheckpoint = cluster.requestCheckpoint(
-						submissionResult.getJobID(),
-						CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
+		env.enableCheckpointing(500);
+		env.setStateBackend(backend);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.setParallelism(PARALLELISM);
+		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
-				cluster.cancelJob(submissionResult.getJobID());
-			}
-		} finally {
-			cluster.stop();
-			cluster.awaitTermination();
+		env.addSource(new NotifyingInfiniteTupleSource(10_000))
+			.keyBy(0)
+			.timeWindow(Time.seconds(3))
+			.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
+			.filter(value -> value.f0.startsWith("Tuple 0"));
+
+		StreamGraph streamGraph = env.getStreamGraph();
+		streamGraph.setJobName("Test");
+
+		JobGraph jobGraph = streamGraph.getJobGraph();
+
+		// recover from previous iteration?
+		if (externalCheckpoint != null) {
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
 		}
+
+		return jobGraph;
 	}
 
 	/**