Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/29 16:26:19 UTC

[flink] branch release-1.15 updated: [FLINK-26794][tests] Use API to access job checkpoints in IT cases

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 4e87586  [FLINK-26794][tests] Use API to access job checkpoints in IT cases
4e87586 is described below

commit 4e87586dfb0a7c208440d331751719be0be2ccaa
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Mar 28 12:59:42 2022 +0200

    [FLINK-26794][tests] Use API to access job checkpoints in IT cases
    
    Using a dedicated API has the following advantages over looking for checkpoints on the FS:
    - less prone to NoSuchFileException
    - less brittle because the file layout is not a declared API
    - lower IO usage
---
 .../flink/runtime/testutils/CommonTestUtils.java   | 35 ++++++++++++++++++++++
 .../test/state/ChangelogCompatibilityITCase.java   | 20 +++++++++----
 .../flink/test/state/ChangelogRescalingITCase.java | 30 +++++++++----------
 .../java/org/apache/flink/test/util/TestUtils.java | 13 ++++++++
 4 files changed, 77 insertions(+), 21 deletions(-)
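
For context, here is a minimal usage sketch of the two new helpers. This is a hypothetical snippet, not part of the commit; the class name is made up, and the job submission and MiniCluster setup are assumed to come from the surrounding IT case, as in the tests touched below.

    import java.time.Duration;
    import java.util.NoSuchElementException;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.time.Deadline;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.testutils.CommonTestUtils;

    // Hypothetical test helper illustrating the API-based flow added by this commit.
    class LatestCheckpointPathSketch {

        static String latestCompletedCheckpointPath(JobID jobID, MiniCluster miniCluster)
                throws Exception {
            // Block until the job has completed at least one checkpoint,
            // queried via the execution graph instead of scanning the checkpoint directory.
            CommonTestUtils.waitForCheckpoint(
                    jobID, miniCluster, Deadline.fromNow(Duration.ofMinutes(5)));
            // Read the external path of the latest completed checkpoint from the checkpoint stats.
            return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, miniCluster)
                    .orElseThrow(
                            () -> new NoSuchElementException("No checkpoint was created yet"));
        }
    }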

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 7e01ee4..678eb2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -22,11 +22,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.util.FileUtils;
@@ -47,6 +49,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
@@ -359,6 +363,37 @@ public class CommonTestUtils {
                 Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
     }
 
+    /** Wait for at least one successful checkpoint. */
+    public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, Deadline timeout)
+            throws Exception, FlinkJobNotFoundException {
+        waitUntilCondition(
+                () ->
+                        Optional.ofNullable(
+                                        miniCluster
+                                                .getExecutionGraph(jobID)
+                                                .get()
+                                                .getCheckpointStatsSnapshot())
+                                .filter(st -> st.getCounts().getNumberOfCompletedCheckpoints() > 0)
+                                .isPresent(),
+                timeout);
+    }
+
+    /**
+     * @return the external path (a {@link java.net.URI} string) of the latest completed checkpoint, if any.
+     * @throws FlinkJobNotFoundException if job not found
+     */
+    public static Optional<String> getLatestCompletedCheckpointPath(
+            JobID jobID, MiniCluster cluster)
+            throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
+        return Optional.ofNullable(
+                        cluster.getExecutionGraph(jobID).get().getCheckpointStatsSnapshot())
+                .flatMap(
+                        stats ->
+                                Optional.ofNullable(
+                                        stats.getHistory().getLatestCompletedCheckpoint()))
+                .map(CompletedCheckpointStats::getExternalPath);
+    }
+
     /** Utility class to read the output of a process stream and forward it into a StringWriter. */
     public static class PipeForwarder extends Thread {
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
index 3023e99..ef4052c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -18,11 +18,13 @@
 package org.apache.flink.test.state;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -37,16 +39,18 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Optional;
 
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
 import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
 import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
 import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
-import static org.apache.flink.test.util.TestUtils.getMostRecentCompletedCheckpointMaybe;
 import static org.apache.flink.util.ExceptionUtils.findThrowableSerializedAware;
 import static org.junit.Assert.fail;
 
@@ -141,12 +145,18 @@ public class ChangelogCompatibilityITCase {
         ClusterClient<?> client = miniClusterResource.getClusterClient();
         submit(jobGraph, client);
         if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
-            while (!getMostRecentCompletedCheckpointMaybe(checkpointDir).isPresent()) {
-                Thread.sleep(50);
-            }
+            waitForCheckpoint(
+                    jobGraph.getJobID(),
+                    miniClusterResource.getMiniCluster(),
+                    Deadline.fromNow(Duration.ofMinutes(5)));
             client.cancel(jobGraph.getJobID()).get();
             // obtain the latest checkpoint *after* cancellation - that one won't be subsumed
-            return pathToString(getMostRecentCompletedCheckpointMaybe(checkpointDir).get());
+            return CommonTestUtils.getLatestCompletedCheckpointPath(
+                            jobGraph.getJobID(), miniClusterResource.getMiniCluster())
+                    .<NoSuchElementException>orElseThrow(
+                            () -> {
+                                throw new NoSuchElementException("No checkpoint was created yet");
+                            });
         } else {
             return client.stopWithSavepoint(
                             jobGraph.getJobID(),
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
index 85a30c8..b3f7004 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.state;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -31,6 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
@@ -55,10 +57,10 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
@@ -80,14 +82,13 @@ import static org.apache.flink.configuration.StateChangelogOptions.PERIODIC_MATE
 import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
 import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
 import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT;
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_MODE;
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_UNALIGNED;
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT;
-import static org.apache.flink.test.util.TestUtils.getMostRecentCompletedCheckpoint;
-import static org.apache.flink.test.util.TestUtils.getMostRecentCompletedCheckpointMaybe;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -149,19 +150,16 @@ public class ChangelogRescalingITCase extends TestLogger {
     @Test
     public void test() throws Exception {
         // before rescale
-        File cpDir1 = temporaryFolder.newFolder();
-        JobID jobID1 = submit(configureJob(parallelism1, cpDir1), graph -> {});
+        JobID jobID1 = submit(configureJob(parallelism1, temporaryFolder.newFolder()), graph -> {});
 
         Thread.sleep(ACCUMULATE_TIME_MILLIS);
-        File cpLocation = checkpointAndCancel(jobID1, cpDir1);
+        String cpLocation = checkpointAndCancel(jobID1);
 
         // rescale and checkpoint to verify
         JobID jobID2 =
                 submit(
                         configureJob(parallelism2, temporaryFolder.newFolder()),
-                        graph ->
-                                graph.setSavepointRestoreSettings(
-                                        forPath(cpLocation.toURI().toString())));
+                        graph -> graph.setSavepointRestoreSettings(forPath(cpLocation)));
         waitForAllTaskRunning(cluster.getMiniCluster(), jobID2, true);
         cluster.getClusterClient().cancel(jobID2).get();
     }
@@ -328,15 +326,15 @@ public class ChangelogRescalingITCase extends TestLogger {
         }
     }
 
-    private File checkpointAndCancel(JobID jobID, File cpDir)
-            throws IOException, InterruptedException, ExecutionException {
-        while (!getMostRecentCompletedCheckpointMaybe(cpDir).isPresent()) {
-            checkStatus(jobID);
-            Thread.sleep(50);
-        }
+    private String checkpointAndCancel(JobID jobID) throws Exception {
+        waitForCheckpoint(jobID, cluster.getMiniCluster(), Deadline.fromNow(Duration.ofMinutes(5)));
         cluster.getClusterClient().cancel(jobID).get();
         checkStatus(jobID);
-        return getMostRecentCompletedCheckpoint(cpDir);
+        return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())
+                .<NoSuchElementException>orElseThrow(
+                        () -> {
+                            throw new NoSuchElementException("No checkpoint was created yet");
+                        });
     }
 
     private void checkStatus(JobID jobID) throws InterruptedException, ExecutionException {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 8cbf3e3..033f178 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.client.JobInitializationException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -113,11 +114,23 @@ public class TestUtils {
         }
     }
 
+    /**
+     * @deprecated please use {@link
+     *     org.apache.flink.runtime.testutils.CommonTestUtils#getLatestCompletedCheckpointPath(JobID,
+     *     MiniCluster)} which is less prone to {@link NoSuchFileException} and less IO-intensive.
+     */
+    @Deprecated
     public static File getMostRecentCompletedCheckpoint(File checkpointDir) throws IOException {
         return getMostRecentCompletedCheckpointMaybe(checkpointDir)
                 .orElseThrow(() -> new IllegalStateException("Cannot generate checkpoint"));
     }
 
+    /**
+     * @deprecated please use {@link
+     *     org.apache.flink.runtime.testutils.CommonTestUtils#getLatestCompletedCheckpointPath(JobID,
+     *     MiniCluster)} which is less prone to {@link NoSuchFileException} and less IO-intensive.
+     */
+    @Deprecated
     public static Optional<File> getMostRecentCompletedCheckpointMaybe(File checkpointDir)
             throws IOException {
         return Files.find(checkpointDir.toPath(), 2, TestUtils::isCompletedCheckpoint)