You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/29 16:26:19 UTC
[flink] branch release-1.15 updated: [FLINK-26794][tests] Use API to access job checkpoints in IT cases
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 4e87586 [FLINK-26794][tests] Use API to access job checkpoints in IT cases
4e87586 is described below
commit 4e87586dfb0a7c208440d331751719be0be2ccaa
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Mar 28 12:59:42 2022 +0200
[FLINK-26794][tests] Use API to access job checkpoints in IT cases
Using the dedicated API has the following advantages over looking for checkpoints on the file system (FS):
- less prone to NoSuchFileException
- less brittle, because the checkpoint file layout is not a declared API
- lower IO usage
---
.../flink/runtime/testutils/CommonTestUtils.java | 35 ++++++++++++++++++++++
.../test/state/ChangelogCompatibilityITCase.java | 20 +++++++++----
.../flink/test/state/ChangelogRescalingITCase.java | 30 +++++++++----------
.../java/org/apache/flink/test/util/TestUtils.java | 13 ++++++++
4 files changed, 77 insertions(+), 21 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 7e01ee4..678eb2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -22,11 +22,13 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.util.FileUtils;
@@ -47,6 +49,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
@@ -359,6 +363,37 @@ public class CommonTestUtils {
Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
}
+ /** Wait for at least one successful checkpoint. */
+ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, Deadline timeout)
+ throws Exception, FlinkJobNotFoundException {
+ waitUntilCondition(
+ () ->
+ Optional.ofNullable(
+ miniCluster
+ .getExecutionGraph(jobID)
+ .get()
+ .getCheckpointStatsSnapshot())
+ .filter(st -> st.getCounts().getNumberOfCompletedCheckpoints() > 0)
+ .isPresent(),
+ timeout);
+ }
+
+ /**
+ * @return the path as {@link java.net.URI} to the latest checkpoint.
+ * @throws FlinkJobNotFoundException if job not found
+ */
+ public static Optional<String> getLatestCompletedCheckpointPath(
+ JobID jobID, MiniCluster cluster)
+ throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
+ return Optional.ofNullable(
+ cluster.getExecutionGraph(jobID).get().getCheckpointStatsSnapshot())
+ .flatMap(
+ stats ->
+ Optional.ofNullable(
+ stats.getHistory().getLatestCompletedCheckpoint()))
+ .map(CompletedCheckpointStats::getExternalPath);
+ }
+
/** Utility class to read the output of a process stream and forward it into a StringWriter. */
public static class PipeForwarder extends Thread {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
index 3023e99..ef4052c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -18,11 +18,13 @@
package org.apache.flink.test.state;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -37,16 +39,18 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Optional;
import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
-import static org.apache.flink.test.util.TestUtils.getMostRecentCompletedCheckpointMaybe;
import static org.apache.flink.util.ExceptionUtils.findThrowableSerializedAware;
import static org.junit.Assert.fail;
@@ -141,12 +145,18 @@ public class ChangelogCompatibilityITCase {
ClusterClient<?> client = miniClusterResource.getClusterClient();
submit(jobGraph, client);
if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
- while (!getMostRecentCompletedCheckpointMaybe(checkpointDir).isPresent()) {
- Thread.sleep(50);
- }
+ waitForCheckpoint(
+ jobGraph.getJobID(),
+ miniClusterResource.getMiniCluster(),
+ Deadline.fromNow(Duration.ofMinutes(5)));
client.cancel(jobGraph.getJobID()).get();
// obtain the latest checkpoint *after* cancellation - that one won't be subsumed
- return pathToString(getMostRecentCompletedCheckpointMaybe(checkpointDir).get());
+ return CommonTestUtils.getLatestCompletedCheckpointPath(
+ jobGraph.getJobID(), miniClusterResource.getMiniCluster())
+ .<NoSuchElementException>orElseThrow(
+ () -> {
+ throw new NoSuchElementException("No checkpoint was created yet");
+ });
} else {
return client.stopWithSavepoint(
jobGraph.getJobID(),
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
index 85a30c8..b3f7004 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.state;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -31,6 +32,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
@@ -55,10 +57,10 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.File;
-import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
@@ -80,14 +82,13 @@ import static org.apache.flink.configuration.StateChangelogOptions.PERIODIC_MATE
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_MODE;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_UNALIGNED;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT;
-import static org.apache.flink.test.util.TestUtils.getMostRecentCompletedCheckpoint;
-import static org.apache.flink.test.util.TestUtils.getMostRecentCompletedCheckpointMaybe;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
@@ -149,19 +150,16 @@ public class ChangelogRescalingITCase extends TestLogger {
@Test
public void test() throws Exception {
// before rescale
- File cpDir1 = temporaryFolder.newFolder();
- JobID jobID1 = submit(configureJob(parallelism1, cpDir1), graph -> {});
+ JobID jobID1 = submit(configureJob(parallelism1, temporaryFolder.newFolder()), graph -> {});
Thread.sleep(ACCUMULATE_TIME_MILLIS);
- File cpLocation = checkpointAndCancel(jobID1, cpDir1);
+ String cpLocation = checkpointAndCancel(jobID1);
// rescale and checkpoint to verify
JobID jobID2 =
submit(
configureJob(parallelism2, temporaryFolder.newFolder()),
- graph ->
- graph.setSavepointRestoreSettings(
- forPath(cpLocation.toURI().toString())));
+ graph -> graph.setSavepointRestoreSettings(forPath(cpLocation)));
waitForAllTaskRunning(cluster.getMiniCluster(), jobID2, true);
cluster.getClusterClient().cancel(jobID2).get();
}
@@ -328,15 +326,15 @@ public class ChangelogRescalingITCase extends TestLogger {
}
}
- private File checkpointAndCancel(JobID jobID, File cpDir)
- throws IOException, InterruptedException, ExecutionException {
- while (!getMostRecentCompletedCheckpointMaybe(cpDir).isPresent()) {
- checkStatus(jobID);
- Thread.sleep(50);
- }
+ private String checkpointAndCancel(JobID jobID) throws Exception {
+ waitForCheckpoint(jobID, cluster.getMiniCluster(), Deadline.fromNow(Duration.ofMinutes(5)));
cluster.getClusterClient().cancel(jobID).get();
checkStatus(jobID);
- return getMostRecentCompletedCheckpoint(cpDir);
+ return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())
+ .<NoSuchElementException>orElseThrow(
+ () -> {
+ throw new NoSuchElementException("No checkpoint was created yet");
+ });
}
private void checkStatus(JobID jobID) throws InterruptedException, ExecutionException {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 8cbf3e3..033f178 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -113,11 +114,23 @@ public class TestUtils {
}
}
+ /**
+ * @deprecated please use {@link
+ * org.apache.flink.runtime.testutils.CommonTestUtils#getLatestCompletedCheckpointPath(JobID,
+ * MiniCluster)} which is less prone to {@link NoSuchFileException} and IO-intensive.
+ */
+ @Deprecated
public static File getMostRecentCompletedCheckpoint(File checkpointDir) throws IOException {
return getMostRecentCompletedCheckpointMaybe(checkpointDir)
.orElseThrow(() -> new IllegalStateException("Cannot generate checkpoint"));
}
+ /**
+ * @deprecated please use {@link
+ * org.apache.flink.runtime.testutils.CommonTestUtils#getLatestCompletedCheckpointPath(JobID,
+ * MiniCluster)} which is less prone to {@link NoSuchFileException} and IO-intensive.
+ */
+ @Deprecated
public static Optional<File> getMostRecentCompletedCheckpointMaybe(File checkpointDir)
throws IOException {
return Files.find(checkpointDir.toPath(), 2, TestUtils::isCompletedCheckpoint)