You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/02/17 15:04:36 UTC

[flink] 02/02: [FLINK-26165][tests] Don't test NATIVE savepoints with Changelog enabled

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10abe7cbf3b262527eaae3e1cc0ee3669ead1763
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Feb 15 18:27:18 2022 +0100

    [FLINK-26165][tests] Don't test NATIVE savepoints with Changelog enabled
---
 .../test/checkpointing/SavepointFormatITCase.java  |  6 +-
 .../test/state/ChangelogCompatibilityITCase.java   | 81 ++++++++++++++--------
 2 files changed, 59 insertions(+), 28 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
index 580ff38..3d3739e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
@@ -83,13 +83,17 @@ public class SavepointFormatITCase {
             new LoggerAuditingExtension(SavepointFormatITCase.class, Level.INFO);
 
     private static List<Arguments> parameters() {
-        // iterate through all combinations of backends, isIncremental, isChangelogEnabled
+        // iterate through all valid combinations of backends, isIncremental, isChangelogEnabled
         List<Arguments> result = new LinkedList<>();
         for (BiFunction<Boolean, Boolean, StateBackendConfig> builder :
                 StateBackendConfig.builders) {
             for (boolean incremental : new boolean[] {true, false}) {
                 for (boolean changelog : new boolean[] {true, false}) {
                     for (SavepointFormatType formatType : SavepointFormatType.values()) {
+                        if (changelog && formatType == SavepointFormatType.NATIVE) {
+                            // not supported
+                            continue;
+                        }
                         result.add(Arguments.of(formatType, builder.apply(incremental, changelog)));
                     }
                 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
index e6228a2..43e4c97 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -86,10 +86,12 @@ public class ChangelogCompatibilityITCase {
                         .restoreWithChangelog(true)
                         .from(RestoreSource.CANONICAL_SAVEPOINT)
                         .allowRestore(true),
+                // taking native savepoints is not supported with changelog
                 TestCase.startWithChangelog(true)
                         .restoreWithChangelog(true)
                         .from(RestoreSource.NATIVE_SAVEPOINT)
-                        .allowRestore(true),
+                        .allowSave(false)
+                        .allowRestore(false),
                 TestCase.startWithChangelog(true)
                         .restoreWithChangelog(true)
                         .from(RestoreSource.CHECKPOINT)
@@ -98,10 +100,24 @@ public class ChangelogCompatibilityITCase {
 
     @Test
     public void testRestore() throws Exception {
-        JobGraph initialGraph = addGraph(initEnvironment());
-        String restoreSourceLocation = checkpointAndStop(initialGraph);
+        runAndStoreIfAllowed().ifPresent(this::restoreAndValidate);
+    }
 
-        restoreAndValidate(restoreSourceLocation);
+    private Optional<String> runAndStoreIfAllowed() throws Exception { // returns the stored location, or empty when storing was (expectedly) rejected
+        JobGraph initialGraph = addGraph(initEnvironment());
+        try {
+            String location = tryCheckpointAndStop(initialGraph);
+            if (!testCase.allowStore) {
+                fail(testCase.describeStore()); // storing succeeded although the test case says it must not
+            }
+            return Optional.of(location);
+        } catch (Exception e) {
+            if (testCase.allowStore) {
+                throw e; // storing was expected to work, so surface the failure to the caller
+            } else {
+                return Optional.empty(); // NOTE(review): any Exception counts as the expected rejection here, including unrelated failures - confirm this is intended
+            }
+        }
     }
 
     private StreamExecutionEnvironment initEnvironment() {
@@ -122,7 +138,7 @@ public class ChangelogCompatibilityITCase {
         return env.getStreamGraph().getJobGraph();
     }
 
-    private String checkpointAndStop(JobGraph jobGraph) throws Exception {
+    private String tryCheckpointAndStop(JobGraph jobGraph) throws Exception {
         ClusterClient<?> client = miniClusterResource.getClusterClient();
         submit(jobGraph, client);
         if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
@@ -141,32 +157,18 @@ public class ChangelogCompatibilityITCase {
         }
     }
 
-    private void restoreAndValidate(String location) throws Exception {
+    private void restoreAndValidate(String location) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableChangelogStateBackend(testCase.restoreWithChangelog);
         JobGraph jobGraph = addGraph(env);
         jobGraph.setSavepointRestoreSettings(forPath(location));
 
-        boolean restored = tryRun(jobGraph);
-
-        if (restored && !testCase.allowRestore) {
-            fail(
-                    String.format(
-                            "restoring from %s taken with changelog.enabled=%b should NOT be allowed with changelog.enabled=%b",
-                            testCase.restoreSource,
-                            testCase.startWithChangelog,
-                            testCase.restoreWithChangelog));
-        } else if (!restored && testCase.allowRestore) {
-            fail(
-                    String.format(
-                            "restoring from %s taken with changelog.enabled=%b should be allowed with changelog.enabled=%b",
-                            testCase.restoreSource,
-                            testCase.startWithChangelog,
-                            testCase.restoreWithChangelog));
+        if (tryRun(jobGraph) != testCase.allowRestore) {
+            fail(testCase.describeRestore());
         }
     }
 
-    private boolean tryRun(JobGraph jobGraph) throws Exception {
+    private boolean tryRun(JobGraph jobGraph) {
         try {
             submit(jobGraph, miniClusterResource.getClusterClient());
             miniClusterResource.getClusterClient().cancel(jobGraph.getJobID()).get();
@@ -175,7 +177,7 @@ public class ChangelogCompatibilityITCase {
             if (isValidationError(e)) {
                 return false;
             } else {
-                throw e;
+                throw new RuntimeException(e);
             }
         }
     }
@@ -191,7 +193,8 @@ public class ChangelogCompatibilityITCase {
         boolean startWithChangelog;
         boolean restoreWithChangelog;
         RestoreSource restoreSource;
-        boolean allowRestore;
+        boolean allowStore = true;
+        boolean allowRestore = false;
 
         public static TestCase startWithChangelog(boolean changelogEnabled) {
             TestCase testCase = new TestCase();
@@ -214,11 +217,35 @@ public class ChangelogCompatibilityITCase {
             return this;
         }
 
+        public TestCase allowSave(boolean allowSave) { // fluent setter for the allowStore field; returns this for chaining
+            this.allowStore = allowSave;
+            return this;
+        }
+
         @Override
         public String toString() {
             return String.format(
-                    "startWithChangelog=%s, restoreWithChangelog=%s, restoreFrom=%s, allowRestore=%s",
-                    startWithChangelog, restoreWithChangelog, restoreSource, allowRestore);
+                    "startWithChangelog=%s, restoreWithChangelog=%s, restoreFrom=%s, allowStore=%s, allowRestore=%s",
+                    startWithChangelog,
+                    restoreWithChangelog,
+                    restoreSource,
+                    allowStore,
+                    allowRestore);
+        }
+
+        private String describeStore() { // message stating whether taking restoreSource with the initial changelog setting should be allowed
+            return String.format(
+                    "taking %s with changelog.enabled=%b should be %s",
+                    restoreSource, startWithChangelog, allowStore ? "allowed" : "disallowed");
+        }
+
+        private String describeRestore() { // message stating whether restoring from restoreSource should be allowed for this case
+            return String.format(
+                    "restoring from %s taken with changelog.enabled=%b should be %s with changelog.enabled=%b",
+                    restoreSource,
+                    startWithChangelog, // first %b: changelog setting the snapshot was taken with
+                    allowRestore ? "allowed" : "disallowed", // %s after "should be"; must not be fed to %b, which would print "true"
+                    restoreWithChangelog); // second %b: changelog setting used for the restore attempt
         }
     }