You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/10 15:46:08 UTC
[flink] 02/02: Revert "[FLINK-21206] Write savepoints in unified format from HeapStateBackend"
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3187c3431039c261b81fd641a157c6fb1e37a4bb
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Feb 10 16:31:06 2021 +0100
Revert "[FLINK-21206] Write savepoints in unified format from HeapStateBackend"
This reverts commit fc995d3af957941e0e16e56e6830ef97acbadfd1.
---
.../runtime/state/heap/HeapKeyedStateBackend.java | 19 +--
.../state/heap/HeapKeyedStateBackendBuilder.java | 16 +-
.../runtime/state/heap/HeapSavepointStrategy.java | 95 -----------
.../runtime/state/heap/HeapSnapshotResources.java | 190 ---------------------
.../runtime/state/heap/HeapSnapshotStrategy.java | 116 +++++++++++--
flink-tests/pom.xml | 9 -
.../state/HeapSavepointStateBackendSwitchTest.java | 4 +-
7 files changed, 111 insertions(+), 338 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 0b42a32..c81bb68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -106,9 +106,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* The snapshot strategy for this backend. This determines, e.g., if snapshots are synchronous
* or asynchronous.
*/
- private final SnapshotStrategyRunner<KeyedStateHandle, ?> checkpointStrategyRunner;
-
- private final SnapshotStrategyRunner<KeyedStateHandle, ?> savepointStrategyRunner;
+ private final SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner;
private final StateTableFactory<K> stateTableFactory;
@@ -127,8 +125,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
LocalRecoveryConfig localRecoveryConfig,
HeapPriorityQueueSetFactory priorityQueueSetFactory,
- SnapshotStrategyRunner<KeyedStateHandle, ?> checkpointStrategyRunner,
- SnapshotStrategyRunner<KeyedStateHandle, ?> savepointStrategyRunner,
+ SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner,
StateTableFactory<K> stateTableFactory,
InternalKeyContext<K> keyContext) {
super(
@@ -144,8 +141,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.registeredPQStates = registeredPQStates;
this.localRecoveryConfig = localRecoveryConfig;
this.priorityQueueSetFactory = priorityQueueSetFactory;
- this.checkpointStrategyRunner = checkpointStrategyRunner;
- this.savepointStrategyRunner = savepointStrategyRunner;
+ this.snapshotStrategyRunner = snapshotStrategyRunner;
this.stateTableFactory = stateTableFactory;
LOG.info("Initializing heap keyed state backend with stream factory.");
}
@@ -360,13 +356,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Nonnull CheckpointOptions checkpointOptions)
throws Exception {
- if (checkpointOptions.getCheckpointType().isSavepoint()) {
- return savepointStrategyRunner.snapshot(
- checkpointId, timestamp, streamFactory, checkpointOptions);
- } else {
- return checkpointStrategyRunner.snapshot(
- checkpointId, timestamp, streamFactory, checkpointOptions);
- }
+ return snapshotStrategyRunner.snapshot(
+ checkpointId, timestamp, streamFactory, checkpointOptions);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 3991ba0..6519461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -96,14 +96,6 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
HeapSnapshotStrategy<K> snapshotStrategy =
initSnapshotStrategy(registeredKVStates, registeredPQStates);
- HeapSavepointStrategy<K> savepointStrategy =
- new HeapSavepointStrategy<>(
- registeredKVStates,
- registeredPQStates,
- keyGroupCompressionDecorator,
- keyGroupRange,
- keySerializerProvider,
- numberOfKeyGroups);
InternalKeyContext<K> keyContext =
new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
@@ -132,11 +124,6 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
snapshotStrategy,
cancelStreamRegistryForBackend,
asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
- new SnapshotStrategyRunner<>(
- "Heap backend savepoint",
- savepointStrategy,
- cancelStreamRegistryForBackend,
- asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
stateTableFactory,
keyContext);
}
@@ -200,7 +187,6 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
keyGroupCompressionDecorator,
localRecoveryConfig,
keyGroupRange,
- keySerializerProvider,
- numberOfKeyGroups);
+ keySerializerProvider);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
deleted file mode 100644
index ed70ed7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
-import org.apache.flink.runtime.state.FullSnapshotResources;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.SnapshotStrategy;
-import org.apache.flink.runtime.state.StateSerializerProvider;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-
-import javax.annotation.Nonnull;
-
-import java.util.Map;
-
-/** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
-class HeapSavepointStrategy<K>
- implements SnapshotStrategy<KeyedStateHandle, FullSnapshotResources<K>> {
-
- private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
- private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
- private final StreamCompressionDecorator keyGroupCompressionDecorator;
- private final KeyGroupRange keyGroupRange;
- private final StateSerializerProvider<K> keySerializerProvider;
- private final int totalKeyGroups;
-
- HeapSavepointStrategy(
- Map<String, StateTable<K, ?, ?>> registeredKVStates,
- Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
- StreamCompressionDecorator keyGroupCompressionDecorator,
- KeyGroupRange keyGroupRange,
- StateSerializerProvider<K> keySerializerProvider,
- int totalKeyGroups) {
- this.registeredKVStates = registeredKVStates;
- this.registeredPQStates = registeredPQStates;
- this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
- this.keyGroupRange = keyGroupRange;
- this.keySerializerProvider = keySerializerProvider;
- this.totalKeyGroups = totalKeyGroups;
- }
-
- @Override
- public FullSnapshotResources<K> syncPrepareResources(long checkpointId) {
- return HeapSnapshotResources.create(
- registeredKVStates,
- registeredPQStates,
- keyGroupCompressionDecorator,
- keyGroupRange,
- getKeySerializer(),
- totalKeyGroups);
- }
-
- @Override
- public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
- FullSnapshotResources<K> syncPartResource,
- long checkpointId,
- long timestamp,
- @Nonnull CheckpointStreamFactory streamFactory,
- @Nonnull CheckpointOptions checkpointOptions) {
-
- assert checkpointOptions.getCheckpointType().isSavepoint();
- return new FullSnapshotAsyncWriter<>(
- checkpointOptions.getCheckpointType(),
- () ->
- CheckpointStreamWithResultProvider.createSimpleStream(
- CheckpointedStateScope.EXCLUSIVE, streamFactory),
- syncPartResource);
- }
-
- public TypeSerializer<K> getKeySerializer() {
- return keySerializerProvider.currentSchemaSerializer();
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
deleted file mode 100644
index aa2c0af..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FullSnapshotResources;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyValueStateIterator;
-import org.apache.flink.runtime.state.StateSnapshot;
-import org.apache.flink.runtime.state.StateSnapshotRestore;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A set of resources required to take a checkpoint or savepoint from a {@link
- * HeapKeyedStateBackend}.
- */
-@Internal
-final class HeapSnapshotResources<K> implements FullSnapshotResources<K> {
- private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
- private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
- private final StreamCompressionDecorator streamCompressionDecorator;
- private final Map<StateUID, Integer> stateNamesToId;
- private final KeyGroupRange keyGroupRange;
- private final TypeSerializer<K> keySerializer;
- private final int totalKeyGroups;
-
- private HeapSnapshotResources(
- List<StateMetaInfoSnapshot> metaInfoSnapshots,
- Map<StateUID, StateSnapshot> cowStateStableSnapshots,
- StreamCompressionDecorator streamCompressionDecorator,
- Map<StateUID, Integer> stateNamesToId,
- KeyGroupRange keyGroupRange,
- TypeSerializer<K> keySerializer,
- int totalKeyGroups) {
- this.metaInfoSnapshots = metaInfoSnapshots;
- this.cowStateStableSnapshots = cowStateStableSnapshots;
- this.streamCompressionDecorator = streamCompressionDecorator;
- this.stateNamesToId = stateNamesToId;
- this.keyGroupRange = keyGroupRange;
- this.keySerializer = keySerializer;
- this.totalKeyGroups = totalKeyGroups;
- }
-
- public static <K> HeapSnapshotResources<K> create(
- Map<String, StateTable<K, ?, ?>> registeredKVStates,
- Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
- StreamCompressionDecorator streamCompressionDecorator,
- KeyGroupRange keyGroupRange,
- TypeSerializer<K> keySerializer,
- int totalKeyGroups) {
-
- if (registeredKVStates.isEmpty() && registeredPQStates.isEmpty()) {
- return new HeapSnapshotResources<>(
- Collections.emptyList(),
- Collections.emptyMap(),
- streamCompressionDecorator,
- Collections.emptyMap(),
- keyGroupRange,
- keySerializer,
- totalKeyGroups);
- }
-
- int numStates = registeredKVStates.size() + registeredPQStates.size();
-
- Preconditions.checkState(
- numStates <= Short.MAX_VALUE,
- "Too many states: "
- + numStates
- + ". Currently at most "
- + Short.MAX_VALUE
- + " states are supported");
-
- final List<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<>(numStates);
- final Map<StateUID, Integer> stateNamesToId = new HashMap<>(numStates);
- final Map<StateUID, StateSnapshot> cowStateStableSnapshots = new HashMap<>(numStates);
-
- processSnapshotMetaInfoForAllStates(
- metaInfoSnapshots,
- cowStateStableSnapshots,
- stateNamesToId,
- registeredKVStates,
- StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
-
- processSnapshotMetaInfoForAllStates(
- metaInfoSnapshots,
- cowStateStableSnapshots,
- stateNamesToId,
- registeredPQStates,
- StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
-
- return new HeapSnapshotResources<>(
- metaInfoSnapshots,
- cowStateStableSnapshots,
- streamCompressionDecorator,
- stateNamesToId,
- keyGroupRange,
- keySerializer,
- totalKeyGroups);
- }
-
- private static void processSnapshotMetaInfoForAllStates(
- List<StateMetaInfoSnapshot> metaInfoSnapshots,
- Map<StateUID, StateSnapshot> cowStateStableSnapshots,
- Map<StateUID, Integer> stateNamesToId,
- Map<String, ? extends StateSnapshotRestore> registeredStates,
- StateMetaInfoSnapshot.BackendStateType stateType) {
-
- for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
- registeredStates.entrySet()) {
- final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
- stateNamesToId.put(stateUid, stateNamesToId.size());
- StateSnapshotRestore state = kvState.getValue();
- if (null != state) {
- final StateSnapshot stateSnapshot = state.stateSnapshot();
- metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
- cowStateStableSnapshots.put(stateUid, stateSnapshot);
- }
- }
- }
-
- @Override
- public void release() {
- for (StateSnapshot stateSnapshot : cowStateStableSnapshots.values()) {
- stateSnapshot.release();
- }
- }
-
- public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
- return metaInfoSnapshots;
- }
-
- @Override
- public KeyValueStateIterator createKVStateIterator() throws IOException {
- return new HeapKeyValueStateIterator(
- keyGroupRange,
- keySerializer,
- totalKeyGroups,
- stateNamesToId,
- cowStateStableSnapshots);
- }
-
- @Override
- public KeyGroupRange getKeyGroupRange() {
- return keyGroupRange;
- }
-
- @Override
- public TypeSerializer<K> getKeySerializer() {
- return keySerializer;
- }
-
- @Override
- public StreamCompressionDecorator getStreamCompressionDecorator() {
- return streamCompressionDecorator;
- }
-
- public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
- return cowStateStableSnapshots;
- }
-
- public Map<StateUID, Integer> getStateNamesToId() {
- return stateNamesToId;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index 78b96e5..fe48c90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -30,20 +30,26 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -54,7 +60,7 @@ import static org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.
/** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
class HeapSnapshotStrategy<K>
- implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
+ implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotStrategy.HeapSnapshotResources> {
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
@@ -62,7 +68,6 @@ class HeapSnapshotStrategy<K>
private final LocalRecoveryConfig localRecoveryConfig;
private final KeyGroupRange keyGroupRange;
private final StateSerializerProvider<K> keySerializerProvider;
- private final int totalKeyGroups;
HeapSnapshotStrategy(
Map<String, StateTable<K, ?, ?>> registeredKVStates,
@@ -70,31 +75,58 @@ class HeapSnapshotStrategy<K>
StreamCompressionDecorator keyGroupCompressionDecorator,
LocalRecoveryConfig localRecoveryConfig,
KeyGroupRange keyGroupRange,
- StateSerializerProvider<K> keySerializerProvider,
- int totalKeyGroups) {
+ StateSerializerProvider<K> keySerializerProvider) {
this.registeredKVStates = registeredKVStates;
this.registeredPQStates = registeredPQStates;
this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
this.localRecoveryConfig = localRecoveryConfig;
this.keyGroupRange = keyGroupRange;
this.keySerializerProvider = keySerializerProvider;
- this.totalKeyGroups = totalKeyGroups;
}
@Override
- public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
- return HeapSnapshotResources.create(
+ public HeapSnapshotResources syncPrepareResources(long checkpointId) {
+
+ if (!hasRegisteredState()) {
+ return new HeapSnapshotResources(
+ Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap());
+ }
+
+ int numStates = registeredKVStates.size() + registeredPQStates.size();
+
+ Preconditions.checkState(
+ numStates <= Short.MAX_VALUE,
+ "Too many states: "
+ + numStates
+ + ". Currently at most "
+ + Short.MAX_VALUE
+ + " states are supported");
+
+ final List<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<>(numStates);
+ final Map<StateUID, Integer> stateNamesToId = new HashMap<>(numStates);
+ final Map<StateUID, StateSnapshot> cowStateStableSnapshots = new HashMap<>(numStates);
+
+ processSnapshotMetaInfoForAllStates(
+ metaInfoSnapshots,
+ cowStateStableSnapshots,
+ stateNamesToId,
registeredKVStates,
+ StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
+
+ processSnapshotMetaInfoForAllStates(
+ metaInfoSnapshots,
+ cowStateStableSnapshots,
+ stateNamesToId,
registeredPQStates,
- keyGroupCompressionDecorator,
- keyGroupRange,
- getKeySerializer(),
- totalKeyGroups);
+ StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+
+ return new HeapSnapshotResources(
+ metaInfoSnapshots, cowStateStableSnapshots, stateNamesToId);
}
@Override
public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
- HeapSnapshotResources<K> syncPartResource,
+ HeapSnapshotResources syncPartResource,
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@@ -110,7 +142,7 @@ class HeapSnapshotStrategy<K>
// TODO: this code assumes that writing a serializer is threadsafe, we
// should support to
// get a serialized form already at state registration time in the future
- syncPartResource.getKeySerializer(),
+ getKeySerializer(),
metaInfoSnapshots,
!Objects.equals(
UncompressedStreamCompressionDecorator.INSTANCE,
@@ -182,7 +214,65 @@ class HeapSnapshotStrategy<K>
};
}
+ private void processSnapshotMetaInfoForAllStates(
+ List<StateMetaInfoSnapshot> metaInfoSnapshots,
+ Map<StateUID, StateSnapshot> cowStateStableSnapshots,
+ Map<StateUID, Integer> stateNamesToId,
+ Map<String, ? extends StateSnapshotRestore> registeredStates,
+ StateMetaInfoSnapshot.BackendStateType stateType) {
+
+ for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
+ registeredStates.entrySet()) {
+ final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
+ stateNamesToId.put(stateUid, stateNamesToId.size());
+ StateSnapshotRestore state = kvState.getValue();
+ if (null != state) {
+ final StateSnapshot stateSnapshot = state.stateSnapshot();
+ metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
+ cowStateStableSnapshots.put(stateUid, stateSnapshot);
+ }
+ }
+ }
+
+ private boolean hasRegisteredState() {
+ return !(registeredKVStates.isEmpty() && registeredPQStates.isEmpty());
+ }
+
public TypeSerializer<K> getKeySerializer() {
return keySerializerProvider.currentSchemaSerializer();
}
+
+ static class HeapSnapshotResources implements SnapshotResources {
+ private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
+ private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
+ private final Map<StateUID, Integer> stateNamesToId;
+
+ HeapSnapshotResources(
+ @Nonnull List<StateMetaInfoSnapshot> metaInfoSnapshots,
+ @Nonnull Map<StateUID, StateSnapshot> cowStateStableSnapshots,
+ @Nonnull Map<StateUID, Integer> stateNamesToId) {
+ this.metaInfoSnapshots = metaInfoSnapshots;
+ this.cowStateStableSnapshots = cowStateStableSnapshots;
+ this.stateNamesToId = stateNamesToId;
+ }
+
+ @Override
+ public void release() {
+ for (StateSnapshot stateSnapshot : cowStateStableSnapshots.values()) {
+ stateSnapshot.release();
+ }
+ }
+
+ public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
+ return metaInfoSnapshots;
+ }
+
+ public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
+ return cowStateStableSnapshots;
+ }
+
+ public Map<StateUID, Integer> getStateNamesToId() {
+ return stateNamesToId;
+ }
+ }
}
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index fb30a1d..7031344 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -231,15 +231,6 @@ under the License.
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
index 5138e03..d5708a6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.test.state.BackendSwitchSpecs.BackendSwitchSpec;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
/** Tests for switching a HEAP state backend to a different one. */
@RunWith(Parameterized.class)
@@ -35,6 +35,6 @@ public class HeapSavepointStateBackendSwitchTest extends SavepointStateBackendSw
@Parameterized.Parameters
public static Collection<BackendSwitchSpec> targetBackends() {
- return Arrays.asList(BackendSwitchSpecs.HEAP, BackendSwitchSpecs.ROCKS);
+ return Collections.singletonList(BackendSwitchSpecs.HEAP);
}
}