You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/12/17 15:18:39 UTC

[flink] 03/03: [hotfix][hack] Make ChangelogStateBackend randomized tests work with forced full snapshots

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68f2bc3b80e0c29a378fa9f30ffb9cd6d46aba58
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Dec 10 09:47:29 2021 +0100

    [hotfix][hack] Make ChangelogStateBackend randomized tests work with forced full snapshots
---
 .../flink/runtime/minicluster/MiniCluster.java     | 30 ++++++++++++++++++++++
 .../streaming/util/TestStreamEnvironment.java      |  7 ++++-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d42abb2..2f2c498 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.minicluster;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -57,6 +58,9 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic
 import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -666,6 +670,16 @@ public class MiniCluster implements AutoCloseableAsync {
         return miniClusterConfiguration.getConfiguration();
     }
 
+    // HACK: temporary hack to make the randomized changelog state backend tests work with forced
+    // full snapshots. This option should be removed once changelog state backend supports forced
+    // full snapshots
+    @Internal private boolean overrideRestoreModeForRandomizedChangelogStateBackend;
+
+    @Internal
+    public void overrideRestoreModeForRandomizedChangelogStateBackend() {
+        this.overrideRestoreModeForRandomizedChangelogStateBackend = true;
+    }
+
     @GuardedBy("lock")
     private Collection<? extends CompletableFuture<Void>> terminateTaskManagers() {
         final Collection<CompletableFuture<Void>> terminationFutures =
@@ -858,6 +872,7 @@ public class MiniCluster implements AutoCloseableAsync {
     }
 
     public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
+        checkRestoreModeForRandomizedChangelogStateBackend(jobGraph);
         final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture =
                 getDispatcherGatewayFuture();
         final CompletableFuture<InetSocketAddress> blobServerAddressFuture =
@@ -875,6 +890,21 @@ public class MiniCluster implements AutoCloseableAsync {
                 (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
     }
 
+    // HACK: temporary hack to make the randomized changelog state backend tests work with forced
+    // full snapshots. This option should be removed once changelog state backend supports forced
+    // full snapshots
+    private void checkRestoreModeForRandomizedChangelogStateBackend(JobGraph jobGraph) {
+        final SavepointRestoreSettings savepointRestoreSettings =
+                jobGraph.getSavepointRestoreSettings();
+        if (overrideRestoreModeForRandomizedChangelogStateBackend
+                && savepointRestoreSettings.getRestoreMode() == RestoreMode.NO_CLAIM) {
+            final Configuration conf = new Configuration();
+            SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, conf);
+            conf.set(SavepointConfigOptions.RESTORE_MODE, RestoreMode.LEGACY);
+            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(conf));
+        }
+    }
+
     public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
         return runDispatcherCommand(
                 dispatcherGateway ->
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 00f5692..4bf603c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -124,8 +124,13 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
         if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
             if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
                 conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
+                miniCluster.overrideRestoreModeForRandomizedChangelogStateBackend();
             } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
-                randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
+                boolean enabled =
+                        randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
+                if (enabled) {
+                    miniCluster.overrideRestoreModeForRandomizedChangelogStateBackend();
+                }
             }
         }
     }