You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/02/14 11:00:05 UTC
[flink] 02/02: [FLINK-26093][tests] Adjust SavepointFormatITCase for ChangelogStateBackend
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 746acb487be3e4fb8f94505aa9b79cb60b4ce851
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Feb 11 14:22:51 2022 +0100
[FLINK-26093][tests] Adjust SavepointFormatITCase for ChangelogStateBackend
---
.../test/checkpointing/SavepointFormatITCase.java | 276 +++++++++++----------
1 file changed, 148 insertions(+), 128 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
index 7757354..580ff38 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -59,11 +61,13 @@ import org.slf4j.event.Level;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Optional;
-import java.util.function.Consumer;
+import java.util.function.BiFunction;
import java.util.function.Predicate;
-import java.util.stream.Stream;
+import static java.util.Arrays.asList;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -78,138 +82,151 @@ public class SavepointFormatITCase {
LoggerAuditingExtension loggerAuditingExtension =
new LoggerAuditingExtension(SavepointFormatITCase.class, Level.INFO);
- private static Stream<Arguments> parameters() {
- return Stream.of(
- Arguments.of(
- SavepointFormatType.CANONICAL,
- HEAP,
- (Consumer<KeyedStateHandle>)
- keyedState ->
- assertThat(
- keyedState,
- instanceOf(SavepointKeyedStateHandle.class))),
- Arguments.of(
- SavepointFormatType.NATIVE,
- HEAP,
- (Consumer<KeyedStateHandle>)
- keyedState ->
- assertThat(
- keyedState,
- instanceOf(KeyGroupsStateHandle.class))),
- Arguments.of(
- SavepointFormatType.CANONICAL,
- ROCKSDB_FULL_SNAPSHOTS,
- (Consumer<KeyedStateHandle>)
- keyedState ->
- assertThat(
- keyedState,
- instanceOf(SavepointKeyedStateHandle.class))),
- Arguments.of(
- SavepointFormatType.NATIVE,
- ROCKSDB_FULL_SNAPSHOTS,
- (Consumer<KeyedStateHandle>)
- keyedState ->
- assertThat(
- keyedState,
- instanceOf(KeyGroupsStateHandle.class))),
- Arguments.of(
- SavepointFormatType.CANONICAL,
- ROCKSDB_INCREMENTAL_SNAPSHOTS,
- (Consumer<KeyedStateHandle>)
- keyedState ->
- assertThat(
- keyedState,
- instanceOf(SavepointKeyedStateHandle.class))),
- Arguments.of(
- SavepointFormatType.NATIVE,
- ROCKSDB_INCREMENTAL_SNAPSHOTS,
- (Consumer<KeyedStateHandle>)
- keyedState ->
- assertThat(
- keyedState,
- instanceOf(
- IncrementalRemoteKeyedStateHandle.class))));
+ private static List<Arguments> parameters() {
+ // iterate through all combinations of backends, isIncremental, isChangelogEnabled, and savepoint format types
+ List<Arguments> result = new LinkedList<>();
+ for (BiFunction<Boolean, Boolean, StateBackendConfig> builder :
+ StateBackendConfig.builders) {
+ for (boolean incremental : new boolean[] {true, false}) {
+ for (boolean changelog : new boolean[] {true, false}) {
+ for (SavepointFormatType formatType : SavepointFormatType.values()) {
+ result.add(Arguments.of(formatType, builder.apply(incremental, changelog)));
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ private void validateState(
+ KeyedStateHandle state,
+ SavepointFormatType formatType,
+ StateBackendConfig backendConfig) {
+ if (formatType == SavepointFormatType.CANONICAL) {
+ assertThat(state, instanceOf(SavepointKeyedStateHandle.class));
+ } else if (backendConfig.isChangelogEnabled()) {
+ assertThat(state, instanceOf(ChangelogStateBackendHandle.class));
+ for (KeyedStateHandle nestedState :
+ ((ChangelogStateBackendHandle) state).getMaterializedStateHandles()) {
+ validateNativeNonChangelogState(nestedState, backendConfig);
+ }
+ } else {
+ validateNativeNonChangelogState(state, backendConfig);
+ }
+ }
+
+ private void validateNativeNonChangelogState(
+ KeyedStateHandle state, StateBackendConfig backendConfig) {
+ if (backendConfig.isIncremental()) {
+ assertThat(state, instanceOf(IncrementalRemoteKeyedStateHandle.class));
+ } else {
+ assertThat(state, instanceOf(KeyGroupsStateHandle.class));
+ }
}
private abstract static class StateBackendConfig {
+ protected final boolean changelogEnabled;
+ protected final boolean incremental;
+
+ protected StateBackendConfig(boolean changelogEnabled, boolean incremental) {
+ this.changelogEnabled = changelogEnabled;
+ this.incremental = incremental;
+ }
+
public abstract String getName();
- public abstract Configuration getConfiguration();
+ public Configuration getConfiguration() {
+ Configuration stateBackendConfig = new Configuration();
+ stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, getConfigName());
+ stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
+ stateBackendConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, changelogEnabled);
+ return stateBackendConfig;
+ }
public int getCheckpointsBeforeSavepoint() {
return 0;
}
+ protected abstract String getConfigName();
+
@Override
public final String toString() {
- return getName();
+ return String.format(
+ "%s, incremental: %b, changelog: %b", getName(), incremental, changelogEnabled);
}
- }
- private static final StateBackendConfig HEAP =
- new StateBackendConfig() {
- @Override
- public String getName() {
- return "HEAP";
- }
-
- @Override
- public Configuration getConfiguration() {
- Configuration stateBackendConfig = new Configuration();
- stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
- stateBackendConfig.set(
- CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
- return stateBackendConfig;
- }
- };
-
- private static final StateBackendConfig ROCKSDB_FULL_SNAPSHOTS =
- new StateBackendConfig() {
- @Override
- public String getName() {
- return "ROCKSDB_FULL_SNAPSHOTS";
- }
+ private static final List<BiFunction<Boolean, Boolean, StateBackendConfig>> builders =
+ asList(SavepointFormatITCase::getRocksdb, SavepointFormatITCase::heap);
- @Override
- public Configuration getConfiguration() {
- Configuration stateBackendConfig = new Configuration();
- stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
- stateBackendConfig.set(
- CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
- stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, false);
- return stateBackendConfig;
- }
- };
+ public abstract boolean isIncremental();
- private static final StateBackendConfig ROCKSDB_INCREMENTAL_SNAPSHOTS =
- new StateBackendConfig() {
- @Override
- public String getName() {
- return "ROCKSDB_INCREMENTAL_SNAPSHOTS";
- }
+ private boolean isChangelogEnabled() {
+ return changelogEnabled;
+ }
+ }
- @Override
- public int getCheckpointsBeforeSavepoint() {
- return 1;
- }
+ private static StateBackendConfig heap(boolean incremental, boolean changelogEnabled) {
+ return new StateBackendConfig(changelogEnabled, incremental /* ignored for now */) {
+ @Override
+ public String getName() {
+ return "HEAP";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ Configuration stateBackendConfig = super.getConfiguration();
+ stateBackendConfig.set(
+ CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
+ return stateBackendConfig;
+ }
+
+ @Override
+ protected String getConfigName() {
+ return "filesystem";
+ }
+
+ @Override
+ public boolean isIncremental() {
+ return false;
+ }
+ };
+ }
- @Override
- public Configuration getConfiguration() {
- Configuration stateBackendConfig = new Configuration();
- stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
- stateBackendConfig.set(
- CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
- stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
- return stateBackendConfig;
- }
- };
+ private static StateBackendConfig getRocksdb(boolean incremental, boolean changelogEnabled) {
+ return new StateBackendConfig(changelogEnabled, incremental) {
+ @Override
+ public String getName() {
+ return "ROCKSDB";
+ }
+
+ @Override
+ public int getCheckpointsBeforeSavepoint() {
+ return 1;
+ }
+
+ @Override
+ public boolean isIncremental() {
+ return this.incremental;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ Configuration stateBackendConfig = super.getConfiguration();
+ stateBackendConfig.set(
+ CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
+ return stateBackendConfig;
+ }
+
+ protected String getConfigName() {
+ return "rocksdb";
+ }
+ };
+ }
@ParameterizedTest(name = "[{index}] {0}, {1}")
@MethodSource("parameters")
public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath(
- SavepointFormatType formatType,
- StateBackendConfig stateBackendConfig,
- Consumer<KeyedStateHandle> stateHandleVerification)
+ SavepointFormatType formatType, StateBackendConfig stateBackendConfig)
throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
@@ -231,20 +248,16 @@ public class SavepointFormatITCase {
submitJobAndTakeSavepoint(
miniClusterResource,
formatType,
- stateBackendConfig.getCheckpointsBeforeSavepoint());
+ stateBackendConfig.getCheckpointsBeforeSavepoint(),
+ config);
final CheckpointMetadata metadata = loadCheckpointMetadata(savepointPath);
final OperatorState operatorState =
metadata.getOperatorStates().stream().filter(hasKeyedState()).findFirst().get();
- operatorState
- .getStates()
- .forEach(
- subtaskState -> {
- subtaskState
- .getManagedKeyedState()
- .forEach(stateHandleVerification);
- });
- relocateAndVerify(miniClusterResource, savepointPath, renamedSavepointDir);
+ operatorState.getStates().stream()
+ .flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream())
+ .forEach(handle -> validateState(handle, formatType, stateBackendConfig));
+ relocateAndVerify(miniClusterResource, savepointPath, renamedSavepointDir, config);
} finally {
miniClusterResource.after();
}
@@ -272,14 +285,17 @@ public class SavepointFormatITCase {
}
private void relocateAndVerify(
- MiniClusterWithClientResource cluster, String savepointPath, Path renamedSavepointDir)
+ MiniClusterWithClientResource cluster,
+ String savepointPath,
+ Path renamedSavepointDir,
+ Configuration config)
throws Exception {
final org.apache.flink.core.fs.Path oldPath =
new org.apache.flink.core.fs.Path(savepointPath);
final org.apache.flink.core.fs.Path newPath =
new org.apache.flink.core.fs.Path(renamedSavepointDir.toUri().toString());
(new org.apache.flink.core.fs.Path(savepointPath).getFileSystem()).rename(oldPath, newPath);
- final JobGraph jobGraph = createJobGraph();
+ final JobGraph jobGraph = createJobGraph(config);
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(
renamedSavepointDir.toUri().toString(), false, RestoreMode.CLAIM));
@@ -293,9 +309,10 @@ public class SavepointFormatITCase {
private String submitJobAndTakeSavepoint(
MiniClusterWithClientResource cluster,
SavepointFormatType formatType,
- int checkpointBeforeSavepoint)
+ int checkpointBeforeSavepoint,
+ Configuration config)
throws Exception {
- final JobGraph jobGraph = createJobGraph();
+ final JobGraph jobGraph = createJobGraph(config);
final JobID jobId = jobGraph.getJobID();
ClusterClient<?> client = cluster.getClusterClient();
@@ -311,8 +328,11 @@ public class SavepointFormatITCase {
.get();
}
- private static JobGraph createJobGraph() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ private static JobGraph createJobGraph(Configuration config) {
+ StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(
+ /* pass configuration to prevent any conflicting randomization */
+ config);
env.setParallelism(4);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.disableOperatorChaining();