You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 12:22:27 UTC
[09/18] flink git commit: [FLINK-9042][tests] Port ResumeCheckpointManuallyITCase to flip6
[FLINK-9042][tests] Port ResumeCheckpointManuallyITCase to flip6
This closes #5736.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c553ba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c553ba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c553ba4
Branch: refs/heads/master
Commit: 7c553ba45b44145ea09e4d9ccb0bdf64df7ee076
Parents: db366cd
Author: zentol <ch...@apache.org>
Authored: Wed Mar 21 13:31:56 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200
----------------------------------------------------------------------
.../ResumeCheckpointManuallyITCase.java | 146 +++++++++++++------
1 file changed, 104 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7c553ba4/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 537f864..add4243 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -18,25 +18,26 @@
package org.apache.flink.test.checkpointing;
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.state.ManualWindowSpeedITCase;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
import org.apache.curator.test.TestingServer;
@@ -44,9 +45,17 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertNotNull;
/**
* IT case for resuming from checkpoints manually via their external pointer, rather than automatic
@@ -240,14 +249,10 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
final Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
-
final File savepointDir = temporaryFolder.newFolder();
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
- config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
if (localRecovery) {
config.setString(
@@ -263,56 +268,113 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
}
- TestingCluster cluster = new TestingCluster(config);
- cluster.start();
+ MiniClusterResource cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ NUM_TASK_MANAGERS,
+ SLOTS_PER_TASK_MANAGER),
+ true);
+
+ cluster.before();
- String externalCheckpoint = null;
+ ClusterClient<?> client = cluster.getClusterClient();
+ client.setDetached(true);
try {
+ // main test sequence: start job -> eCP -> restore job -> eCP -> restore job
+ String firstExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client);
+ assertNotNull(firstExternalCheckpoint);
+
+ String secondExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, firstExternalCheckpoint, client);
+ assertNotNull(secondExternalCheckpoint);
- // main test sequence: start job -> eCP -> restore job -> eCP -> restore job -> eCP
- for (int i = 0; i < 3; ++i) {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ String thirdExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, secondExternalCheckpoint, client);
+ assertNotNull(thirdExternalCheckpoint);
+ } finally {
+ cluster.after();
+ }
+ }
- env.setStateBackend(backend);
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- env.setParallelism(PARALLELISM);
+ private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient<?> client) throws Exception {
+ JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
+ NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
- // initialize count down latch
- NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
+ client.submitJob(initialJobGraph, ResumeCheckpointManuallyITCase.class.getClassLoader());
- env.addSource(new NotifyingInfiniteTupleSource(10_000))
- .keyBy(0)
- .timeWindow(Time.seconds(3))
- .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
- .filter(value -> value.f0.startsWith("Tuple 0"));
+ // wait until all sources have been started
+ NotifyingInfiniteTupleSource.countDownLatch.await();
- StreamGraph streamGraph = env.getStreamGraph();
- streamGraph.setJobName("Test");
+ waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID());
+ client.cancel(initialJobGraph.getJobID());
+ waitUntilCanceled(initialJobGraph.getJobID(), client);
- JobGraph jobGraph = streamGraph.getJobGraph();
+ return getExternalizedCheckpointCheckpointPath(checkpointDir, initialJobGraph.getJobID());
+ }
+
+ private static String getExternalizedCheckpointCheckpointPath(File checkpointDir, JobID jobId) throws IOException {
+ Optional<Path> checkpoint = findExternalizedCheckpoint(checkpointDir, jobId);
+ if (!checkpoint.isPresent()) {
+ throw new AssertionError("No complete checkpoint could be found.");
+ } else {
+ return checkpoint.get().toString();
+ }
+ }
- // recover from previous iteration?
- if (externalCheckpoint != null) {
- jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
+ private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException {
+ while (true) {
+ Thread.sleep(50);
+ Optional<Path> externalizedCheckpoint = findExternalizedCheckpoint(checkpointDir, jobId);
+ if (externalizedCheckpoint.isPresent()) {
+ break;
+ }
+ }
+ }
+
+ private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException {
+ return Files.list(checkpointDir.toPath().resolve(jobId.toString()))
+ .filter(path -> path.getFileName().toString().startsWith("chk-"))
+ .filter(path -> {
+ try {
+ return Files.list(path).anyMatch(child -> child.getFileName().toString().contains("meta"));
+ } catch (IOException ignored) {
+ return false;
}
+ })
+ .findAny();
+ }
- config.addAll(jobGraph.getJobConfiguration());
- JobSubmissionResult submissionResult = cluster.submitJobDetached(jobGraph);
+ private static void waitUntilCanceled(JobID jobId, ClusterClient<?> client) throws ExecutionException, InterruptedException {
+ while (client.getJobStatus(jobId).get() != JobStatus.CANCELLING) {
+ Thread.sleep(50);
+ }
+ }
- // wait until all sources have been started
- NotifyingInfiniteTupleSource.countDownLatch.await();
+ private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint) {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- externalCheckpoint = cluster.requestCheckpoint(
- submissionResult.getJobID(),
- CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
+ env.enableCheckpointing(500);
+ env.setStateBackend(backend);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setParallelism(PARALLELISM);
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- cluster.cancelJob(submissionResult.getJobID());
- }
- } finally {
- cluster.stop();
- cluster.awaitTermination();
+ env.addSource(new NotifyingInfiniteTupleSource(10_000))
+ .keyBy(0)
+ .timeWindow(Time.seconds(3))
+ .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
+ .filter(value -> value.f0.startsWith("Tuple 0"));
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ streamGraph.setJobName("Test");
+
+ JobGraph jobGraph = streamGraph.getJobGraph();
+
+ // recover from previous iteration?
+ if (externalCheckpoint != null) {
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
}
+
+ return jobGraph;
}
/**