You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/08/09 03:25:41 UTC
[flink] branch master updated: [FLINK-28699][state] Make non-incremental rocksdb checkpoint as native format
This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d9a067e5e1c [FLINK-28699][state] Make non-incremental rocksdb checkpoint as native format
d9a067e5e1c is described below
commit d9a067e5e1c8672930b0ea7d76400a1d3020a1e2
Author: Lihe Ma <ma...@163.com>
AuthorDate: Thu Jun 30 10:42:28 2022 +0800
[FLINK-28699][state] Make non-incremental rocksdb checkpoint as native format
This closes #20399.
---
.../flink/runtime/state/ttl/TtlStateTestBase.java | 6 +-
.../state/RocksDBKeyedStateBackendBuilder.java | 17 +-
.../snapshot/RocksDBSnapshotStrategyBase.java | 318 +++++++++++++++++++-
.../state/snapshot/RocksFullSnapshotStrategy.java | 169 -----------
.../snapshot/RocksIncrementalSnapshotStrategy.java | 323 ++-------------------
.../snapshot/RocksNativeFullSnapshotStrategy.java | 255 ++++++++++++++++
.../state/snapshot/RocksSnapshotUtil.java | 13 +
.../state/EmbeddedRocksDBStateBackendTest.java | 10 -
.../RocksIncrementalSnapshotStrategyTest.java | 2 +-
.../state/ttl/FullSnapshotRocksDbTtlStateTest.java | 5 +
.../state/ttl/IncSnapshotRocksDbTtlStateTest.java | 2 +-
.../ResumeCheckpointManuallyITCase.java | 91 +++++-
.../test/checkpointing/SavepointFormatITCase.java | 9 +-
13 files changed, 728 insertions(+), 492 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index d6225ab0c4b..c1e291450f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -95,7 +95,7 @@ public abstract class TtlStateTestBase {
new TtlReducingStateTestContext());
}
- public boolean fullSnapshot() {
+ public boolean isSavepoint() {
return true;
}
@@ -354,7 +354,7 @@ public abstract class TtlStateTestBase {
@Test
public void testMultipleKeysWithSnapshotCleanup() throws Exception {
- assumeTrue("full snapshot strategy", fullSnapshot());
+ assumeTrue("full snapshot strategy", isSavepoint());
initTest(getConfBuilder(TTL).cleanupFullSnapshot().build());
// set time back after restore to see entry unexpired if it was not cleaned up in snapshot
// properly
@@ -369,7 +369,7 @@ public abstract class TtlStateTestBase {
@Test
public void testMultipleNamespacesWithSnapshotCleanup() throws Exception {
- assumeTrue("full snapshot strategy", fullSnapshot());
+ assumeTrue("full snapshot strategy", isSavepoint());
initTest(getConfBuilder(TTL).cleanupFullSnapshot().build());
// set time back after restore to see entry unexpired if it was not cleaned up in snapshot
// properly
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index ca1adc27303..7007f1b5463 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -28,8 +28,8 @@ import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperat
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
-import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
+import org.apache.flink.contrib.streaming.state.snapshot.RocksNativeFullSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
@@ -524,11 +524,11 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles,
long lastCompletedCheckpointId) {
RocksDBSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy;
+ RocksDBStateUploader stateUploader =
+ injectRocksDBStateUploader == null
+ ? new RocksDBStateUploader(numberOfTransferingThreads)
+ : injectRocksDBStateUploader;
if (enableIncrementalCheckpointing) {
- RocksDBStateUploader stateUploader =
- injectRocksDBStateUploader == null
- ? new RocksDBStateUploader(numberOfTransferingThreads)
- : injectRocksDBStateUploader;
checkpointSnapshotStrategy =
new RocksIncrementalSnapshotStrategy<>(
db,
@@ -546,16 +546,17 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
lastCompletedCheckpointId);
} else {
checkpointSnapshotStrategy =
- new RocksFullSnapshotStrategy<>(
+ new RocksNativeFullSnapshotStrategy<>(
db,
rocksDBResourceGuard,
keySerializerProvider.currentSchemaSerializer(),
kvStateInformation,
- registeredPQStates,
keyGroupRange,
keyGroupPrefixBytes,
localRecoveryConfig,
- keyGroupCompressionDecorator);
+ instanceBasePath,
+ backendUID,
+ stateUploader);
}
return checkpointSnapshotStrategy;
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index b064fc54921..de2be83d8ef 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -21,21 +21,53 @@ package org.apache.flink.contrib.streaming.state.snapshot;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResources;
+import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.ResourceGuard;
+import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
/**
* Abstract base class for {@link SnapshotStrategy} implementations for RocksDB state backend.
@@ -43,7 +75,11 @@ import java.util.LinkedHashMap;
* @param <K> type of the backend keys.
*/
public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources>
- implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable {
+ implements CheckpointListener,
+ SnapshotStrategy<
+ KeyedStateHandle,
+ RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources>,
+ AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnapshotStrategyBase.class);
@@ -69,6 +105,15 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
/** The configuration for local recovery. */
@Nonnull protected final LocalRecoveryConfig localRecoveryConfig;
+ /** Base path of the RocksDB instance. */
+ @Nonnull protected final File instanceBasePath;
+
+ /** The local directory name of the current snapshot strategy. */
+ protected final String localDirectoryName;
+
+ /** Unique ID of the state backend; also used to derive {@link #localDirectoryName}. */
+ @Nonnull protected final UUID backendUID;
+
public RocksDBSnapshotStrategyBase(
@Nonnull String description,
@Nonnull RocksDB db,
@@ -77,7 +122,9 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int keyGroupPrefixBytes,
- @Nonnull LocalRecoveryConfig localRecoveryConfig) {
+ @Nonnull LocalRecoveryConfig localRecoveryConfig,
+ @Nonnull File instanceBasePath,
+ @Nonnull UUID backendUID) {
this.db = db;
this.rocksDBResourceGuard = rocksDBResourceGuard;
this.keySerializer = keySerializer;
@@ -86,6 +133,9 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.localRecoveryConfig = localRecoveryConfig;
this.description = description;
+ this.instanceBasePath = instanceBasePath;
+ this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
+ this.backendUID = backendUID;
}
@Nonnull
@@ -93,6 +143,270 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
return description;
}
+ @Override
+ public NativeRocksDBSnapshotResources syncPrepareResources(long checkpointId) throws Exception {
+
+ final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
+ LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
+
+ final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
+ new ArrayList<>(kvStateInformation.size());
+ final PreviousSnapshot previousSnapshot =
+ snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
+
+ takeDBNativeCheckpoint(snapshotDirectory);
+
+ return new NativeRocksDBSnapshotResources(
+ snapshotDirectory, previousSnapshot, stateMetaInfoSnapshots);
+ }
+
+ protected abstract PreviousSnapshot snapshotMetaData(
+ long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots);
+
+ private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
+ throws Exception {
+ // create hard links of the live files in the output path
+ try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
+ Checkpoint checkpoint = Checkpoint.create(db)) {
+ checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
+ } catch (Exception ex) {
+ try {
+ outputDirectory.cleanup();
+ } catch (IOException cleanupEx) {
+ ex = ExceptionUtils.firstOrSuppressed(cleanupEx, ex);
+ }
+ throw ex;
+ }
+ }
+
+ @Nonnull
+ protected SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId)
+ throws IOException {
+
+ if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+ // create a "permanent" snapshot directory for local recovery.
+ LocalRecoveryDirectoryProvider directoryProvider =
+ localRecoveryConfig
+ .getLocalStateDirectoryProvider()
+ .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
+ File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
+
+ if (!directory.exists() && !directory.mkdirs()) {
+ throw new IOException(
+ "Local state base directory for checkpoint "
+ + checkpointId
+ + " does not exist and could not be created: "
+ + directory);
+ }
+
+ // Introduce an extra directory because RocksDB requires a non-existing directory for
+ // native checkpoints.
+ // Append localDirectoryName here to avoid a directory collision when two stateful
+ // operators are chained in one task.
+ File rdbSnapshotDir = new File(directory, localDirectoryName);
+ if (rdbSnapshotDir.exists()) {
+ FileUtils.deleteDirectory(rdbSnapshotDir);
+ }
+
+ Path path = rdbSnapshotDir.toPath();
+ // create a "permanent" snapshot directory because local recovery is active.
+ try {
+ return SnapshotDirectory.permanent(path);
+ } catch (IOException ex) {
+ try {
+ FileUtils.deleteDirectory(directory);
+ } catch (IOException delEx) {
+ ex = ExceptionUtils.firstOrSuppressed(delEx, ex);
+ }
+ throw ex;
+ }
+ } else {
+ // create a "temporary" snapshot directory because local recovery is inactive.
+ File snapshotDir = new File(instanceBasePath, "chk-" + checkpointId);
+ return SnapshotDirectory.temporary(snapshotDir);
+ }
+ }
+
+ protected void cleanupIncompleteSnapshot(
+ @Nonnull List<StateObject> statesToDiscard,
+ @Nonnull SnapshotDirectory localBackupDirectory) {
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard states.", e);
+ }
+
+ if (localBackupDirectory.isSnapshotCompleted()) {
+ try {
+ DirectoryStateHandle directoryStateHandle =
+ localBackupDirectory.completeSnapshotAndGetHandle();
+ if (directoryStateHandle != null) {
+ directoryStateHandle.discardState();
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard local state.", e);
+ }
+ }
+ }
+
+ @Nonnull
+ protected SnapshotResult<StreamStateHandle> materializeMetaData(
+ @Nonnull CloseableRegistry snapshotCloseableRegistry,
+ @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+ long checkpointId,
+ @Nonnull CheckpointStreamFactory checkpointStreamFactory)
+ throws Exception {
+
+ CheckpointStreamWithResultProvider streamWithResultProvider =
+ localRecoveryConfig.isLocalRecoveryEnabled()
+ ? CheckpointStreamWithResultProvider.createDuplicatingStream(
+ checkpointId,
+ CheckpointedStateScope.EXCLUSIVE,
+ checkpointStreamFactory,
+ localRecoveryConfig
+ .getLocalStateDirectoryProvider()
+ .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))
+ : CheckpointStreamWithResultProvider.createSimpleStream(
+ CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
+
+ snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);
+
+ try {
+ // no need for compression scheme support because sst-files are already compressed
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(
+ keySerializer, stateMetaInfoSnapshots, false);
+
+ DataOutputView out =
+ new DataOutputViewStreamWrapper(
+ streamWithResultProvider.getCheckpointOutputStream());
+
+ serializationProxy.write(out);
+
+ if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
+ SnapshotResult<StreamStateHandle> result =
+ streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
+ streamWithResultProvider = null;
+ return result;
+ } else {
+ throw new IOException("Stream already closed and cannot return a handle.");
+ }
+ } finally {
+ if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
+ IOUtils.closeQuietly(streamWithResultProvider);
+ }
+ }
+ }
+
@Override
public abstract void close();
+
+ /** Common base class for the result suppliers of native RocksDB snapshots. */
+ protected abstract class RocksDBSnapshotOperation
+ implements SnapshotResultSupplier<KeyedStateHandle> {
+ /** Id for the current checkpoint. */
+ protected final long checkpointId;
+
+ /** Stream factory that creates the output streams to DFS. */
+ @Nonnull protected final CheckpointStreamFactory checkpointStreamFactory;
+
+ /** The state meta data. */
+ @Nonnull protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+ /** Local directory for the RocksDB native backup. */
+ @Nonnull protected final SnapshotDirectory localBackupDirectory;
+
+ protected RocksDBSnapshotOperation(
+ long checkpointId,
+ @Nonnull CheckpointStreamFactory checkpointStreamFactory,
+ @Nonnull SnapshotDirectory localBackupDirectory,
+ @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
+ this.checkpointId = checkpointId;
+ this.checkpointStreamFactory = checkpointStreamFactory;
+ this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+ this.localBackupDirectory = localBackupDirectory;
+ }
+
+ protected Optional<KeyedStateHandle> getLocalSnapshot(
+ @Nullable StreamStateHandle localStreamStateHandle,
+ Map<StateHandleID, StreamStateHandle> sharedStateHandleIDs)
+ throws IOException {
+ final DirectoryStateHandle directoryStateHandle =
+ localBackupDirectory.completeSnapshotAndGetHandle();
+ if (directoryStateHandle != null && localStreamStateHandle != null) {
+ return Optional.of(
+ new IncrementalLocalKeyedStateHandle(
+ backendUID,
+ checkpointId,
+ directoryStateHandle,
+ keyGroupRange,
+ localStreamStateHandle,
+ sharedStateHandleIDs));
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
+
+ /** A {@link SnapshotResources} for native rocksdb snapshot. */
+ protected static class NativeRocksDBSnapshotResources implements SnapshotResources {
+ @Nonnull protected final SnapshotDirectory snapshotDirectory;
+
+ @Nonnull protected final PreviousSnapshot previousSnapshot;
+
+ @Nonnull protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+ public NativeRocksDBSnapshotResources(
+ SnapshotDirectory snapshotDirectory,
+ PreviousSnapshot previousSnapshot,
+ List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
+ this.snapshotDirectory = snapshotDirectory;
+ this.previousSnapshot = previousSnapshot;
+ this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+ }
+
+ @Override
+ public void release() {
+ try {
+ if (snapshotDirectory.exists()) {
+ LOG.trace(
+ "Running cleanup for local RocksDB backup directory {}.",
+ snapshotDirectory);
+ boolean cleanupOk = snapshotDirectory.cleanup();
+
+ if (!cleanupOk) {
+ LOG.debug("Could not properly cleanup local RocksDB backup directory.");
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
+ }
+ }
+ }
+
+ protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
+ new PreviousSnapshot(Collections.emptyMap());
+
+ /** Previous snapshot with uploaded sst files. */
+ protected static class PreviousSnapshot {
+
+ @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
+
+ protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
+ this.confirmedSstFiles = confirmedSstFiles;
+ }
+
+ protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
+ if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
+ // we introduce a placeholder state handle, that is replaced with the
+ // original from the shared state registry (created from a previous checkpoint)
+ return Optional.of(
+ new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
+ } else {
+ // Don't use any uploaded but not confirmed handles because they might be deleted
+ // (by TM) if the previous checkpoint failed. See FLINK-25395
+ return Optional.empty();
+ }
+ }
+ }
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
deleted file mode 100644
index c328902ad51..00000000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.snapshot;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
-import org.apache.flink.runtime.state.FullSnapshotResources;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.SnapshotResult;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
-import org.apache.flink.util.ResourceGuard;
-import org.apache.flink.util.function.SupplierWithException;
-
-import org.rocksdb.RocksDB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-
-import java.util.LinkedHashMap;
-
-/**
- * Snapshot strategy to create full snapshots of {@link
- * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}. Iterates and writes all
- * states from a RocksDB snapshot of the column families.
- *
- * @param <K> type of the backend keys.
- */
-public class RocksFullSnapshotStrategy<K>
- extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(RocksFullSnapshotStrategy.class);
-
- private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";
-
- /** This decorator is used to apply compression per key-group for the written snapshot data. */
- @Nonnull private final StreamCompressionDecorator keyGroupCompressionDecorator;
-
- private final LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>>
- registeredPQStates;
-
- public RocksFullSnapshotStrategy(
- @Nonnull RocksDB db,
- @Nonnull ResourceGuard rocksDBResourceGuard,
- @Nonnull TypeSerializer<K> keySerializer,
- @Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
- @Nonnull
- LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>>
- registeredPQStates,
- @Nonnull KeyGroupRange keyGroupRange,
- @Nonnegative int keyGroupPrefixBytes,
- @Nonnull LocalRecoveryConfig localRecoveryConfig,
- @Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
- super(
- DESCRIPTION,
- db,
- rocksDBResourceGuard,
- keySerializer,
- kvStateInformation,
- keyGroupRange,
- keyGroupPrefixBytes,
- localRecoveryConfig);
-
- this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
- this.registeredPQStates = registeredPQStates;
- }
-
- @Override
- public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
- return RocksDBFullSnapshotResources.create(
- kvStateInformation,
- registeredPQStates,
- db,
- rocksDBResourceGuard,
- keyGroupRange,
- keySerializer,
- keyGroupPrefixBytes,
- keyGroupCompressionDecorator);
- }
-
- @Override
- public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
- FullSnapshotResources<K> fullRocksDBSnapshotResources,
- long checkpointId,
- long timestamp,
- @Nonnull CheckpointStreamFactory checkpointStreamFactory,
- @Nonnull CheckpointOptions checkpointOptions) {
-
- if (fullRocksDBSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
- timestamp);
- }
- return registry -> SnapshotResult.empty();
- }
-
- final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
- checkpointStreamSupplier =
- createCheckpointStreamSupplier(
- checkpointId, checkpointStreamFactory, checkpointOptions);
-
- return new FullSnapshotAsyncWriter<>(
- checkpointOptions.getCheckpointType(),
- checkpointStreamSupplier,
- fullRocksDBSnapshotResources);
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- // nothing to do.
- }
-
- @Override
- public void notifyCheckpointAborted(long checkpointId) {
- // nothing to do.
- }
-
- @Override
- public void close() {
- // nothing to do.
- }
-
- private SupplierWithException<CheckpointStreamWithResultProvider, Exception>
- createCheckpointStreamSupplier(
- long checkpointId,
- CheckpointStreamFactory primaryStreamFactory,
- CheckpointOptions checkpointOptions) {
-
- return localRecoveryConfig.isLocalRecoveryEnabled()
- && !checkpointOptions.getCheckpointType().isSavepoint()
- ? () ->
- CheckpointStreamWithResultProvider.createDuplicatingStream(
- checkpointId,
- CheckpointedStateScope.EXCLUSIVE,
- primaryStreamFactory,
- localRecoveryConfig
- .getLocalStateDirectoryProvider()
- .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))
- : () ->
- CheckpointStreamWithResultProvider.createSimpleStream(
- CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory);
- }
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 8532e567549..bd464b05677 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -22,52 +22,35 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SnapshotDirectory;
-import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
-import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
-import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import java.io.File;
-import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -79,6 +62,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.getUploadedStateSize;
/**
* Snapshot strategy for {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}
@@ -88,19 +72,13 @@ import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUti
*/
public class RocksIncrementalSnapshotStrategy<K>
extends RocksDBSnapshotStrategyBase<
- K, RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources> {
+ K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources> {
private static final Logger LOG =
LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class);
private static final String DESCRIPTION = "Asynchronous incremental RocksDB snapshot";
- /** Base path of the RocksDB instance. */
- @Nonnull private final File instanceBasePath;
-
- /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
- @Nonnull private final UUID backendUID;
-
/**
* Stores the {@link StateHandleID IDs} of uploaded SST files that build the incremental
* history. Once the checkpoint is confirmed by JM, only the ID paired with {@link
@@ -114,9 +92,6 @@ public class RocksIncrementalSnapshotStrategy<K>
/** The help class used to upload state files. */
private final RocksDBStateUploader stateUploader;
- /** The local directory name of the current snapshot strategy. */
- private final String localDirectoryName;
-
public RocksIncrementalSnapshotStrategy(
@Nonnull RocksDB db,
@Nonnull ResourceGuard rocksDBResourceGuard,
@@ -140,10 +115,9 @@ public class RocksIncrementalSnapshotStrategy<K>
kvStateInformation,
keyGroupRange,
keyGroupPrefixBytes,
- localRecoveryConfig);
-
- this.instanceBasePath = instanceBasePath;
- this.backendUID = backendUID;
+ localRecoveryConfig,
+ instanceBasePath,
+ backendUID);
this.uploadedStateIDs = new TreeMap<>();
for (Map.Entry<Long, Map<StateHandleID, StreamStateHandle>> entry :
uploadedStateHandles.entrySet()) {
@@ -156,30 +130,11 @@ public class RocksIncrementalSnapshotStrategy<K>
}
this.stateUploader = rocksDBStateUploader;
this.lastCompletedCheckpointId = lastCompletedCheckpointId;
- this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
- }
-
- @Override
- public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointId)
- throws Exception {
-
- final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
- LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
-
- final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
- new ArrayList<>(kvStateInformation.size());
- final PreviousSnapshot previousSnapshot =
- snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
-
- takeDBNativeCheckpoint(snapshotDirectory);
-
- return new IncrementalRocksDBSnapshotResources(
- snapshotDirectory, previousSnapshot, stateMetaInfoSnapshots);
}
@Override
public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
- IncrementalRocksDBSnapshotResources snapshotResources,
+ NativeRocksDBSnapshotResources snapshotResources,
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory checkpointStreamFactory,
@@ -247,54 +202,8 @@ public class RocksIncrementalSnapshotStrategy<K>
stateUploader.close();
}
- @Nonnull
- private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throws IOException {
-
- if (localRecoveryConfig.isLocalRecoveryEnabled()) {
- // create a "permanent" snapshot directory for local recovery.
- LocalRecoveryDirectoryProvider directoryProvider =
- localRecoveryConfig
- .getLocalStateDirectoryProvider()
- .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
- File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
-
- if (!directory.exists() && !directory.mkdirs()) {
- throw new IOException(
- "Local state base directory for checkpoint "
- + checkpointId
- + " does not exist and could not be created: "
- + directory);
- }
-
- // introduces an extra directory because RocksDB wants a non-existing directory for
- // native checkpoints.
- // append localDirectoryName here to solve directory collision problem when two stateful
- // operators chained in one task.
- File rdbSnapshotDir = new File(directory, localDirectoryName);
- if (rdbSnapshotDir.exists()) {
- FileUtils.deleteDirectory(rdbSnapshotDir);
- }
-
- Path path = rdbSnapshotDir.toPath();
- // create a "permanent" snapshot directory because local recovery is active.
- try {
- return SnapshotDirectory.permanent(path);
- } catch (IOException ex) {
- try {
- FileUtils.deleteDirectory(directory);
- } catch (IOException delEx) {
- ex = ExceptionUtils.firstOrSuppressed(delEx, ex);
- }
- throw ex;
- }
- } else {
- // create a "temporary" snapshot directory because local recovery is inactive.
- File snapshotDir = new File(instanceBasePath, "chk-" + checkpointId);
- return SnapshotDirectory.temporary(snapshotDir);
- }
- }
-
- private PreviousSnapshot snapshotMetaData(
+ @Override
+ protected PreviousSnapshot snapshotMetaData(
long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
final long lastCompletedCheckpoint;
@@ -325,39 +234,10 @@ public class RocksIncrementalSnapshotStrategy<K>
return new PreviousSnapshot(confirmedSstFiles);
}
- private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
- throws Exception {
- // create hard links of living files in the output path
- try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
- Checkpoint checkpoint = Checkpoint.create(db)) {
- checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
- } catch (Exception ex) {
- try {
- outputDirectory.cleanup();
- } catch (IOException cleanupEx) {
- ex = ExceptionUtils.firstOrSuppressed(cleanupEx, ex);
- }
- throw ex;
- }
- }
-
/**
* Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
*/
- private final class RocksDBIncrementalSnapshotOperation
- implements SnapshotResultSupplier<KeyedStateHandle> {
-
- /** Id for the current checkpoint. */
- private final long checkpointId;
-
- /** Stream factory that creates the output streams to DFS. */
- @Nonnull private final CheckpointStreamFactory checkpointStreamFactory;
-
- /** The state meta data. */
- @Nonnull private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-
- /** Local directory for the RocksDB native backup. */
- @Nonnull private final SnapshotDirectory localBackupDirectory;
+ private final class RocksDBIncrementalSnapshotOperation extends RocksDBSnapshotOperation {
/** All sst files that were part of the last previously completed checkpoint. */
@Nonnull private final PreviousSnapshot previousSnapshot;
@@ -372,11 +252,12 @@ public class RocksIncrementalSnapshotStrategy<K>
@Nonnull SnapshotType.SharingFilesStrategy sharingFilesStrategy,
@Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
- this.checkpointStreamFactory = checkpointStreamFactory;
+ super(
+ checkpointId,
+ checkpointStreamFactory,
+ localBackupDirectory,
+ stateMetaInfoSnapshots);
this.previousSnapshot = previousSnapshot;
- this.checkpointId = checkpointId;
- this.localBackupDirectory = localBackupDirectory;
- this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
this.sharingFilesStrategy = sharingFilesStrategy;
}
@@ -395,7 +276,12 @@ public class RocksIncrementalSnapshotStrategy<K>
try {
- metaStateHandle = materializeMetaData(snapshotCloseableRegistry);
+ metaStateHandle =
+ materializeMetaData(
+ snapshotCloseableRegistry,
+ stateMetaInfoSnapshots,
+ checkpointId,
+ checkpointStreamFactory);
// Sanity checks - they should never fail
Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
@@ -418,27 +304,16 @@ public class RocksIncrementalSnapshotStrategy<K>
metaStateHandle.getJobManagerOwnedSnapshot(),
checkpointedSize);
- final DirectoryStateHandle directoryStateHandle =
- localBackupDirectory.completeSnapshotAndGetHandle();
- final SnapshotResult<KeyedStateHandle> snapshotResult;
- if (directoryStateHandle != null
- && metaStateHandle.getTaskLocalSnapshot() != null) {
-
- IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
- new IncrementalLocalKeyedStateHandle(
- backendUID,
- checkpointId,
- directoryStateHandle,
- keyGroupRange,
- metaStateHandle.getTaskLocalSnapshot(),
- sstFiles);
-
- snapshotResult =
- SnapshotResult.withLocalState(
- jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
- } else {
- snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
- }
+ Optional<KeyedStateHandle> localSnapshot =
+ getLocalSnapshot(metaStateHandle.getTaskLocalSnapshot(), sstFiles);
+ final SnapshotResult<KeyedStateHandle> snapshotResult =
+ localSnapshot
+ .map(
+ keyedStateHandle ->
+ SnapshotResult.withLocalState(
+ jmIncrementalKeyedStateHandle,
+ keyedStateHandle))
+ .orElseGet(() -> SnapshotResult.of(jmIncrementalKeyedStateHandle));
completed = true;
@@ -450,35 +325,7 @@ public class RocksIncrementalSnapshotStrategy<K>
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
- cleanupIncompleteSnapshot(statesToDiscard);
- }
- }
- }
-
- private long getUploadedStateSize(Collection<StreamStateHandle> streamStateHandles) {
- return streamStateHandles.stream()
- .filter(s -> !(s instanceof PlaceholderStreamStateHandle))
- .mapToLong(StateObject::getStateSize)
- .sum();
- }
-
- private void cleanupIncompleteSnapshot(@Nonnull List<StateObject> statesToDiscard) {
-
- try {
- StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
- } catch (Exception e) {
- LOG.warn("Could not properly discard states.", e);
- }
-
- if (localBackupDirectory.isSnapshotCompleted()) {
- try {
- DirectoryStateHandle directoryStateHandle =
- localBackupDirectory.completeSnapshotAndGetHandle();
- if (directoryStateHandle != null) {
- directoryStateHandle.discardState();
- }
- } catch (Exception e) {
- LOG.warn("Could not properly discard local state.", e);
+ cleanupIncompleteSnapshot(statesToDiscard, localBackupDirectory);
}
}
}
@@ -562,113 +409,5 @@ public class RocksIncrementalSnapshotStrategy<K>
}
}
}
-
- @Nonnull
- private SnapshotResult<StreamStateHandle> materializeMetaData(
- @Nonnull CloseableRegistry snapshotCloseableRegistry) throws Exception {
-
- CheckpointStreamWithResultProvider streamWithResultProvider =
- localRecoveryConfig.isLocalRecoveryEnabled()
- ? CheckpointStreamWithResultProvider.createDuplicatingStream(
- checkpointId,
- CheckpointedStateScope.EXCLUSIVE,
- checkpointStreamFactory,
- localRecoveryConfig
- .getLocalStateDirectoryProvider()
- .orElseThrow(
- LocalRecoveryConfig.localRecoveryNotEnabled()))
- : CheckpointStreamWithResultProvider.createSimpleStream(
- CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
-
- snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);
-
- try {
- // no need for compression scheme support because sst-files are already compressed
- KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(
- keySerializer, stateMetaInfoSnapshots, false);
-
- DataOutputView out =
- new DataOutputViewStreamWrapper(
- streamWithResultProvider.getCheckpointOutputStream());
-
- serializationProxy.write(out);
-
- if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
- SnapshotResult<StreamStateHandle> result =
- streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
- streamWithResultProvider = null;
- return result;
- } else {
- throw new IOException("Stream already closed and cannot return a handle.");
- }
- } finally {
- if (streamWithResultProvider != null) {
- if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
- IOUtils.closeQuietly(streamWithResultProvider);
- }
- }
- }
- }
- }
-
- static class IncrementalRocksDBSnapshotResources implements SnapshotResources {
- @Nonnull private final SnapshotDirectory snapshotDirectory;
-
- @Nonnull private final PreviousSnapshot previousSnapshot;
-
- @Nonnull private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-
- public IncrementalRocksDBSnapshotResources(
- SnapshotDirectory snapshotDirectory,
- PreviousSnapshot previousSnapshot,
- List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
- this.snapshotDirectory = snapshotDirectory;
- this.previousSnapshot = previousSnapshot;
- this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
- }
-
- @Override
- public void release() {
- try {
- if (snapshotDirectory.exists()) {
- LOG.trace(
- "Running cleanup for local RocksDB backup directory {}.",
- snapshotDirectory);
- boolean cleanupOk = snapshotDirectory.cleanup();
-
- if (!cleanupOk) {
- LOG.debug("Could not properly cleanup local RocksDB backup directory.");
- }
- }
- } catch (IOException e) {
- LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
- }
- }
- }
-
- private static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
- new PreviousSnapshot(Collections.emptyMap());
-
- private static class PreviousSnapshot {
-
- @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
-
- private PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
- this.confirmedSstFiles = confirmedSstFiles;
- }
-
- private Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
- if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
- // we introduce a placeholder state handle, that is replaced with the
- // original from the shared state registry (created from a previous checkpoint)
- return Optional.of(
- new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
- } else {
- // Don't use any uploaded but not confirmed handles because they might be deleted
- // (by TM) if the previous checkpoint failed. See FLINK-25395
- return Optional.empty();
- }
- }
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java
new file mode 100644
index 00000000000..eafffebe4ca
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
+import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.SnapshotDirectory;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.RocksDB;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.getUploadedStateSize;
+
+/**
+ * Snapshot strategy for {@link RocksDBKeyedStateBackend} that is based on RocksDB's native
+ * checkpoints and creates full snapshots. The difference from a savepoint is that sst files are
+ * uploaded rather than the states themselves.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class RocksNativeFullSnapshotStrategy<K>
+ extends RocksDBSnapshotStrategyBase<
+ K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources> {
+
+ private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";
+
+    /** The helper class used to upload state files. */
+ private final RocksDBStateUploader stateUploader;
+
+ public RocksNativeFullSnapshotStrategy(
+ @Nonnull RocksDB db,
+ @Nonnull ResourceGuard rocksDBResourceGuard,
+ @Nonnull TypeSerializer<K> keySerializer,
+ @Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int keyGroupPrefixBytes,
+ @Nonnull LocalRecoveryConfig localRecoveryConfig,
+ @Nonnull File instanceBasePath,
+ @Nonnull UUID backendUID,
+ @Nonnull RocksDBStateUploader rocksDBStateUploader) {
+ super(
+ DESCRIPTION,
+ db,
+ rocksDBResourceGuard,
+ keySerializer,
+ kvStateInformation,
+ keyGroupRange,
+ keyGroupPrefixBytes,
+ localRecoveryConfig,
+ instanceBasePath,
+ backendUID);
+ this.stateUploader = rocksDBStateUploader;
+ }
+
+ @Override
+ public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
+ NativeRocksDBSnapshotResources snapshotResources,
+ long checkpointId,
+ long timestamp,
+ @Nonnull CheckpointStreamFactory checkpointStreamFactory,
+ @Nonnull CheckpointOptions checkpointOptions) {
+
+ if (snapshotResources.stateMetaInfoSnapshots.isEmpty()) {
+ return registry -> SnapshotResult.empty();
+ }
+
+ return new RocksDBNativeFullSnapshotOperation(
+ checkpointId,
+ checkpointStreamFactory,
+ snapshotResources.snapshotDirectory,
+ snapshotResources.stateMetaInfoSnapshots);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long completedCheckpointId) {
+ // nothing to do
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long abortedCheckpointId) {
+ // nothing to do
+ }
+
+ @Override
+ protected PreviousSnapshot snapshotMetaData(
+ long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
+ for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
+ kvStateInformation.entrySet()) {
+ stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
+ }
+ return EMPTY_PREVIOUS_SNAPSHOT;
+ }
+
+ @Override
+ public void close() {
+ stateUploader.close();
+ }
+
+ /** Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend. */
+ private final class RocksDBNativeFullSnapshotOperation extends RocksDBSnapshotOperation {
+
+ private RocksDBNativeFullSnapshotOperation(
+ long checkpointId,
+ @Nonnull CheckpointStreamFactory checkpointStreamFactory,
+ @Nonnull SnapshotDirectory localBackupDirectory,
+ @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
+
+ super(
+ checkpointId,
+ checkpointStreamFactory,
+ localBackupDirectory,
+ stateMetaInfoSnapshots);
+ }
+
+ @Override
+ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
+ throws Exception {
+
+ boolean completed = false;
+
+ // Handle to the meta data file
+ SnapshotResult<StreamStateHandle> metaStateHandle = null;
+ // Handles to all the files in the current snapshot will go here
+ final Map<StateHandleID, StreamStateHandle> privateFiles = new HashMap<>();
+
+ try {
+
+ metaStateHandle =
+ materializeMetaData(
+ snapshotCloseableRegistry,
+ stateMetaInfoSnapshots,
+ checkpointId,
+ checkpointStreamFactory);
+
+ // Sanity checks - they should never fail
+ Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
+ Preconditions.checkNotNull(
+ metaStateHandle.getJobManagerOwnedSnapshot(),
+ "Metadata for job manager was not properly created.");
+
+ uploadSstFiles(privateFiles, snapshotCloseableRegistry);
+ long checkpointedSize = metaStateHandle.getStateSize();
+ checkpointedSize += getUploadedStateSize(privateFiles.values());
+
+ final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
+ new IncrementalRemoteKeyedStateHandle(
+ backendUID,
+ keyGroupRange,
+ checkpointId,
+ Collections.emptyMap(),
+ privateFiles,
+ metaStateHandle.getJobManagerOwnedSnapshot(),
+ checkpointedSize);
+
+ Optional<KeyedStateHandle> localSnapshot =
+ getLocalSnapshot(
+ metaStateHandle.getTaskLocalSnapshot(), Collections.emptyMap());
+ final SnapshotResult<KeyedStateHandle> snapshotResult =
+ localSnapshot
+ .map(
+ keyedStateHandle ->
+ SnapshotResult.withLocalState(
+ jmIncrementalKeyedStateHandle,
+ keyedStateHandle))
+ .orElseGet(() -> SnapshotResult.of(jmIncrementalKeyedStateHandle));
+
+ completed = true;
+
+ return snapshotResult;
+ } finally {
+ if (!completed) {
+ final List<StateObject> statesToDiscard =
+ new ArrayList<>(1 + privateFiles.size());
+ statesToDiscard.add(metaStateHandle);
+ statesToDiscard.addAll(privateFiles.values());
+ cleanupIncompleteSnapshot(statesToDiscard, localBackupDirectory);
+ }
+ }
+ }
+
+ private void uploadSstFiles(
+ @Nonnull Map<StateHandleID, StreamStateHandle> privateFiles,
+ @Nonnull CloseableRegistry snapshotCloseableRegistry)
+ throws Exception {
+
+ // write state data
+ Preconditions.checkState(localBackupDirectory.exists());
+
+ Map<StateHandleID, Path> privateFilePaths = new HashMap<>();
+
+ Path[] files = localBackupDirectory.listDirectory();
+ if (files != null) {
+                // every file in the backup directory is private in a full snapshot
+ for (Path filePath : files) {
+ final String fileName = filePath.getFileName().toString();
+ final StateHandleID stateHandleID = new StateHandleID(fileName);
+ privateFilePaths.put(stateHandleID, filePath);
+ }
+
+ privateFiles.putAll(
+ stateUploader.uploadFilesToCheckpointFs(
+ privateFilePaths,
+ checkpointStreamFactory,
+ CheckpointedStateScope.EXCLUSIVE,
+ snapshotCloseableRegistry));
+ }
+ }
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
index cf0692c6777..10425708b4f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
@@ -18,6 +18,12 @@
package org.apache.flink.contrib.streaming.state.snapshot;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Collection;
+
/**
* Utility methods and constants around RocksDB creating and restoring snapshots for {@link
* org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
@@ -30,4 +36,11 @@ public class RocksSnapshotUtil {
private RocksSnapshotUtil() {
throw new AssertionError();
}
+
+ public static long getUploadedStateSize(Collection<StreamStateHandle> streamStateHandles) {
+ return streamStateHandles.stream()
+ .filter(s -> !(s instanceof PlaceholderStreamStateHandle))
+ .mapToLong(StateObject::getStateSize)
+ .sum();
+ }
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index f021cf5306a..46788406c16 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -362,11 +362,6 @@ public class EmbeddedRocksDBStateBackendTest
RocksDB spyDB = keyedStateBackend.db;
- if (!enableIncrementalCheckpointing) {
- verify(spyDB, times(1)).getSnapshot();
- verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class));
- }
-
// Ensure every RocksObjects not closed yet
for (RocksObject rocksCloseable : allCreatedCloseables) {
verify(rocksCloseable, times(0)).close();
@@ -662,11 +657,6 @@ public class EmbeddedRocksDBStateBackendTest
assertNotNull(null, keyedStateBackend.db);
RocksDB spyDB = keyedStateBackend.db;
- if (!enableIncrementalCheckpointing) {
- verify(spyDB, times(1)).getSnapshot();
- verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class));
- }
-
keyedStateBackend.dispose();
verify(spyDB, times(1)).close();
assertEquals(true, keyedStateBackend.isDisposed());
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
index 927192fa0da..4e4a85aaf66 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
@@ -174,7 +174,7 @@ public class RocksIncrementalSnapshotStrategyTest {
CloseableRegistry closeableRegistry)
throws Exception {
- RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources snapshotResources =
+ RocksIncrementalSnapshotStrategy.NativeRocksDBSnapshotResources snapshotResources =
checkpointSnapshotStrategy.syncPrepareResources(checkpointId);
return (IncrementalRemoteKeyedStateHandle)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/FullSnapshotRocksDbTtlStateTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/FullSnapshotRocksDbTtlStateTest.java
index 005752dda96..dddb6eaa2b3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/FullSnapshotRocksDbTtlStateTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/FullSnapshotRocksDbTtlStateTest.java
@@ -27,4 +27,9 @@ public class FullSnapshotRocksDbTtlStateTest extends RocksDBTtlStateTestBase {
StateBackend createStateBackend() {
return createStateBackend(TernaryBoolean.FALSE);
}
+
+ @Override
+ public boolean isSavepoint() {
+ return false;
+ }
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/IncSnapshotRocksDbTtlStateTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/IncSnapshotRocksDbTtlStateTest.java
index 7f30e62c712..0cd83003f9c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/IncSnapshotRocksDbTtlStateTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/IncSnapshotRocksDbTtlStateTest.java
@@ -29,7 +29,7 @@ public class IncSnapshotRocksDbTtlStateTest extends RocksDBTtlStateTestBase {
}
@Override
- public boolean fullSnapshot() {
+ public boolean isSavepoint() {
return false;
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index a4851cf51d0..13eaf4403ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -231,6 +231,72 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
}
}
+ @Test
+ public void testExternalizedSwitchRocksDBCheckpointsStandalone() throws Exception {
+ final File checkpointDir = temporaryFolder.newFolder();
+ StateBackend previousStateBackend = createRocksDBStateBackend(checkpointDir, false);
+ StateBackend newStateBackend = createRocksDBStateBackend(checkpointDir, true);
+ testExternalizedCheckpoints(
+ checkpointDir,
+ null,
+ previousStateBackend,
+ newStateBackend,
+ previousStateBackend,
+ false,
+ restoreMode);
+ }
+
+ @Test
+ public void testExternalizedSwitchRocksDBCheckpointsWithLocalRecoveryStandalone()
+ throws Exception {
+ final File checkpointDir = temporaryFolder.newFolder();
+ StateBackend previousStateBackend = createRocksDBStateBackend(checkpointDir, false);
+ StateBackend newStateBackend = createRocksDBStateBackend(checkpointDir, true);
+ testExternalizedCheckpoints(
+ checkpointDir,
+ null,
+ previousStateBackend,
+ newStateBackend,
+ previousStateBackend,
+ true,
+ restoreMode);
+ }
+
+ @Test
+ public void testExternalizedSwitchRocksDBCheckpointsZookeeper() throws Exception {
+ try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
+ final File checkpointDir = temporaryFolder.newFolder();
+ StateBackend previousStateBackend = createRocksDBStateBackend(checkpointDir, false);
+ StateBackend newStateBackend = createRocksDBStateBackend(checkpointDir, true);
+ testExternalizedCheckpoints(
+ checkpointDir,
+ zkServer.getConnectString(),
+ previousStateBackend,
+ newStateBackend,
+ previousStateBackend,
+ false,
+ restoreMode);
+ }
+ }
+
+ @Test
+ public void testExternalizedSwitchRocksDBCheckpointsWithLocalRecoveryZookeeper()
+ throws Exception {
+ try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
+ final File checkpointDir = temporaryFolder.newFolder();
+ StateBackend previousStateBackend = createRocksDBStateBackend(checkpointDir, false);
+ StateBackend newStateBackend = createRocksDBStateBackend(checkpointDir, true);
+ testExternalizedCheckpoints(
+ checkpointDir,
+ zkServer.getConnectString(),
+ previousStateBackend,
+ newStateBackend,
+ previousStateBackend,
+ true,
+ restoreMode);
+ }
+ }
+
private FsStateBackend createFsStateBackend(File checkpointDir) throws IOException {
return new FsStateBackend(checkpointDir.toURI().toString(), true);
}
@@ -248,6 +314,25 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
boolean localRecovery,
RestoreMode restoreMode)
throws Exception {
+ testExternalizedCheckpoints(
+ checkpointDir,
+ zooKeeperQuorum,
+ backend,
+ backend,
+ backend,
+ localRecovery,
+ restoreMode);
+ }
+
+ private static void testExternalizedCheckpoints(
+ File checkpointDir,
+ String zooKeeperQuorum,
+ StateBackend backend1,
+ StateBackend backend2,
+ StateBackend backend3,
+ boolean localRecovery,
+ RestoreMode restoreMode)
+ throws Exception {
final Configuration config = new Configuration();
@@ -286,17 +371,17 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
try {
// main test sequence: start job -> eCP -> restore job -> eCP -> restore job
String firstExternalCheckpoint =
- runJobAndGetExternalizedCheckpoint(backend, null, cluster, restoreMode);
+ runJobAndGetExternalizedCheckpoint(backend1, null, cluster, restoreMode);
assertNotNull(firstExternalCheckpoint);
String secondExternalCheckpoint =
runJobAndGetExternalizedCheckpoint(
- backend, firstExternalCheckpoint, cluster, restoreMode);
+ backend2, firstExternalCheckpoint, cluster, restoreMode);
assertNotNull(secondExternalCheckpoint);
String thirdExternalCheckpoint =
runJobAndGetExternalizedCheckpoint(
- backend,
+ backend3,
// in CLAIM mode, the previous run is only guaranteed to preserve the
// latest checkpoint; in NO_CLAIM/LEGACY, even the initial checkpoints
// must remain valid
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
index ff2706f128c..b5b96c9380e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java
@@ -75,6 +75,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class SavepointFormatITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(SavepointFormatITCase.class);
+ private static final String STATE_BACKEND_ROCKSDB = "ROCKSDB";
+ private static final String STATE_BACKEND_HEAP = "HEAP";
+
@TempDir Path checkpointsDir;
@TempDir Path originalSavepointDir;
@TempDir Path renamedSavepointDir;
@@ -122,7 +125,7 @@ public class SavepointFormatITCase extends TestLogger {
private void validateNativeNonChangelogState(
KeyedStateHandle state, StateBackendConfig backendConfig) {
- if (backendConfig.isIncremental()) {
+ if (STATE_BACKEND_ROCKSDB.equals(backendConfig.getName())) {
assertThat(state, instanceOf(IncrementalRemoteKeyedStateHandle.class));
} else {
assertThat(state, instanceOf(KeyGroupsStateHandle.class));
@@ -174,7 +177,7 @@ public class SavepointFormatITCase extends TestLogger {
return new StateBackendConfig(changelogEnabled, incremental /* ignored for now */) {
@Override
public String getName() {
- return "HEAP";
+ return STATE_BACKEND_HEAP;
}
@Override
@@ -201,7 +204,7 @@ public class SavepointFormatITCase extends TestLogger {
return new StateBackendConfig(changelogEnabled, incremental) {
@Override
public String getName() {
- return "ROCKSDB";
+ return STATE_BACKEND_ROCKSDB;
}
@Override