Posted to commits@flink.apache.org by ro...@apache.org on 2022/02/16 15:49:45 UTC

[flink] branch master updated (20e03f4 -> 6fc4bad)

This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 20e03f4  [FLINK-26177][tests] Disable PulsarSourceITCase rescaling tests temporarily
     new 369088f  [hotfix][tests] Explicitly disable changelog in migration tests
     new 6fc4bad  [FLINK-26079][state/changelog] Disallow recovery from non-changelog checkpoints

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/ops/state/state_backends.md   |   4 +-
 docs/content/docs/ops/state/state_backends.md      |   4 +-
 .../state/changelog/ChangelogStateBackend.java     |  32 ++-
 .../LegacyStatefulJobSavepointMigrationITCase.java |   1 +
 .../utils/StatefulJobSavepointMigrationITCase.java |   1 +
 .../StatefulJobWBroadcastStateMigrationITCase.java |   1 +
 .../TypeSerializerSnapshotMigrationITCase.java     |   1 +
 .../test/state/ChangelogCompatibilityITCase.java   | 282 +++++++++++++++++++++
 .../restore/AbstractOperatorRestoreTestBase.java   |   1 +
 .../java/org/apache/flink/test/util/TestUtils.java |   9 +-
 .../StatefulJobSavepointMigrationITCase.scala      |   1 +
 ...StatefulJobWBroadcastStateMigrationITCase.scala |   2 +
 12 files changed, 323 insertions(+), 16 deletions(-)
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java

[flink] 02/02: [FLINK-26079][state/changelog] Disallow recovery from non-changelog checkpoints

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6fc4bad2c6443ef33ac86a286f815ecc9afba31c
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Feb 11 16:12:29 2022 +0100

    [FLINK-26079][state/changelog] Disallow recovery from non-changelog checkpoints
    
    Private state of non-changelog checkpoints is not registered
    with the SharedStateRegistry on recovery.
    Therefore, after recovering in CLAIM mode, that private state would be
    discarded as soon as the initial checkpoint is subsumed.
    
    This change disallows such recovery by checking the handle types during
    Changelog backend creation: only changelog and savepoint handles are
    accepted; anything else fails with an IllegalStateException that points
    to taking a canonical savepoint instead.
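    
    For illustration, the supported migration path is the one exercised by the
    ChangelogCompatibilityITCase added below: stop the non-changelog job with a
    CANONICAL savepoint, rebuild the pipeline with changelog enabled, and resume
    from that savepoint. The sketch below is illustrative only; the class name and
    the client/job-graph/directory parameters are placeholders, not part of this
    patch.
    
        import org.apache.flink.api.common.functions.ReduceFunction;
        import org.apache.flink.client.program.ClusterClient;
        import org.apache.flink.core.execution.SavepointFormatType;
        import org.apache.flink.runtime.jobgraph.JobGraph;
        import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    
        /** Illustrative sketch: stop-with-savepoint -> enable changelog -> restore. */
        class ChangelogMigrationSketch {
    
            static void migrate(
                    ClusterClient<?> client, JobGraph runningNonChangelogJob, String savepointDir)
                    throws Exception {
                // 1. Stop the non-changelog job with a savepoint in CANONICAL format
                //    (the only snapshot type accepted for this migration after this change).
                String savepointPath =
                        client.stopWithSavepoint(
                                        runningNonChangelogJob.getJobID(),
                                        false, // do not drain
                                        savepointDir,
                                        SavepointFormatType.CANONICAL)
                                .get();
    
                // 2. Rebuild the same pipeline, this time with the changelog backend enabled.
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableChangelogStateBackend(true);
                env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
                        .countWindowAll(37) // any stateful transformation suffices
                        .reduce((ReduceFunction<Long>) Long::sum)
                        .addSink(new DiscardingSink<>());
                JobGraph newJob = env.getStreamGraph().getJobGraph();
    
                // 3. Resume from the canonical savepoint. Restoring from a retained
                //    non-changelog checkpoint instead would now fail with
                //    "Recovery not supported from ... with Changelog enabled".
                newJob.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
                client.submitJob(newJob).get();
            }
        }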
---
 docs/content.zh/docs/ops/state/state_backends.md   |   4 +-
 docs/content/docs/ops/state/state_backends.md      |   4 +-
 .../state/changelog/ChangelogStateBackend.java     |  32 ++-
 .../test/state/ChangelogCompatibilityITCase.java   | 282 +++++++++++++++++++++
 .../java/org/apache/flink/test/util/TestUtils.java |   9 +-
 .../StatefulJobSavepointMigrationITCase.scala      |   1 +
 6 files changed, 316 insertions(+), 16 deletions(-)

diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md
index 38575f6..e6d6764 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -405,9 +405,9 @@ If a task is backpressured by writing state changes, it will be shown as busy (r
 
 **Enabling Changelog**
 
-Resuming from both savepoints and checkpoints is supported:
+Resuming only from savepoints in canonical format is supported:
 - given an existing non-changelog job
-- take either a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) or a [checkpoint]({{< ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}})
+- take a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) (canonical format is the default)
 - alter configuration (enable Changelog)
 - resume from the taken snapshot
 
diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md
index 8690e3d..aa06c909 100644
--- a/docs/content/docs/ops/state/state_backends.md
+++ b/docs/content/docs/ops/state/state_backends.md
@@ -427,9 +427,9 @@ If a task is backpressured by writing state changes, it will be shown as busy (r
 
 **Enabling Changelog**
 
-Resuming from both savepoints and checkpoints is supported:
+Resuming only from savepoints in canonical format is supported:
 - given an existing non-changelog job
-- take either a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) or a [checkpoint]({{< ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}})
+- take a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) (canonical format is the default)
 - alter configuration (enable Changelog)
 - resume from the taken snapshot
 
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index 34c1ed3..a1a2016 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
@@ -267,19 +269,29 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab
         }
         return stateHandles.stream()
                 .filter(Objects::nonNull)
-                .map(
-                        keyedStateHandle ->
-                                keyedStateHandle instanceof ChangelogStateBackendHandle
-                                        ? (ChangelogStateBackendHandle) keyedStateHandle
-                                        : new ChangelogStateBackendHandleImpl(
-                                                singletonList(keyedStateHandle),
-                                                emptyList(),
-                                                keyedStateHandle.getKeyGroupRange(),
-                                                getMaterializationID(keyedStateHandle),
-                                                0L))
+                .map(this::getChangelogStateBackendHandle)
                 .collect(Collectors.toList());
     }
 
+    private ChangelogStateBackendHandle getChangelogStateBackendHandle(
+            KeyedStateHandle keyedStateHandle) {
+        if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+            return (ChangelogStateBackendHandle) keyedStateHandle;
+        } else if (keyedStateHandle instanceof SavepointKeyedStateHandle) {
+            return new ChangelogStateBackendHandleImpl(
+                    singletonList(keyedStateHandle),
+                    emptyList(),
+                    keyedStateHandle.getKeyGroupRange(),
+                    getMaterializationID(keyedStateHandle),
+                    0L);
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Recovery not supported from %s with Changelog enabled. Consider taking a savepoint in %s format.",
+                            keyedStateHandle.getClass(), SavepointFormatType.CANONICAL));
+        }
+    }
+
     private long getMaterializationID(KeyedStateHandle keyedStateHandle) {
         if (keyedStateHandle instanceof CheckpointBoundKeyedStateHandle) {
             return ((CheckpointBoundKeyedStateHandle) keyedStateHandle).getCheckpointId();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
new file mode 100644
index 0000000..e6228a2
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
+import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+import static org.apache.flink.test.util.TestUtils.getMostRecentCompletedCheckpointMaybe;
+import static org.apache.flink.util.ExceptionUtils.findThrowableSerializedAware;
+import static org.junit.Assert.fail;
+
+/**
+ * A test suite to check that restrictions on recovery with changelog enabled are enforced; and that
+ * non-restricted scenarios are not blocked. In particular, recovery from non-changelog checkpoints
+ * should not be allowed (see <a
+ * href="https://issues.apache.org/jira/browse/FLINK-26079">FLINK-26079</a>).
+ */
+@RunWith(Parameterized.class)
+public class ChangelogCompatibilityITCase {
+
+    private final TestCase testCase;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static List<TestCase> parameters() {
+        return Arrays.asList(
+                // disable changelog - allow restore only from CANONICAL_SAVEPOINT
+                TestCase.startWithChangelog(true)
+                        .restoreWithChangelog(false)
+                        .from(RestoreSource.CANONICAL_SAVEPOINT)
+                        .allowRestore(true),
+                // enable changelog - allow restore only from CANONICAL_SAVEPOINT
+                TestCase.startWithChangelog(false)
+                        .restoreWithChangelog(true)
+                        .from(RestoreSource.CANONICAL_SAVEPOINT)
+                        .allowRestore(true),
+                // explicitly disallow recovery from non-changelog checkpoints
+                // https://issues.apache.org/jira/browse/FLINK-26079
+                TestCase.startWithChangelog(false)
+                        .restoreWithChangelog(true)
+                        .from(RestoreSource.CHECKPOINT)
+                        .allowRestore(false),
+                // normal cases: changelog enabled before and after recovery
+                TestCase.startWithChangelog(true)
+                        .restoreWithChangelog(true)
+                        .from(RestoreSource.CANONICAL_SAVEPOINT)
+                        .allowRestore(true),
+                TestCase.startWithChangelog(true)
+                        .restoreWithChangelog(true)
+                        .from(RestoreSource.NATIVE_SAVEPOINT)
+                        .allowRestore(true),
+                TestCase.startWithChangelog(true)
+                        .restoreWithChangelog(true)
+                        .from(RestoreSource.CHECKPOINT)
+                        .allowRestore(true));
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        JobGraph initialGraph = addGraph(initEnvironment());
+        String restoreSourceLocation = checkpointAndStop(initialGraph);
+
+        restoreAndValidate(restoreSourceLocation);
+    }
+
+    private StreamExecutionEnvironment initEnvironment() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableChangelogStateBackend(testCase.startWithChangelog);
+        if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
+            env.enableCheckpointing(50);
+            env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);
+        }
+        return env;
+    }
+
+    private JobGraph addGraph(StreamExecutionEnvironment env) {
+        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
+                .countWindowAll(37) // any stateful transformation suffices
+                .reduce((ReduceFunction<Long>) Long::sum) // overflow is fine, result is discarded
+                .addSink(new DiscardingSink<>());
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    private String checkpointAndStop(JobGraph jobGraph) throws Exception {
+        ClusterClient<?> client = miniClusterResource.getClusterClient();
+        submit(jobGraph, client);
+        if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
+            String location = pathToString(waitForCheckpoint());
+            client.cancel(jobGraph.getJobID()).get();
+            return location;
+        } else {
+            return client.stopWithSavepoint(
+                            jobGraph.getJobID(),
+                            false,
+                            pathToString(savepointDir),
+                            testCase.restoreSource == RestoreSource.CANONICAL_SAVEPOINT
+                                    ? SavepointFormatType.CANONICAL
+                                    : SavepointFormatType.NATIVE)
+                    .get();
+        }
+    }
+
+    private void restoreAndValidate(String location) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableChangelogStateBackend(testCase.restoreWithChangelog);
+        JobGraph jobGraph = addGraph(env);
+        jobGraph.setSavepointRestoreSettings(forPath(location));
+
+        boolean restored = tryRun(jobGraph);
+
+        if (restored && !testCase.allowRestore) {
+            fail(
+                    String.format(
+                            "restoring from %s taken with changelog.enabled=%b should NOT be allowed with changelog.enabled=%b",
+                            testCase.restoreSource,
+                            testCase.startWithChangelog,
+                            testCase.restoreWithChangelog));
+        } else if (!restored && testCase.allowRestore) {
+            fail(
+                    String.format(
+                            "restoring from %s taken with changelog.enabled=%b should be allowed with changelog.enabled=%b",
+                            testCase.restoreSource,
+                            testCase.startWithChangelog,
+                            testCase.restoreWithChangelog));
+        }
+    }
+
+    private boolean tryRun(JobGraph jobGraph) throws Exception {
+        try {
+            submit(jobGraph, miniClusterResource.getClusterClient());
+            miniClusterResource.getClusterClient().cancel(jobGraph.getJobID()).get();
+            return true;
+        } catch (AssertionError | Exception e) { // AssertionError is thrown by CommonTestUtils
+            if (isValidationError(e)) {
+                return false;
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    private boolean isValidationError(Throwable e) {
+        return findThrowableSerializedAware(
+                        e, IllegalStateException.class, this.getClass().getClassLoader())
+                .filter(i -> i.getMessage().toLowerCase().contains("recovery not supported"))
+                .isPresent();
+    }
+
+    private static final class TestCase {
+        boolean startWithChangelog;
+        boolean restoreWithChangelog;
+        RestoreSource restoreSource;
+        boolean allowRestore;
+
+        public static TestCase startWithChangelog(boolean changelogEnabled) {
+            TestCase testCase = new TestCase();
+            testCase.startWithChangelog = changelogEnabled;
+            return testCase;
+        }
+
+        public TestCase restoreWithChangelog(boolean restoreWithChangelog) {
+            this.restoreWithChangelog = restoreWithChangelog;
+            return this;
+        }
+
+        public TestCase from(RestoreSource restoreSource) {
+            this.restoreSource = restoreSource;
+            return this;
+        }
+
+        public TestCase allowRestore(boolean allowRestore) {
+            this.allowRestore = allowRestore;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "startWithChangelog=%s, restoreWithChangelog=%s, restoreFrom=%s, allowRestore=%s",
+                    startWithChangelog, restoreWithChangelog, restoreSource, allowRestore);
+        }
+    }
+
+    private enum RestoreSource {
+        CANONICAL_SAVEPOINT,
+        NATIVE_SAVEPOINT,
+        CHECKPOINT
+    }
+
+    private void submit(JobGraph jobGraph, ClusterClient<?> client) throws Exception {
+        client.submitJob(jobGraph).get();
+        waitForAllTaskRunning(miniClusterResource.getMiniCluster(), jobGraph.getJobID(), true);
+    }
+
+    private File waitForCheckpoint() throws IOException, InterruptedException {
+        Optional<File> location = getMostRecentCompletedCheckpointMaybe(checkpointDir);
+        while (!location.isPresent()) {
+            Thread.sleep(50);
+            location = getMostRecentCompletedCheckpointMaybe(checkpointDir);
+        }
+        return location.get();
+    }
+
+    private static String pathToString(File path) {
+        return path.toURI().toString();
+    }
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    private File checkpointDir;
+    private File savepointDir;
+    private MiniClusterWithClientResource miniClusterResource;
+
+    public ChangelogCompatibilityITCase(TestCase testCase) {
+        this.testCase = testCase;
+    }
+
+    @Before
+    public void before() throws Exception {
+        checkpointDir = TEMPORARY_FOLDER.newFolder();
+        savepointDir = TEMPORARY_FOLDER.newFolder();
+        Configuration config = new Configuration();
+        config.setString(CHECKPOINTS_DIRECTORY, pathToString(checkpointDir));
+        config.setString(SAVEPOINT_DIRECTORY, pathToString(savepointDir));
+        FsStateChangelogStorageFactory.configure(config, TEMPORARY_FOLDER.newFolder());
+        miniClusterResource =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(config)
+                                .setNumberTaskManagers(11)
+                                .setNumberSlotsPerTaskManager(1)
+                                .build());
+        miniClusterResource.before();
+    }
+
+    @After
+    public void after() {
+        if (miniClusterResource != null) {
+            miniClusterResource.after();
+        }
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 4137e0f..d7799ce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -95,10 +95,15 @@ public class TestUtils {
     }
 
     public static File getMostRecentCompletedCheckpoint(File checkpointDir) throws IOException {
+        return getMostRecentCompletedCheckpointMaybe(checkpointDir)
+                .orElseThrow(() -> new IllegalStateException("Cannot generate checkpoint"));
+    }
+
+    public static Optional<File> getMostRecentCompletedCheckpointMaybe(File checkpointDir)
+            throws IOException {
         return Files.find(checkpointDir.toPath(), 2, TestUtils::isCompletedCheckpoint)
                 .max(Comparator.comparing(Path::toString))
-                .map(Path::toFile)
-                .orElseThrow(() -> new IllegalStateException("Cannot generate checkpoint"));
+                .map(Path::toFile);
     }
 
     private static boolean isCompletedCheckpoint(Path path, BasicFileAttributes attr) {
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index c458911..c68abf3 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -153,6 +153,7 @@ class StatefulJobSavepointMigrationITCase(
         env.setStateBackend(new HashMapStateBackend())
       case _ => throw new UnsupportedOperationException
     }
+    env.enableChangelogStateBackend(false);
 
     env.setStateBackend(new MemoryStateBackend)
     env.enableCheckpointing(500)

[flink] 01/02: [hotfix][tests] Explicitly disable changelog in migration tests

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 369088f0f94ae7732f8a65b6676f2187c871328a
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Feb 15 22:45:13 2022 +0100

    [hotfix][tests] Explicitly disable changelog in migration tests
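    
    The change pins the backend configuration explicitly, presumably so that the
    snapshots written and restored by these migration tests do not depend on any
    changelog default or randomization configured elsewhere. A minimal sketch of
    the resulting environment setup (class and method names here are illustrative,
    not part of the patch):
    
        import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
        class MigrationTestEnvSketch {
            static StreamExecutionEnvironment createEnv() {
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStateBackend(new HashMapStateBackend());
                // Explicitly opt out of the changelog state backend so the written
                // snapshot format stays deterministic for later restore runs.
                env.enableChangelogStateBackend(false);
                env.enableCheckpointing(500);
                return env;
            }
        }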
---
 .../checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java  | 1 +
 .../test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java   | 1 +
 .../checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java  | 1 +
 .../flink/test/migration/TypeSerializerSnapshotMigrationITCase.java     | 1 +
 .../test/state/operator/restore/AbstractOperatorRestoreTestBase.java    | 1 +
 .../api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala | 2 ++
 6 files changed, 7 insertions(+)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
index 6ac2d98..e59be25 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
@@ -168,6 +168,7 @@ public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigratio
             default:
                 throw new UnsupportedOperationException();
         }
+        env.enableChangelogStateBackend(false);
 
         env.enableCheckpointing(500);
         env.setParallelism(4);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
index 007be6a..260c1f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
@@ -133,6 +133,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
         env.enableCheckpointing(500);
         env.setParallelism(parallelism);
         env.setMaxParallelism(parallelism);
+        env.enableChangelogStateBackend(false);
 
         SourceFunction<Tuple2<Long, Long>> nonParallelSource;
         SourceFunction<Tuple2<Long, Long>> parallelSource;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
index 35c7257..4470b85 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
@@ -113,6 +113,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
             default:
                 throw new UnsupportedOperationException();
         }
+        env.enableChangelogStateBackend(false);
 
         env.enableCheckpointing(500);
         env.setParallelism(parallelism);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
index c9e59db..aa22407 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -130,6 +130,7 @@ public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTes
             default:
                 throw new UnsupportedOperationException();
         }
+        env.enableChangelogStateBackend(false);
 
         env.enableCheckpointing(500);
         env.setParallelism(parallelism);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index f5bbff4..0ddf11e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -214,6 +214,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
         env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
         env.setRestartStrategy(RestartStrategies.noRestart());
         env.setStateBackend((StateBackend) new MemoryStateBackend());
+        env.enableChangelogStateBackend(false);
 
         switch (mode) {
             case MIGRATE:
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 7212a6a..78e646e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -110,6 +110,7 @@ class StatefulJobWBroadcastStateMigrationITCase(
         env.setStateBackend(new HashMapStateBackend())
       case _ => throw new UnsupportedOperationException
     }
+    env.enableChangelogStateBackend(false)
 
     lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
       "broadcast-state-1",
@@ -179,6 +180,7 @@ class StatefulJobWBroadcastStateMigrationITCase(
         env.setStateBackend(new HashMapStateBackend())
       case _ => throw new UnsupportedOperationException
     }
+    env.enableChangelogStateBackend(false)
 
     lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
       "broadcast-state-1",