You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/12/17 15:18:39 UTC
[flink] 03/03: [hotfix][hack] Make ChangelogStateBackend randomized tests work with forced full snapshots
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 68f2bc3b80e0c29a378fa9f30ffb9cd6d46aba58
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Dec 10 09:47:29 2021 +0100
[hotfix][hack] Make ChangelogStateBackend randomized tests work with forced full snapshots
---
.../flink/runtime/minicluster/MiniCluster.java | 30 ++++++++++++++++++++++
.../streaming/util/TestStreamEnvironment.java | 7 ++++-
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d42abb2..2f2c498 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.minicluster;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
@@ -57,6 +58,9 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -666,6 +670,16 @@ public class MiniCluster implements AutoCloseableAsync {
return miniClusterConfiguration.getConfiguration();
}
+ // HACK: temporary hack to make the randomized changelog state backend tests work with forced
+ // full snapshots. This option should be removed once changelog state backend supports forced
+ // full snapshots
+ @Internal private boolean overrideRestoreModeForRandomizedChangelogStateBackend;
+
+ @Internal
+ public void overrideRestoreModeForRandomizedChangelogStateBackend() {
+ this.overrideRestoreModeForRandomizedChangelogStateBackend = true;
+ }
+
@GuardedBy("lock")
private Collection<? extends CompletableFuture<Void>> terminateTaskManagers() {
final Collection<CompletableFuture<Void>> terminationFutures =
@@ -858,6 +872,7 @@ public class MiniCluster implements AutoCloseableAsync {
}
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
+ checkRestoreModeForRandomizedChangelogStateBackend(jobGraph);
final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture =
getDispatcherGatewayFuture();
final CompletableFuture<InetSocketAddress> blobServerAddressFuture =
@@ -875,6 +890,21 @@ public class MiniCluster implements AutoCloseableAsync {
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
+ // HACK: temporary hack to make the randomized changelog state backend tests work with forced
+ // full snapshots. This option should be removed once changelog state backend supports forced
+ // full snapshots
+ private void checkRestoreModeForRandomizedChangelogStateBackend(JobGraph jobGraph) {
+ final SavepointRestoreSettings savepointRestoreSettings =
+ jobGraph.getSavepointRestoreSettings();
+ if (overrideRestoreModeForRandomizedChangelogStateBackend
+ && savepointRestoreSettings.getRestoreMode() == RestoreMode.NO_CLAIM) {
+ final Configuration conf = new Configuration();
+ SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, conf);
+ conf.set(SavepointConfigOptions.RESTORE_MODE, RestoreMode.LEGACY);
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(conf));
+ }
+ }
+
public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
return runDispatcherCommand(
dispatcherGateway ->
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 00f5692..4bf603c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -124,8 +124,13 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
+ miniCluster.overrideRestoreModeForRandomizedChangelogStateBackend();
} else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
- randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
+ boolean enabled =
+ randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
+ if (enabled) {
+ miniCluster.overrideRestoreModeForRandomizedChangelogStateBackend();
+ }
}
}
}