You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/02/17 15:04:36 UTC
[flink] 02/02: [FLINK-26165][tests] Don't test NATIVE savepoints with Changelog enabled
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 10abe7cbf3b262527eaae3e1cc0ee3669ead1763
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Feb 15 18:27:18 2022 +0100
[FLINK-26165][tests] Don't test NATIVE savepoints with Changelog enabled
---
.../test/checkpointing/SavepointFormatITCase.java | 6 +-
.../test/state/ChangelogCompatibilityITCase.java | 81 ++++++++++++++--------
2 files changed, 59 insertions(+), 28 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
index 580ff38..3d3739e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
@@ -83,13 +83,17 @@ public class SavepointFormatITCase {
new LoggerAuditingExtension(SavepointFormatITCase.class, Level.INFO);
private static List<Arguments> parameters() {
- // iterate through all combinations of backends, isIncremental, isChangelogEnabled
+ // iterate through all valid combinations of backends, isIncremental, isChangelogEnabled
List<Arguments> result = new LinkedList<>();
for (BiFunction<Boolean, Boolean, StateBackendConfig> builder :
StateBackendConfig.builders) {
for (boolean incremental : new boolean[] {true, false}) {
for (boolean changelog : new boolean[] {true, false}) {
for (SavepointFormatType formatType : SavepointFormatType.values()) {
+ if (changelog && formatType == SavepointFormatType.NATIVE) {
+ // not supported
+ continue;
+ }
result.add(Arguments.of(formatType, builder.apply(incremental, changelog)));
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
index e6228a2..43e4c97 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -86,10 +86,12 @@ public class ChangelogCompatibilityITCase {
.restoreWithChangelog(true)
.from(RestoreSource.CANONICAL_SAVEPOINT)
.allowRestore(true),
+ // taking native savepoints is not supported with changelog
TestCase.startWithChangelog(true)
.restoreWithChangelog(true)
.from(RestoreSource.NATIVE_SAVEPOINT)
- .allowRestore(true),
+ .allowSave(false)
+ .allowRestore(false),
TestCase.startWithChangelog(true)
.restoreWithChangelog(true)
.from(RestoreSource.CHECKPOINT)
@@ -98,10 +100,24 @@ public class ChangelogCompatibilityITCase {
@Test
public void testRestore() throws Exception {
- JobGraph initialGraph = addGraph(initEnvironment());
- String restoreSourceLocation = checkpointAndStop(initialGraph);
+ runAndStoreIfAllowed().ifPresent(this::restoreAndValidate);
+ }
- restoreAndValidate(restoreSourceLocation);
+ private Optional<String> runAndStoreIfAllowed() throws Exception {
+ JobGraph initialGraph = addGraph(initEnvironment());
+ try {
+ String location = tryCheckpointAndStop(initialGraph);
+ if (!testCase.allowStore) {
+ fail(testCase.describeStore());
+ }
+ return Optional.of(location);
+ } catch (Exception e) {
+ if (testCase.allowStore) {
+ throw e;
+ } else {
+ return Optional.empty();
+ }
+ }
}
private StreamExecutionEnvironment initEnvironment() {
@@ -122,7 +138,7 @@ public class ChangelogCompatibilityITCase {
return env.getStreamGraph().getJobGraph();
}
- private String checkpointAndStop(JobGraph jobGraph) throws Exception {
+ private String tryCheckpointAndStop(JobGraph jobGraph) throws Exception {
ClusterClient<?> client = miniClusterResource.getClusterClient();
submit(jobGraph, client);
if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
@@ -141,32 +157,18 @@ public class ChangelogCompatibilityITCase {
}
}
- private void restoreAndValidate(String location) throws Exception {
+ private void restoreAndValidate(String location) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableChangelogStateBackend(testCase.restoreWithChangelog);
JobGraph jobGraph = addGraph(env);
jobGraph.setSavepointRestoreSettings(forPath(location));
- boolean restored = tryRun(jobGraph);
-
- if (restored && !testCase.allowRestore) {
- fail(
- String.format(
- "restoring from %s taken with changelog.enabled=%b should NOT be allowed with changelog.enabled=%b",
- testCase.restoreSource,
- testCase.startWithChangelog,
- testCase.restoreWithChangelog));
- } else if (!restored && testCase.allowRestore) {
- fail(
- String.format(
- "restoring from %s taken with changelog.enabled=%b should be allowed with changelog.enabled=%b",
- testCase.restoreSource,
- testCase.startWithChangelog,
- testCase.restoreWithChangelog));
+ if (tryRun(jobGraph) != testCase.allowRestore) {
+ fail(testCase.describeRestore());
}
}
- private boolean tryRun(JobGraph jobGraph) throws Exception {
+ private boolean tryRun(JobGraph jobGraph) {
try {
submit(jobGraph, miniClusterResource.getClusterClient());
miniClusterResource.getClusterClient().cancel(jobGraph.getJobID()).get();
@@ -175,7 +177,7 @@ public class ChangelogCompatibilityITCase {
if (isValidationError(e)) {
return false;
} else {
- throw e;
+ throw new RuntimeException(e);
}
}
}
@@ -191,7 +193,8 @@ public class ChangelogCompatibilityITCase {
boolean startWithChangelog;
boolean restoreWithChangelog;
RestoreSource restoreSource;
- boolean allowRestore;
+ boolean allowStore = true;
+ boolean allowRestore = false;
public static TestCase startWithChangelog(boolean changelogEnabled) {
TestCase testCase = new TestCase();
@@ -214,11 +217,35 @@ public class ChangelogCompatibilityITCase {
return this;
}
+ public TestCase allowSave(boolean allowSave) {
+ this.allowStore = allowSave;
+ return this;
+ }
+
@Override
public String toString() {
return String.format(
- "startWithChangelog=%s, restoreWithChangelog=%s, restoreFrom=%s, allowRestore=%s",
- startWithChangelog, restoreWithChangelog, restoreSource, allowRestore);
+ "startWithChangelog=%s, restoreWithChangelog=%s, restoreFrom=%s, allowStore=%s, allowRestore=%s",
+ startWithChangelog,
+ restoreWithChangelog,
+ restoreSource,
+ allowStore,
+ allowRestore);
+ }
+
+ private String describeStore() {
+ return String.format(
+ "taking %s with changelog.enabled=%b should be %s",
+ restoreSource, startWithChangelog, allowStore ? "allowed" : "disallowed");
+ }
+
+ private String describeRestore() {
+ return String.format(
+ "restoring from %s taken with changelog.enabled=%b should be %s with changelog.enabled=%b",
+ restoreSource,
+ allowRestore ? "allowed" : "disallowed",
+ startWithChangelog,
+ restoreWithChangelog);
}
}