You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:23 UTC
[flink] 07/09: [FLINK-20978] Implement SavepointKeyedStateHandle
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6d1e386e808ddbd211bc2b0daaa441c82e2cd1ae
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 13 17:24:32 2021 +0100
[FLINK-20978] Implement SavepointKeyedStateHandle
Introduce a marker interface, SavepointKeyedStateHandle, for state handles that describe savepoints. Based on this interface, we can later decide which strategy to use when restoring from the handle.
---
.../metadata/MetadataV2V3SerializerBase.java | 17 +++--
.../state/CheckpointStreamWithResultProvider.java | 22 +++++--
.../runtime/state/FullSnapshotAsyncWriter.java | 13 +++-
.../state/KeyGroupsSavepointStateHandle.java | 72 ++++++++++++++++++++++
.../runtime/state/SavepointKeyedStateHandle.java | 22 +++++++
.../state/heap/HeapKeyedStateBackendBuilder.java | 72 ++++++++++++++++------
.../runtime/state/heap/HeapSnapshotStrategy.java | 3 +-
.../checkpoint/metadata/CheckpointTestUtils.java | 30 +++++++--
.../state/snapshot/RocksFullSnapshotStrategy.java | 4 +-
9 files changed, 219 insertions(+), 36 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 010003d..5bb8820 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -91,6 +92,7 @@ public abstract class MetadataV2V3SerializerBase {
private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
private static final byte RELATIVE_STREAM_STATE_HANDLE = 6;
+ private static final byte SAVEPOINT_KEY_GROUPS_HANDLE = 7;
// ------------------------------------------------------------------------
// (De)serialization entry points
@@ -280,7 +282,11 @@ public abstract class MetadataV2V3SerializerBase {
} else if (stateHandle instanceof KeyGroupsStateHandle) {
KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
- dos.writeByte(KEY_GROUPS_HANDLE);
+ if (stateHandle instanceof KeyGroupsSavepointStateHandle) {
+ dos.writeByte(SAVEPOINT_KEY_GROUPS_HANDLE);
+ } else {
+ dos.writeByte(KEY_GROUPS_HANDLE);
+ }
dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
@@ -316,8 +322,7 @@ public abstract class MetadataV2V3SerializerBase {
if (NULL_HANDLE == type) {
return null;
- } else if (KEY_GROUPS_HANDLE == type) {
-
+ } else if (KEY_GROUPS_HANDLE == type || SAVEPOINT_KEY_GROUPS_HANDLE == type) {
int startKeyGroup = dis.readInt();
int numKeyGroups = dis.readInt();
KeyGroupRange keyGroupRange =
@@ -329,7 +334,11 @@ public abstract class MetadataV2V3SerializerBase {
KeyGroupRangeOffsets keyGroupRangeOffsets =
new KeyGroupRangeOffsets(keyGroupRange, offsets);
StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context);
- return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+ if (SAVEPOINT_KEY_GROUPS_HANDLE == type) {
+ return new KeyGroupsSavepointStateHandle(keyGroupRangeOffsets, stateHandle);
+ } else {
+ return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+ }
} else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
long checkpointId = dis.readLong();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
index 4a8c89b..cd7b181 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
@@ -195,27 +195,39 @@ public interface CheckpointStreamWithResultProvider extends Closeable {
}
/**
+ * Factory method for a {@link KeyedStateHandle} to be used in {@link
+ * #toKeyedStateHandleSnapshotResult(SnapshotResult, KeyGroupRangeOffsets,
+ * KeyedStateHandleFactory)}.
+ */
+ @FunctionalInterface
+ interface KeyedStateHandleFactory {
+ KeyedStateHandle create(
+ KeyGroupRangeOffsets keyGroupRangeOffsets, StreamStateHandle streamStateHandle);
+ }
+
+ /**
* Helper method that takes a {@link SnapshotResult<StreamStateHandle>} and a {@link
- * KeyGroupRangeOffsets} and creates a {@link SnapshotResult<KeyGroupsStateHandle>} by combining
- * the key groups offsets with all the present stream state handles.
+ * KeyGroupRangeOffsets} and creates a {@link SnapshotResult<KeyedStateHandle>} by combining the
+ * key groups offsets with all the present stream state handles.
*/
@Nonnull
static SnapshotResult<KeyedStateHandle> toKeyedStateHandleSnapshotResult(
@Nonnull SnapshotResult<StreamStateHandle> snapshotResult,
- @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) {
+ @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets,
+ @Nonnull KeyedStateHandleFactory stateHandleFactory) {
StreamStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();
if (jobManagerOwnedSnapshot != null) {
KeyedStateHandle jmKeyedState =
- new KeyGroupsStateHandle(keyGroupRangeOffsets, jobManagerOwnedSnapshot);
+ stateHandleFactory.create(keyGroupRangeOffsets, jobManagerOwnedSnapshot);
StreamStateHandle taskLocalSnapshot = snapshotResult.getTaskLocalSnapshot();
if (taskLocalSnapshot != null) {
KeyedStateHandle localKeyedState =
- new KeyGroupsStateHandle(keyGroupRangeOffsets, taskLocalSnapshot);
+ stateHandleFactory.create(keyGroupRangeOffsets, taskLocalSnapshot);
return SnapshotResult.withLocalState(jmKeyedState, localKeyedState);
} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
index ffc7684..c5a7277 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.SupplierWithException;
@@ -50,8 +51,10 @@ public class FullSnapshotAsyncWriter<K>
checkpointStreamSupplier;
@Nonnull private final FullSnapshotResources<K> snapshotResources;
+ @Nonnull private final CheckpointType checkpointType;
public FullSnapshotAsyncWriter(
+ @Nonnull CheckpointType checkpointType,
@Nonnull
SupplierWithException<CheckpointStreamWithResultProvider, Exception>
checkpointStreamSupplier,
@@ -59,6 +62,7 @@ public class FullSnapshotAsyncWriter<K>
this.checkpointStreamSupplier = checkpointStreamSupplier;
this.snapshotResources = snapshotResources;
+ this.checkpointType = checkpointType;
}
@Override
@@ -73,9 +77,16 @@ public class FullSnapshotAsyncWriter<K>
writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
+ final CheckpointStreamWithResultProvider.KeyedStateHandleFactory stateHandleFactory;
+ if (checkpointType.isSavepoint()) {
+ stateHandleFactory = KeyGroupsSavepointStateHandle::new;
+ } else {
+ stateHandleFactory = KeyGroupsStateHandle::new;
+ }
return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(
checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(),
- keyGroupRangeOffsets);
+ keyGroupRangeOffsets,
+ stateHandleFactory);
} else {
throw new IOException("Stream is already unregistered/closed.");
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsSavepointStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsSavepointStateHandle.java
new file mode 100644
index 0000000..aa8bb41
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsSavepointStateHandle.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/** A {@link KeyGroupsStateHandle} that describes a savepoint in the unified format. */
+public class KeyGroupsSavepointStateHandle extends KeyGroupsStateHandle
+ implements SavepointKeyedStateHandle {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param groupRangeOffsets range of key-group ids that are in the state of this handle
+ * @param streamStateHandle handle to the actual state of the key-groups
+ */
+ public KeyGroupsSavepointStateHandle(
+ KeyGroupRangeOffsets groupRangeOffsets, StreamStateHandle streamStateHandle) {
+ super(groupRangeOffsets, streamStateHandle);
+ }
+
+ /**
+ * @param keyGroupRange a key group range to intersect.
+ * @return key-group state over a range that is the intersection between this handle's key-group
+ * range and the provided key-group range.
+ */
+ @Override
+ public KeyGroupsStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+ KeyGroupRangeOffsets offsets = getGroupRangeOffsets().getIntersection(keyGroupRange);
+ if (offsets.getKeyGroupRange().getNumberOfKeyGroups() <= 0) {
+ return null;
+ }
+ return new KeyGroupsSavepointStateHandle(offsets, getDelegateStateHandle());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof KeyGroupsSavepointStateHandle)) {
+ return false;
+ }
+
+ return super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "KeyGroupsSavepointStateHandle{"
+ + "groupRangeOffsets="
+ + getGroupRangeOffsets()
+ + ", stateHandle="
+ + getDelegateStateHandle()
+ + '}';
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointKeyedStateHandle.java
new file mode 100644
index 0000000..ae0ebaf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SavepointKeyedStateHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/** A {@link KeyedStateHandle} that points to a savepoint taken in the unified format. */
+public interface SavepointKeyedStateHandle extends KeyedStateHandle {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 12bbc05..6519461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.RestoreOperation;
+import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -104,25 +106,7 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
stateTableFactory = NestedMapsStateTable::new;
}
- HeapRestoreOperation<K> restoreOperation =
- new HeapRestoreOperation<>(
- restoreStateHandles,
- keySerializerProvider,
- userCodeClassLoader,
- registeredKVStates,
- registeredPQStates,
- cancelStreamRegistry,
- priorityQueueSetFactory,
- keyGroupRange,
- numberOfKeyGroups,
- stateTableFactory,
- keyContext);
- try {
- restoreOperation.restore();
- logger.info("Finished to build heap keyed state-backend.");
- } catch (Exception e) {
- throw new BackendBuildingException("Failed when trying to restore heap backend", e);
- }
+ restoreState(registeredKVStates, registeredPQStates, keyContext, stateTableFactory);
return new HeapKeyedStateBackend<>(
kvStateRegistry,
keySerializerProvider.currentSchemaSerializer(),
@@ -144,6 +128,56 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
keyContext);
}
+ private void restoreState(
+ Map<String, StateTable<K, ?, ?>> registeredKVStates,
+ Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
+ InternalKeyContext<K> keyContext,
+ StateTableFactory<K> stateTableFactory)
+ throws BackendBuildingException {
+ final RestoreOperation<Void> restoreOperation;
+
+ final KeyedStateHandle firstHandle;
+ if (restoreStateHandles.isEmpty()) {
+ firstHandle = null;
+ } else {
+ firstHandle = restoreStateHandles.iterator().next();
+ }
+ if (firstHandle instanceof SavepointKeyedStateHandle) {
+ restoreOperation =
+ new HeapSavepointRestoreOperation<>(
+ restoreStateHandles,
+ keySerializerProvider,
+ userCodeClassLoader,
+ registeredKVStates,
+ registeredPQStates,
+ priorityQueueSetFactory,
+ keyGroupRange,
+ numberOfKeyGroups,
+ stateTableFactory,
+ keyContext);
+ } else {
+ restoreOperation =
+ new HeapRestoreOperation<>(
+ restoreStateHandles,
+ keySerializerProvider,
+ userCodeClassLoader,
+ registeredKVStates,
+ registeredPQStates,
+ cancelStreamRegistry,
+ priorityQueueSetFactory,
+ keyGroupRange,
+ numberOfKeyGroups,
+ stateTableFactory,
+ keyContext);
+ }
+ try {
+ restoreOperation.restore();
+ logger.info("Finished to build heap keyed state-backend.");
+ } catch (Exception e) {
+ throw new BackendBuildingException("Failed when trying to restore heap backend", e);
+ }
+ }
+
private HeapSnapshotStrategy<K> initSnapshotStrategy(
Map<String, StateTable<K, ?, ?>> registeredKVStates,
Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index 4010422..fe48c90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
@@ -206,7 +207,7 @@ class HeapSnapshotStrategy<K>
new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
SnapshotResult<StreamStateHandle> result =
streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
- return toKeyedStateHandleSnapshotResult(result, kgOffs);
+ return toKeyedStateHandleSnapshotResult(result, kgOffs, KeyGroupsStateHandle::new);
} else {
throw new IOException("Stream already unregistered.");
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index 766298c..d8c83e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -27,7 +27,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
@@ -126,14 +128,25 @@ public class CheckpointTestUtils {
}
if (hasKeyedBackend) {
- state.setRawKeyedState(
- isIncremental && !isSavepoint(basePath)
- ? createDummyIncrementalKeyedStateHandle(random)
- : createDummyKeyGroupStateHandle(random, basePath));
+ final KeyedStateHandle stateHandle;
+ if (isSavepoint(basePath)) {
+ stateHandle = createDummyKeyGroupSavepointStateHandle(random, basePath);
+ } else if (isIncremental) {
+ stateHandle = createDummyIncrementalKeyedStateHandle(random);
+ } else {
+ stateHandle = createDummyKeyGroupStateHandle(random, null);
+ }
+ state.setRawKeyedState(stateHandle);
}
if (hasKeyedStream) {
- state.setManagedKeyedState(createDummyKeyGroupStateHandle(random, basePath));
+ final KeyedStateHandle stateHandle;
+ if (isSavepoint(basePath)) {
+ stateHandle = createDummyKeyGroupSavepointStateHandle(random, basePath);
+ } else {
+ stateHandle = createDummyKeyGroupStateHandle(random, null);
+ }
+ state.setManagedKeyedState(stateHandle);
}
state.setInputChannelState(
@@ -217,6 +230,13 @@ public class CheckpointTestUtils {
return result;
}
+ public static KeyGroupsStateHandle createDummyKeyGroupSavepointStateHandle(
+ Random rnd, String basePath) {
+ return new KeyGroupsSavepointStateHandle(
+ new KeyGroupRangeOffsets(1, 1, new long[] {rnd.nextInt(1024)}),
+ createDummyStreamStateHandle(rnd, basePath));
+ }
+
public static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd, String basePath) {
return new KeyGroupsStateHandle(
new KeyGroupRangeOffsets(1, 1, new long[] {rnd.nextInt(1024)}),
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
index 63bf351..0b83bd8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
@@ -137,7 +137,9 @@ public class RocksFullSnapshotStrategy<K>
checkpointId, checkpointStreamFactory, checkpointOptions);
return new FullSnapshotAsyncWriter<>(
- checkpointStreamSupplier, fullRocksDBSnapshotResources);
+ checkpointOptions.getCheckpointType(),
+ checkpointStreamSupplier,
+ fullRocksDBSnapshotResources);
}
@Override