Posted to commits@flink.apache.org by ro...@apache.org on 2022/02/14 11:00:05 UTC

[flink] 02/02: [FLINK-26093][tests] Adjust SavepointFormatITCase for ChangelogStateBackend

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 746acb487be3e4fb8f94505aa9b79cb60b4ce851
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Feb 11 14:22:51 2022 +0100

    [FLINK-26093][tests] Adjust SavepointFormatITCase for ChangelogStateBackend
---
 .../test/checkpointing/SavepointFormatITCase.java  | 276 +++++++++++----------
 1 file changed, 148 insertions(+), 128 deletions(-)
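
For orientation before the diff: the test is reworked so that each parameter combination boils down to a Configuration that selects the backend and toggles incremental checkpoints and the state change log, giving two backend builders (heap and RocksDB) x incremental on/off x changelog on/off x both savepoint format types = 16 combinations. A minimal sketch of one such configuration, mirroring what StateBackendConfig.getConfiguration() in this patch produces (the standalone class and main() wrapper are only illustrative, not part of the commit):

    import org.apache.flink.configuration.CheckpointingOptions;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.StateBackendOptions;
    import org.apache.flink.configuration.StateChangelogOptions;

    public class BackendConfigSketch {
        public static void main(String[] args) {
            // Example combination: RocksDB with incremental checkpoints and changelog enabled.
            Configuration config = new Configuration();
            config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
            config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
            config.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
            System.out.println(config);
        }
    }
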

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
index 7757354..580ff38 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -59,11 +61,13 @@ import org.slf4j.event.Level;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Optional;
-import java.util.function.Consumer;
+import java.util.function.BiFunction;
 import java.util.function.Predicate;
-import java.util.stream.Stream;
 
+import static java.util.Arrays.asList;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -78,138 +82,151 @@ public class SavepointFormatITCase {
     LoggerAuditingExtension loggerAuditingExtension =
             new LoggerAuditingExtension(SavepointFormatITCase.class, Level.INFO);
 
-    private static Stream<Arguments> parameters() {
-        return Stream.of(
-                Arguments.of(
-                        SavepointFormatType.CANONICAL,
-                        HEAP,
-                        (Consumer<KeyedStateHandle>)
-                                keyedState ->
-                                        assertThat(
-                                                keyedState,
-                                                instanceOf(SavepointKeyedStateHandle.class))),
-                Arguments.of(
-                        SavepointFormatType.NATIVE,
-                        HEAP,
-                        (Consumer<KeyedStateHandle>)
-                                keyedState ->
-                                        assertThat(
-                                                keyedState,
-                                                instanceOf(KeyGroupsStateHandle.class))),
-                Arguments.of(
-                        SavepointFormatType.CANONICAL,
-                        ROCKSDB_FULL_SNAPSHOTS,
-                        (Consumer<KeyedStateHandle>)
-                                keyedState ->
-                                        assertThat(
-                                                keyedState,
-                                                instanceOf(SavepointKeyedStateHandle.class))),
-                Arguments.of(
-                        SavepointFormatType.NATIVE,
-                        ROCKSDB_FULL_SNAPSHOTS,
-                        (Consumer<KeyedStateHandle>)
-                                keyedState ->
-                                        assertThat(
-                                                keyedState,
-                                                instanceOf(KeyGroupsStateHandle.class))),
-                Arguments.of(
-                        SavepointFormatType.CANONICAL,
-                        ROCKSDB_INCREMENTAL_SNAPSHOTS,
-                        (Consumer<KeyedStateHandle>)
-                                keyedState ->
-                                        assertThat(
-                                                keyedState,
-                                                instanceOf(SavepointKeyedStateHandle.class))),
-                Arguments.of(
-                        SavepointFormatType.NATIVE,
-                        ROCKSDB_INCREMENTAL_SNAPSHOTS,
-                        (Consumer<KeyedStateHandle>)
-                                keyedState ->
-                                        assertThat(
-                                                keyedState,
-                                                instanceOf(
-                                                        IncrementalRemoteKeyedStateHandle.class))));
+    private static List<Arguments> parameters() {
+        // iterate through all combinations of backends, isIncremental, isChangelogEnabled
+        List<Arguments> result = new LinkedList<>();
+        for (BiFunction<Boolean, Boolean, StateBackendConfig> builder :
+                StateBackendConfig.builders) {
+            for (boolean incremental : new boolean[] {true, false}) {
+                for (boolean changelog : new boolean[] {true, false}) {
+                    for (SavepointFormatType formatType : SavepointFormatType.values()) {
+                        result.add(Arguments.of(formatType, builder.apply(incremental, changelog)));
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    private void validateState(
+            KeyedStateHandle state,
+            SavepointFormatType formatType,
+            StateBackendConfig backendConfig) {
+        if (formatType == SavepointFormatType.CANONICAL) {
+            assertThat(state, instanceOf(SavepointKeyedStateHandle.class));
+        } else if (backendConfig.isChangelogEnabled()) {
+            assertThat(state, instanceOf(ChangelogStateBackendHandle.class));
+            for (KeyedStateHandle nestedState :
+                    ((ChangelogStateBackendHandle) state).getMaterializedStateHandles()) {
+                validateNativeNonChangelogState(nestedState, backendConfig);
+            }
+        } else {
+            validateNativeNonChangelogState(state, backendConfig);
+        }
+    }
+
+    private void validateNativeNonChangelogState(
+            KeyedStateHandle state, StateBackendConfig backendConfig) {
+        if (backendConfig.isIncremental()) {
+            assertThat(state, instanceOf(IncrementalRemoteKeyedStateHandle.class));
+        } else {
+            assertThat(state, instanceOf(KeyGroupsStateHandle.class));
+        }
     }
 
     private abstract static class StateBackendConfig {
+        protected final boolean changelogEnabled;
+        protected final boolean incremental;
+
+        protected StateBackendConfig(boolean changelogEnabled, boolean incremental) {
+            this.changelogEnabled = changelogEnabled;
+            this.incremental = incremental;
+        }
+
         public abstract String getName();
 
-        public abstract Configuration getConfiguration();
+        public Configuration getConfiguration() {
+            Configuration stateBackendConfig = new Configuration();
+            stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, getConfigName());
+            stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
+            stateBackendConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, changelogEnabled);
+            return stateBackendConfig;
+        }
 
         public int getCheckpointsBeforeSavepoint() {
             return 0;
         }
 
+        protected abstract String getConfigName();
+
         @Override
         public final String toString() {
-            return getName();
+            return String.format(
+                    "%s, incremental: %b, changelog: %b", getName(), incremental, changelogEnabled);
         }
-    }
 
-    private static final StateBackendConfig HEAP =
-            new StateBackendConfig() {
-                @Override
-                public String getName() {
-                    return "HEAP";
-                }
-
-                @Override
-                public Configuration getConfiguration() {
-                    Configuration stateBackendConfig = new Configuration();
-                    stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
-                    stateBackendConfig.set(
-                            CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
-                    return stateBackendConfig;
-                }
-            };
-
-    private static final StateBackendConfig ROCKSDB_FULL_SNAPSHOTS =
-            new StateBackendConfig() {
-                @Override
-                public String getName() {
-                    return "ROCKSDB_FULL_SNAPSHOTS";
-                }
+        private static final List<BiFunction<Boolean, Boolean, StateBackendConfig>> builders =
+                asList(SavepointFormatITCase::getRocksdb, SavepointFormatITCase::heap);
 
-                @Override
-                public Configuration getConfiguration() {
-                    Configuration stateBackendConfig = new Configuration();
-                    stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
-                    stateBackendConfig.set(
-                            CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
-                    stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, false);
-                    return stateBackendConfig;
-                }
-            };
+        public abstract boolean isIncremental();
 
-    private static final StateBackendConfig ROCKSDB_INCREMENTAL_SNAPSHOTS =
-            new StateBackendConfig() {
-                @Override
-                public String getName() {
-                    return "ROCKSDB_INCREMENTAL_SNAPSHOTS";
-                }
+        private boolean isChangelogEnabled() {
+            return changelogEnabled;
+        }
+    }
 
-                @Override
-                public int getCheckpointsBeforeSavepoint() {
-                    return 1;
-                }
+    private static StateBackendConfig heap(boolean incremental, boolean changelogEnabled) {
+        return new StateBackendConfig(changelogEnabled, incremental /* ignored for now */) {
+            @Override
+            public String getName() {
+                return "HEAP";
+            }
+
+            @Override
+            public Configuration getConfiguration() {
+                Configuration stateBackendConfig = super.getConfiguration();
+                stateBackendConfig.set(
+                        CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
+                return stateBackendConfig;
+            }
+
+            @Override
+            protected String getConfigName() {
+                return "filesystem";
+            }
+
+            @Override
+            public boolean isIncremental() {
+                return false;
+            }
+        };
+    }
 
-                @Override
-                public Configuration getConfiguration() {
-                    Configuration stateBackendConfig = new Configuration();
-                    stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
-                    stateBackendConfig.set(
-                            CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
-                    stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
-                    return stateBackendConfig;
-                }
-            };
+    private static StateBackendConfig getRocksdb(boolean incremental, boolean changelogEnabled) {
+        return new StateBackendConfig(changelogEnabled, incremental) {
+            @Override
+            public String getName() {
+                return "ROCKSDB";
+            }
+
+            @Override
+            public int getCheckpointsBeforeSavepoint() {
+                return 1;
+            }
+
+            @Override
+            public boolean isIncremental() {
+                return this.incremental;
+            }
+
+            @Override
+            public Configuration getConfiguration() {
+                Configuration stateBackendConfig = super.getConfiguration();
+                stateBackendConfig.set(
+                        CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
+                return stateBackendConfig;
+            }
+
+            protected String getConfigName() {
+                return "rocksdb";
+            }
+        };
+    }
 
     @ParameterizedTest(name = "[{index}] {0}, {1}")
     @MethodSource("parameters")
     public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath(
-            SavepointFormatType formatType,
-            StateBackendConfig stateBackendConfig,
-            Consumer<KeyedStateHandle> stateHandleVerification)
+            SavepointFormatType formatType, StateBackendConfig stateBackendConfig)
             throws Exception {
         final int numTaskManagers = 2;
         final int numSlotsPerTaskManager = 2;
@@ -231,20 +248,16 @@ public class SavepointFormatITCase {
                     submitJobAndTakeSavepoint(
                             miniClusterResource,
                             formatType,
-                            stateBackendConfig.getCheckpointsBeforeSavepoint());
+                            stateBackendConfig.getCheckpointsBeforeSavepoint(),
+                            config);
             final CheckpointMetadata metadata = loadCheckpointMetadata(savepointPath);
 
             final OperatorState operatorState =
                     metadata.getOperatorStates().stream().filter(hasKeyedState()).findFirst().get();
-            operatorState
-                    .getStates()
-                    .forEach(
-                            subtaskState -> {
-                                subtaskState
-                                        .getManagedKeyedState()
-                                        .forEach(stateHandleVerification);
-                            });
-            relocateAndVerify(miniClusterResource, savepointPath, renamedSavepointDir);
+            operatorState.getStates().stream()
+                    .flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream())
+                    .forEach(handle -> validateState(handle, formatType, stateBackendConfig));
+            relocateAndVerify(miniClusterResource, savepointPath, renamedSavepointDir, config);
         } finally {
             miniClusterResource.after();
         }
@@ -272,14 +285,17 @@ public class SavepointFormatITCase {
     }
 
     private void relocateAndVerify(
-            MiniClusterWithClientResource cluster, String savepointPath, Path renamedSavepointDir)
+            MiniClusterWithClientResource cluster,
+            String savepointPath,
+            Path renamedSavepointDir,
+            Configuration config)
             throws Exception {
         final org.apache.flink.core.fs.Path oldPath =
                 new org.apache.flink.core.fs.Path(savepointPath);
         final org.apache.flink.core.fs.Path newPath =
                 new org.apache.flink.core.fs.Path(renamedSavepointDir.toUri().toString());
         (new org.apache.flink.core.fs.Path(savepointPath).getFileSystem()).rename(oldPath, newPath);
-        final JobGraph jobGraph = createJobGraph();
+        final JobGraph jobGraph = createJobGraph(config);
         jobGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(
                         renamedSavepointDir.toUri().toString(), false, RestoreMode.CLAIM));
@@ -293,9 +309,10 @@ public class SavepointFormatITCase {
     private String submitJobAndTakeSavepoint(
             MiniClusterWithClientResource cluster,
             SavepointFormatType formatType,
-            int checkpointBeforeSavepoint)
+            int checkpointBeforeSavepoint,
+            Configuration config)
             throws Exception {
-        final JobGraph jobGraph = createJobGraph();
+        final JobGraph jobGraph = createJobGraph(config);
 
         final JobID jobId = jobGraph.getJobID();
         ClusterClient<?> client = cluster.getClusterClient();
@@ -311,8 +328,11 @@ public class SavepointFormatITCase {
                 .get();
     }
 
-    private static JobGraph createJobGraph() {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    private static JobGraph createJobGraph(Configuration config) {
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(
+                        /* pass configuration to prevent any conflicting randomization*/
+                        config);
         env.setParallelism(4);
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
         env.disableOperatorChaining();
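
As a closing note on the reworked assertions: for a NATIVE-format savepoint taken with the change log enabled, the keyed state is expected to be a ChangelogStateBackendHandle whose materialized parts are then checked against the underlying backend (incremental vs. full). A hedged sketch of that unwrapping step, mirroring validateState/validateNativeNonChangelogState above; the class and method names here are made up for illustration:

    import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
    import org.apache.flink.runtime.state.KeyGroupsStateHandle;
    import org.apache.flink.runtime.state.KeyedStateHandle;
    import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;

    class NativeHandleCheckSketch {
        static void check(KeyedStateHandle handle, boolean changelogEnabled, boolean incremental) {
            if (changelogEnabled) {
                // The changelog handle wraps the materialized state of the underlying backend.
                for (KeyedStateHandle nested :
                        ((ChangelogStateBackendHandle) handle).getMaterializedStateHandles()) {
                    check(nested, false, incremental);
                }
            } else {
                Class<?> expected =
                        incremental
                                ? IncrementalRemoteKeyedStateHandle.class
                                : KeyGroupsStateHandle.class;
                if (!expected.isInstance(handle)) {
                    throw new AssertionError("Unexpected handle type: " + handle.getClass());
                }
            }
        }
    }
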