You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/08/21 12:31:36 UTC
[flink] 01/02: [FLINK-10042][state] (part 1) Extract snapshot
algorithms from inner classes of RocksDBKeyedStateBackend into full classes
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit aba02eb3fcc4472c3d5f5a0f527960d79c659c31
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Tue Aug 7 15:57:27 2018 +0200
[FLINK-10042][state] (part 1) Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full classes
---
.../flink/runtime/state/SnapshotStrategy.java | 3 +-
.../runtime/state/heap/HeapKeyedStateBackend.java | 5 +
.../streaming/state/RocksDBKeyedStateBackend.java | 1071 ++------------------
.../state/snapshot/RocksFullSnapshotStrategy.java | 478 +++++++++
.../snapshot/RocksIncrementalSnapshotStrategy.java | 578 +++++++++++
.../state/snapshot/RocksSnapshotUtil.java | 51 +
.../state/snapshot/SnapshotStrategyBase.java | 90 ++
.../streaming/state/RocksDBAsyncSnapshotTest.java | 27 +-
.../streaming/state/RocksDBStateBackendTest.java | 1 +
9 files changed, 1317 insertions(+), 987 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
index 9139fa7..3ad68af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
@@ -28,8 +28,7 @@ import java.util.concurrent.RunnableFuture;
*
* @param <S> type of the returned state object that represents the result of the snapshot operation.
*/
-@FunctionalInterface
-public interface SnapshotStrategy<S extends StateObject> {
+public interface SnapshotStrategy<S extends StateObject> extends CheckpointListener {
/**
* Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index bc1e0f5..0e2f16c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -882,6 +882,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // nothing to do.
+ }
}
private interface StateFactory {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index c159976..87c7e55 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -35,9 +35,8 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
-import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
-import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
-import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
+import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
@@ -47,32 +46,22 @@ import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
-import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
@@ -80,14 +69,11 @@ import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
-import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
@@ -96,25 +82,19 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
-import org.apache.flink.util.function.SupplierWithException;
-import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,7 +105,6 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
@@ -144,12 +123,16 @@ import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.clearMetaDataFollowsFlag;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag;
+
/**
* An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to
* streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
@@ -167,9 +150,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** The name of the merge operator in RocksDB. Do not change except you know exactly what you do. */
public static final String MERGE_OPERATOR_NAME = "stringappendtest";
- /** File suffix of sstable files. */
- private static final String SST_FILE_SUFFIX = ".sst";
-
private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
Stream.of(
Tuple2.of(ValueStateDescriptor.class, (StateFactory) RocksDBValueState::create),
@@ -230,7 +210,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
*/
- private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
+ private final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
/**
* Map of state names to their corresponding restored state meta info.
@@ -246,20 +226,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** True if incremental checkpointing is enabled. */
private final boolean enableIncrementalCheckpointing;
- /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
- private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
-
- /** The identifier of the last completed checkpoint. */
- private long lastCompletedCheckpointId = -1L;
-
- /** Unique ID of this backend. */
- private UUID backendUID;
-
/** The configuration of local recovery. */
private final LocalRecoveryConfig localRecoveryConfig;
/** The snapshot strategy, e.g., if we use full or incremental checkpoints, local state, and so on. */
- private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
+ private SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
/** Factory for priority queue state. */
private final PriorityQueueSetFactory priorityQueueFactory;
@@ -314,12 +285,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups());
this.kvStateInformation = new LinkedHashMap<>();
this.restoredKvStateMetaInfos = new HashMap<>();
- this.materializedSstFiles = new TreeMap<>();
- this.backendUID = UUID.randomUUID();
-
- this.snapshotStrategy = enableIncrementalCheckpointing ?
- new IncrementalSnapshotStrategy() :
- new FullSnapshotStrategy();
this.writeOptions = new WriteOptions().setDisableWAL(true);
@@ -333,8 +298,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
default:
throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
}
-
- LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
}
private static void checkAndCreateDirectory(File directory) throws IOException {
@@ -508,41 +471,83 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
restoredKvStateMetaInfos.clear();
try {
+ RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation = null;
if (restoreState == null || restoreState.isEmpty()) {
createDB();
} else {
KeyedStateHandle firstStateHandle = restoreState.iterator().next();
if (firstStateHandle instanceof IncrementalKeyedStateHandle
|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
- RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
- restoreOperation.restore(restoreState);
+ incrementalRestoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+ incrementalRestoreOperation.restore(restoreState);
} else {
- RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
- restoreOperation.doRestore(restoreState);
+ RocksDBFullRestoreOperation<K> fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
+ fullRestoreOperation.doRestore(restoreState);
}
}
+
+ initializeSnapshotStrategy(incrementalRestoreOperation);
} catch (Exception ex) {
dispose();
throw ex;
}
}
- @Override
- public void notifyCheckpointComplete(long completedCheckpointId) {
-
- if (!enableIncrementalCheckpointing) {
- return;
- }
-
- synchronized (materializedSstFiles) {
-
- if (completedCheckpointId < lastCompletedCheckpointId) {
- return;
+ @VisibleForTesting
+ void initializeSnapshotStrategy(
+ @Nullable RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) {
+
+ final RocksFullSnapshotStrategy<K> fullSnapshotStrategy =
+ new RocksFullSnapshotStrategy<>(
+ db,
+ rocksDBResourceGuard,
+ keySerializer,
+ kvStateInformation,
+ keyGroupRange,
+ keyGroupPrefixBytes,
+ localRecoveryConfig,
+ cancelStreamRegistry,
+ keyGroupCompressionDecorator);
+
+ if (enableIncrementalCheckpointing) {
+ final UUID backendUID;
+ final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+ final long lastCompletedCheckpointId;
+
+ if (incrementalRestoreOperation == null) {
+ backendUID = UUID.randomUUID();
+ materializedSstFiles = new TreeMap<>();
+ lastCompletedCheckpointId = -1L;
+ } else {
+ backendUID = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
+ materializedSstFiles = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles());
+ lastCompletedCheckpointId = incrementalRestoreOperation.getLastCompletedCheckpointId();
+ Preconditions.checkState(lastCompletedCheckpointId >= 0L);
}
+ // TODO eventually we might want to separate savepoint and snapshot strategy, i.e. having 2 strategies.
+ this.snapshotStrategy = new RocksIncrementalSnapshotStrategy<>(
+ db,
+ rocksDBResourceGuard,
+ keySerializer,
+ kvStateInformation,
+ keyGroupRange,
+ keyGroupPrefixBytes,
+ localRecoveryConfig,
+ cancelStreamRegistry,
+ instanceBasePath,
+ backendUID,
+ materializedSstFiles,
+ lastCompletedCheckpointId,
+ fullSnapshotStrategy);
+ } else {
+ this.snapshotStrategy = fullSnapshotStrategy;
+ }
+ }
- materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
-
- lastCompletedCheckpointId = completedCheckpointId;
+ @Override
+ public void notifyCheckpointComplete(long completedCheckpointId) throws Exception {
+ if (snapshotStrategy != null) {
+ snapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
}
}
@@ -656,10 +661,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws RocksDBException
*/
private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
@@ -724,9 +725,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
- *
- * @throws IOException
- * @throws RocksDBException
*/
private void restoreKVStateData() throws IOException, RocksDBException {
//for all key-groups in the current state handle...
@@ -752,14 +750,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
while (keyGroupHasMoreKeys) {
byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
- if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+ if (hasMetaDataFollowsFlag(key)) {
//clear the signal bit in the key to make it ready for insertion again
- RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+ clearMetaDataFollowsFlag(key);
writeBatchWrapper.put(handle, key, value);
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+ kvStateId = END_OF_KEY_GROUP_MARK
& compressedKgInputView.readShort();
- if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+ if (END_OF_KEY_GROUP_MARK == kvStateId) {
keyGroupHasMoreKeys = false;
} else {
handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
@@ -781,9 +779,26 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static class RocksDBIncrementalRestoreOperation<T> {
private final RocksDBKeyedStateBackend<T> stateBackend;
+ private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
+ private UUID restoredBackendUID;
+ private long lastCompletedCheckpointId;
private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
+
this.stateBackend = stateBackend;
+ this.restoredSstFiles = new TreeMap<>();
+ }
+
+ SortedMap<Long, Set<StateHandleID>> getRestoredSstFiles() {
+ return restoredSstFiles;
+ }
+
+ UUID getRestoredBackendUID() {
+ return restoredBackendUID;
+ }
+
+ long getLastCompletedCheckpointId() {
+ return lastCompletedCheckpointId;
}
/**
@@ -872,6 +887,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
+ this.restoredBackendUID = UUID.randomUUID();
+
initTargetDB(restoreStateHandles, stateBackend.keyGroupRange);
byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
@@ -949,6 +966,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Nonnull
private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+ private
+
RestoredDBInstance(
@Nonnull RocksDB db,
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -1113,10 +1132,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
List<ColumnFamilyDescriptor> columnFamilyDescriptors,
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) throws Exception {
// pick up again the old backend id, so that we can reference existing state
- stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();
+ this.restoredBackendUID = restoreStateHandle.getBackendIdentifier();
LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
- stateBackend.operatorIdentifier, stateBackend.backendUID);
+ stateBackend.operatorIdentifier, this.restoredBackendUID);
// create hard links in the instance directory
if (!stateBackend.instanceRocksDBPath.mkdirs()) {
@@ -1150,13 +1169,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
// use the restore sst files as the base for succeeding checkpoints
- synchronized (stateBackend.materializedSstFiles) {
- stateBackend.materializedSstFiles.put(
+ restoredSstFiles.put(
restoreStateHandle.getCheckpointId(),
restoreStateHandle.getSharedStateHandleIDs());
- }
- stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
+ lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
}
/**
@@ -1447,881 +1464,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return count;
}
- private class FullSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
-
- @Override
- public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
- long checkpointId,
- long timestamp,
- CheckpointStreamFactory primaryStreamFactory,
- CheckpointOptions checkpointOptions) throws Exception {
-
- long startTime = System.currentTimeMillis();
-
- if (kvStateInformation.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
- timestamp);
- }
-
- return DoneFuture.of(SnapshotResult.empty());
- }
-
- final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
-
- localRecoveryConfig.isLocalRecoveryEnabled() &&
- (CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?
-
- () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
- checkpointId,
- CheckpointedStateScope.EXCLUSIVE,
- primaryStreamFactory,
- localRecoveryConfig.getLocalStateDirectoryProvider()) :
-
- () -> CheckpointStreamWithResultProvider.createSimpleStream(
- CheckpointedStateScope.EXCLUSIVE,
- primaryStreamFactory);
-
- final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
-
- final RocksDBFullSnapshotOperation<K> snapshotOperation =
- new RocksDBFullSnapshotOperation<>(
- RocksDBKeyedStateBackend.this,
- supplier,
- snapshotCloseableRegistry);
-
- snapshotOperation.takeDBSnapShot();
-
- // implementation of the async IO operation, based on FutureTask
- AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable =
- new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
-
- @Override
- protected void acquireResources() throws Exception {
- cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
- snapshotOperation.openCheckpointStream();
- }
-
- @Override
- protected void releaseResources() throws Exception {
- closeLocalRegistry();
- releaseSnapshotOperationResources();
- }
-
- private void releaseSnapshotOperationResources() {
- // hold the db lock while operation on the db to guard us against async db disposal
- snapshotOperation.releaseSnapshotResources();
- }
-
- @Override
- protected void stopOperation() throws Exception {
- closeLocalRegistry();
- }
-
- private void closeLocalRegistry() {
- if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
- try {
- snapshotCloseableRegistry.close();
- } catch (Exception ex) {
- LOG.warn("Error closing local registry", ex);
- }
- }
- }
-
- @Nonnull
- @Override
- public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
- long startTime = System.currentTimeMillis();
-
- if (isStopped()) {
- throw new IOException("RocksDB closed.");
- }
-
- snapshotOperation.writeDBSnapshot();
-
- LOG.debug("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
- primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
-
- return snapshotOperation.getSnapshotResultStateHandle();
- }
- };
-
- LOG.debug("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.",
- primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
- return AsyncStoppableTaskWithCallback.from(ioCallable);
- }
- }
-
- private class IncrementalSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
-
- private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;
-
- public IncrementalSnapshotStrategy() {
- this.savepointDelegate = new FullSnapshotStrategy();
- }
-
- @Override
- public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
- long checkpointId,
- long checkpointTimestamp,
- CheckpointStreamFactory checkpointStreamFactory,
- CheckpointOptions checkpointOptions) throws Exception {
-
- // for savepoints, we delegate to the full snapshot strategy because savepoints are always self-contained.
- if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
- return savepointDelegate.performSnapshot(
- checkpointId,
- checkpointTimestamp,
- checkpointStreamFactory,
- checkpointOptions);
- }
-
- if (db == null) {
- throw new IOException("RocksDB closed.");
- }
-
- if (kvStateInformation.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", checkpointTimestamp);
- }
- return DoneFuture.of(SnapshotResult.empty());
- }
-
- SnapshotDirectory snapshotDirectory;
-
- if (localRecoveryConfig.isLocalRecoveryEnabled()) {
- // create a "permanent" snapshot directory for local recovery.
- LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
- File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
-
- if (directory.exists()) {
- FileUtils.deleteDirectory(directory);
- }
-
- if (!directory.mkdirs()) {
- throw new IOException("Local state base directory for checkpoint " + checkpointId +
- " already exists: " + directory);
- }
-
- // introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints.
- File rdbSnapshotDir = new File(directory, "rocks_db");
- Path path = new Path(rdbSnapshotDir.toURI());
- // create a "permanent" snapshot directory because local recovery is active.
- snapshotDirectory = SnapshotDirectory.permanent(path);
- } else {
- // create a "temporary" snapshot directory because local recovery is inactive.
- Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
- snapshotDirectory = SnapshotDirectory.temporary(path);
- }
-
- final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
- new RocksDBIncrementalSnapshotOperation<>(
- RocksDBKeyedStateBackend.this,
- checkpointStreamFactory,
- snapshotDirectory,
- checkpointId);
-
- try {
- snapshotOperation.takeSnapshot();
- } catch (Exception e) {
- snapshotOperation.stop();
- snapshotOperation.releaseResources(true);
- throw e;
- }
-
- return new FutureTask<SnapshotResult<KeyedStateHandle>>(
- snapshotOperation::runSnapshot
- ) {
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- snapshotOperation.stop();
- return super.cancel(mayInterruptIfRunning);
- }
-
- @Override
- protected void done() {
- snapshotOperation.releaseResources(isCancelled());
- }
- };
- }
- }
-
- /**
- * Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend.
- */
- @VisibleForTesting
- static class RocksDBFullSnapshotOperation<K>
- extends AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> {
-
- static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
- static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
-
- private final RocksDBKeyedStateBackend<K> stateBackend;
- private final KeyGroupRangeOffsets keyGroupRangeOffsets;
- private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
- private final CloseableRegistry snapshotCloseableRegistry;
- private final ResourceGuard.Lease dbLease;
-
- private Snapshot snapshot;
- private ReadOptions readOptions;
-
- /**
- * The state meta data.
- */
- private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-
- /**
- * The copied column handle.
- */
- private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> copiedMeta;
-
- private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators;
-
- private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
- private DataOutputView outputView;
-
- RocksDBFullSnapshotOperation(
- RocksDBKeyedStateBackend<K> stateBackend,
- SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier,
- CloseableRegistry registry) throws IOException {
-
- this.stateBackend = stateBackend;
- this.checkpointStreamSupplier = checkpointStreamSupplier;
- this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
- this.snapshotCloseableRegistry = registry;
- this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
- }
-
- /**
- * 1) Create a snapshot object from RocksDB.
- *
- */
- public void takeDBSnapShot() {
- Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
-
- this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size());
-
- this.copiedMeta = new ArrayList<>(stateBackend.kvStateInformation.size());
-
- for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 :
- stateBackend.kvStateInformation.values()) {
- // snapshot meta info
- this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
- this.copiedMeta.add(tuple2);
- }
- this.snapshot = stateBackend.db.getSnapshot();
- }
-
- /**
- * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write.
- *
- * @throws Exception
- */
- public void openCheckpointStream() throws Exception {
- Preconditions.checkArgument(checkpointStreamWithResultProvider == null,
- "Output stream for snapshot is already set.");
-
- checkpointStreamWithResultProvider = checkpointStreamSupplier.get();
- snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
- outputView = new DataOutputViewStreamWrapper(
- checkpointStreamWithResultProvider.getCheckpointOutputStream());
- }
-
- /**
- * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1).
- *
- * @throws IOException
- */
- public void writeDBSnapshot() throws IOException, InterruptedException, RocksDBException {
-
- if (null == snapshot) {
- throw new IOException("No snapshot available. Might be released due to cancellation.");
- }
-
- Preconditions.checkNotNull(checkpointStreamWithResultProvider, "No output stream to write snapshot.");
- writeKVStateMetaData();
- writeKVStateData();
- }
-
- /**
- * 4) Returns a snapshot result for the completed snapshot.
- *
- * @return snapshot result for the completed snapshot.
- */
- @Nonnull
- public SnapshotResult<KeyedStateHandle> getSnapshotResultStateHandle() throws IOException {
-
- if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
-
- SnapshotResult<StreamStateHandle> res =
- checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
- checkpointStreamWithResultProvider = null;
- return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(res, keyGroupRangeOffsets);
- }
-
- return SnapshotResult.empty();
- }
-
- /**
- * 5) Release the snapshot object for RocksDB and clean up.
- */
- public void releaseSnapshotResources() {
-
- checkpointStreamWithResultProvider = null;
-
- if (null != kvStateIterators) {
- for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) {
- IOUtils.closeQuietly(kvStateIterator.f0);
- }
- kvStateIterators = null;
- }
-
- if (null != snapshot) {
- if (null != stateBackend.db) {
- stateBackend.db.releaseSnapshot(snapshot);
- }
- IOUtils.closeQuietly(snapshot);
- snapshot = null;
- }
-
- if (null != readOptions) {
- IOUtils.closeQuietly(readOptions);
- readOptions = null;
- }
-
- this.dbLease.close();
- }
-
- private void writeKVStateMetaData() throws IOException {
-
- this.kvStateIterators = new ArrayList<>(copiedMeta.size());
-
- int kvStateId = 0;
-
- //retrieve iterator for this k/v states
- readOptions = new ReadOptions();
- readOptions.setSnapshot(snapshot);
-
- for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : copiedMeta) {
- RocksIteratorWrapper rocksIteratorWrapper =
- getRocksIterator(stateBackend.db, tuple2.f0, tuple2.f1, readOptions);
- kvStateIterators.add(new Tuple2<>(rocksIteratorWrapper, kvStateId));
- ++kvStateId;
- }
-
- KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(
- // TODO: this code assumes that writing a serializer is threadsafe, we should support to
- // get a serialized form already at state registration time in the future
- stateBackend.getKeySerializer(),
- stateMetaInfoSnapshots,
- !Objects.equals(
- UncompressedStreamCompressionDecorator.INSTANCE,
- stateBackend.keyGroupCompressionDecorator));
-
- serializationProxy.write(outputView);
- }
-
- private void writeKVStateData() throws IOException, InterruptedException {
- byte[] previousKey = null;
- byte[] previousValue = null;
- DataOutputView kgOutView = null;
- OutputStream kgOutStream = null;
- CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
- checkpointStreamWithResultProvider.getCheckpointOutputStream();
-
- try {
- // Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator
- try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
- kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
-
- // handover complete, null out to prevent double close
- kvStateIterators = null;
-
- //preamble: setup with first key-group as our lookahead
- if (mergeIterator.isValid()) {
- //begin first key-group by recording the offset
- keyGroupRangeOffsets.setKeyGroupOffset(
- mergeIterator.keyGroup(),
- checkpointOutputStream.getPos());
- //write the k/v-state id as metadata
- kgOutStream = stateBackend.keyGroupCompressionDecorator.
- decorateWithCompression(checkpointOutputStream);
- kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- kgOutView.writeShort(mergeIterator.kvStateId());
- previousKey = mergeIterator.key();
- previousValue = mergeIterator.value();
- mergeIterator.next();
- }
-
- //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
- while (mergeIterator.isValid()) {
-
- assert (!hasMetaDataFollowsFlag(previousKey));
-
- //set signal in first key byte that meta data will follow in the stream after this k/v pair
- if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
-
- //be cooperative and check for interruption from time to time in the hot loop
- checkInterrupted();
-
- setMetaDataFollowsFlagInKey(previousKey);
- }
-
- writeKeyValuePair(previousKey, previousValue, kgOutView);
-
- //write meta data if we have to
- if (mergeIterator.isNewKeyGroup()) {
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
- // this will just close the outer stream
- kgOutStream.close();
- //begin new key-group
- keyGroupRangeOffsets.setKeyGroupOffset(
- mergeIterator.keyGroup(),
- checkpointOutputStream.getPos());
- //write the kev-state
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- kgOutStream = stateBackend.keyGroupCompressionDecorator.
- decorateWithCompression(checkpointOutputStream);
- kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
- kgOutView.writeShort(mergeIterator.kvStateId());
- } else if (mergeIterator.isNewKeyValueState()) {
- //write the k/v-state
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- kgOutView.writeShort(mergeIterator.kvStateId());
- }
-
- //request next k/v pair
- previousKey = mergeIterator.key();
- previousValue = mergeIterator.value();
- mergeIterator.next();
- }
- }
-
- //epilogue: write last key-group
- if (previousKey != null) {
- assert (!hasMetaDataFollowsFlag(previousKey));
- setMetaDataFollowsFlagInKey(previousKey);
- writeKeyValuePair(previousKey, previousValue, kgOutView);
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
- // this will just close the outer stream
- kgOutStream.close();
- kgOutStream = null;
- }
-
- } finally {
- // this will just close the outer stream
- IOUtils.closeQuietly(kgOutStream);
- }
- }
-
- private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
- BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
- BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
- }
-
- static void setMetaDataFollowsFlagInKey(byte[] key) {
- key[0] |= FIRST_BIT_IN_BYTE_MASK;
- }
-
- static void clearMetaDataFollowsFlag(byte[] key) {
- key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
- }
-
- static boolean hasMetaDataFollowsFlag(byte[] key) {
- return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
- }
-
- private static void checkInterrupted() throws InterruptedException {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException("RocksDB snapshot interrupted.");
- }
- }
-
- @Override
- protected void acquireResources() throws Exception {
- stateBackend.cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
- openCheckpointStream();
- }
-
- @Override
- protected void releaseResources() {
- closeLocalRegistry();
- releaseSnapshotOperationResources();
- }
-
- private void releaseSnapshotOperationResources() {
- // hold the db lock while operation on the db to guard us against async db disposal
- releaseSnapshotResources();
- }
-
- @Override
- protected void stopOperation() {
- closeLocalRegistry();
- }
-
- private void closeLocalRegistry() {
- if (stateBackend.cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
- try {
- snapshotCloseableRegistry.close();
- } catch (Exception ex) {
- LOG.warn("Error closing local registry", ex);
- }
- }
- }
-
- @Nonnull
- @Override
- public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
- long startTime = System.currentTimeMillis();
-
- if (isStopped()) {
- throw new IOException("RocksDB closed.");
- }
-
- writeDBSnapshot();
-
- LOG.debug("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
- checkpointStreamSupplier, Thread.currentThread(), (System.currentTimeMillis() - startTime));
-
- return getSnapshotResultStateHandle();
- }
- }
-
- /**
- * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
- */
- private static final class RocksDBIncrementalSnapshotOperation<K> {
-
- /** The backend which we snapshot. */
- private final RocksDBKeyedStateBackend<K> stateBackend;
-
- /** Stream factory that creates the outpus streams to DFS. */
- private final CheckpointStreamFactory checkpointStreamFactory;
-
- /** Id for the current checkpoint. */
- private final long checkpointId;
-
- /** All sst files that were part of the last previously completed checkpoint. */
- private Set<StateHandleID> baseSstFiles;
-
- /** The state meta data. */
- private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>();
-
- /** Local directory for the RocksDB native backup. */
- private SnapshotDirectory localBackupDirectory;
-
- // Registry for all opened i/o streams
- private final CloseableRegistry closeableRegistry = new CloseableRegistry();
-
- // new sst files since the last completed checkpoint
- private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
-
- // handles to the misc files in the current snapshot
- private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
-
- // This lease protects from concurrent disposal of the native rocksdb instance.
- private final ResourceGuard.Lease dbLease;
-
- private SnapshotResult<StreamStateHandle> metaStateHandle = null;
-
- private RocksDBIncrementalSnapshotOperation(
- RocksDBKeyedStateBackend<K> stateBackend,
- CheckpointStreamFactory checkpointStreamFactory,
- SnapshotDirectory localBackupDirectory,
- long checkpointId) throws IOException {
-
- this.stateBackend = stateBackend;
- this.checkpointStreamFactory = checkpointStreamFactory;
- this.checkpointId = checkpointId;
- this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
- this.localBackupDirectory = localBackupDirectory;
- }
-
- private StreamStateHandle materializeStateData(Path filePath) throws Exception {
- FSDataInputStream inputStream = null;
- CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
-
- try {
- final byte[] buffer = new byte[8 * 1024];
-
- FileSystem backupFileSystem = localBackupDirectory.getFileSystem();
- inputStream = backupFileSystem.open(filePath);
- closeableRegistry.registerCloseable(inputStream);
-
- outputStream = checkpointStreamFactory
- .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
- closeableRegistry.registerCloseable(outputStream);
-
- while (true) {
- int numBytes = inputStream.read(buffer);
-
- if (numBytes == -1) {
- break;
- }
-
- outputStream.write(buffer, 0, numBytes);
- }
-
- StreamStateHandle result = null;
- if (closeableRegistry.unregisterCloseable(outputStream)) {
- result = outputStream.closeAndGetHandle();
- outputStream = null;
- }
- return result;
-
- } finally {
-
- if (closeableRegistry.unregisterCloseable(inputStream)) {
- inputStream.close();
- }
-
- if (closeableRegistry.unregisterCloseable(outputStream)) {
- outputStream.close();
- }
- }
- }
-
- @Nonnull
- private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
-
- LocalRecoveryConfig localRecoveryConfig = stateBackend.localRecoveryConfig;
-
- CheckpointStreamWithResultProvider streamWithResultProvider =
-
- localRecoveryConfig.isLocalRecoveryEnabled() ?
-
- CheckpointStreamWithResultProvider.createDuplicatingStream(
- checkpointId,
- CheckpointedStateScope.EXCLUSIVE,
- checkpointStreamFactory,
- localRecoveryConfig.getLocalStateDirectoryProvider()) :
-
- CheckpointStreamWithResultProvider.createSimpleStream(
- CheckpointedStateScope.EXCLUSIVE,
- checkpointStreamFactory);
-
- try {
- closeableRegistry.registerCloseable(streamWithResultProvider);
-
- //no need for compression scheme support because sst-files are already compressed
- KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(
- stateBackend.keySerializer,
- stateMetaInfoSnapshots,
- false);
-
- DataOutputView out =
- new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());
-
- serializationProxy.write(out);
-
- if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
- SnapshotResult<StreamStateHandle> result =
- streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
- streamWithResultProvider = null;
- return result;
- } else {
- throw new IOException("Stream already closed and cannot return a handle.");
- }
- } finally {
- if (streamWithResultProvider != null) {
- if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
- IOUtils.closeQuietly(streamWithResultProvider);
- }
- }
- }
- }
-
- void takeSnapshot() throws Exception {
-
- final long lastCompletedCheckpoint;
-
- // use the last completed checkpoint as the comparison base.
- synchronized (stateBackend.materializedSstFiles) {
- lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
- baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
- }
-
- LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
- "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
-
- // save meta data
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry
- : stateBackend.kvStateInformation.entrySet()) {
- stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
- }
-
- LOG.trace("Local RocksDB checkpoint goes to backup path {}.", localBackupDirectory);
-
- if (localBackupDirectory.exists()) {
- throw new IllegalStateException("Unexpected existence of the backup directory.");
- }
-
- // create hard links of living files in the snapshot path
- try (Checkpoint checkpoint = Checkpoint.create(stateBackend.db)) {
- checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
- }
- }
-
- @Nonnull
- SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {
-
- stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
-
- // write meta data
- metaStateHandle = materializeMetaData();
-
- // sanity checks - they should never fail
- Preconditions.checkNotNull(metaStateHandle,
- "Metadata was not properly created.");
- Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
- "Metadata for job manager was not properly created.");
-
- // write state data
- Preconditions.checkState(localBackupDirectory.exists());
-
- FileStatus[] fileStatuses = localBackupDirectory.listStatus();
- if (fileStatuses != null) {
- for (FileStatus fileStatus : fileStatuses) {
- final Path filePath = fileStatus.getPath();
- final String fileName = filePath.getName();
- final StateHandleID stateHandleID = new StateHandleID(fileName);
-
- if (fileName.endsWith(SST_FILE_SUFFIX)) {
- final boolean existsAlready =
- baseSstFiles != null && baseSstFiles.contains(stateHandleID);
-
- if (existsAlready) {
- // we introduce a placeholder state handle, that is replaced with the
- // original from the shared state registry (created from a previous checkpoint)
- sstFiles.put(
- stateHandleID,
- new PlaceholderStreamStateHandle());
- } else {
- sstFiles.put(stateHandleID, materializeStateData(filePath));
- }
- } else {
- StreamStateHandle fileHandle = materializeStateData(filePath);
- miscFiles.put(stateHandleID, fileHandle);
- }
- }
- }
-
- synchronized (stateBackend.materializedSstFiles) {
- stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
- }
-
- IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle(
- stateBackend.backendUID,
- stateBackend.keyGroupRange,
- checkpointId,
- sstFiles,
- miscFiles,
- metaStateHandle.getJobManagerOwnedSnapshot());
-
- StreamStateHandle taskLocalSnapshotMetaDataStateHandle = metaStateHandle.getTaskLocalSnapshot();
- DirectoryStateHandle directoryStateHandle = null;
-
- try {
-
- directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
- } catch (IOException ex) {
-
- Exception collector = ex;
-
- try {
- taskLocalSnapshotMetaDataStateHandle.discardState();
- } catch (Exception discardEx) {
- collector = ExceptionUtils.firstOrSuppressed(discardEx, collector);
- }
-
- LOG.warn("Problem with local state snapshot.", collector);
- }
-
- if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) {
-
- IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
- new IncrementalLocalKeyedStateHandle(
- stateBackend.backendUID,
- checkpointId,
- directoryStateHandle,
- stateBackend.keyGroupRange,
- taskLocalSnapshotMetaDataStateHandle,
- sstFiles.keySet());
- return SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
- } else {
- return SnapshotResult.of(jmIncrementalKeyedStateHandle);
- }
- }
-
- void stop() {
-
- if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
- try {
- closeableRegistry.close();
- } catch (IOException e) {
- LOG.warn("Could not properly close io streams.", e);
- }
- }
- }
-
- void releaseResources(boolean canceled) {
-
- dbLease.close();
-
- if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
- try {
- closeableRegistry.close();
- } catch (IOException e) {
- LOG.warn("Exception on closing registry.", e);
- }
- }
-
- try {
- if (localBackupDirectory.exists()) {
- LOG.trace("Running cleanup for local RocksDB backup directory {}.", localBackupDirectory);
- boolean cleanupOk = localBackupDirectory.cleanup();
-
- if (!cleanupOk) {
- LOG.debug("Could not properly cleanup local RocksDB backup directory.");
- }
- }
- } catch (IOException e) {
- LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
- }
-
- if (canceled) {
- Collection<StateObject> statesToDiscard =
- new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
-
- statesToDiscard.add(metaStateHandle);
- statesToDiscard.addAll(miscFiles.values());
- statesToDiscard.addAll(sstFiles.values());
-
- try {
- StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
- } catch (Exception e) {
- LOG.warn("Could not properly discard states.", e);
- }
-
- if (localBackupDirectory.isSnapshotCompleted()) {
- try {
- DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
- if (directoryStateHandle != null) {
- directoryStateHandle.discardState();
- }
- } catch (Exception e) {
- LOG.warn("Could not properly discard local state.", e);
- }
- }
- }
- }
- }
-
public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
return new RocksIteratorWrapper(db.newIterator());
}
@@ -2332,23 +1474,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
}
- @SuppressWarnings("unchecked")
- private static RocksIteratorWrapper getRocksIterator(
- RocksDB db,
- ColumnFamilyHandle columnFamilyHandle,
- RegisteredStateMetaInfoBase metaInfo,
- ReadOptions readOptions) {
- StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
- if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
- stateSnapshotTransformer = (StateSnapshotTransformer<byte[]>)
- ((RegisteredKeyValueStateBackendMetaInfo<?, ?>) metaInfo).getSnapshotTransformer();
- }
- RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
- return stateSnapshotTransformer == null ?
- new RocksIteratorWrapper(rocksIterator) :
- new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
- }
-
/**
* Encapsulates the logic and resources in connection with creating priority queue state structures.
*/
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
new file mode 100644
index 0000000..0cc9729
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.setMetaDataFollowsFlagInKey;
+
+/**
+ * Snapshot strategy to create full snapshots of
+ * {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}. Iterates and writes all states from a
+ * RocksDB snapshot of the column families.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class RocksFullSnapshotStrategy<K> extends SnapshotStrategyBase<K> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RocksFullSnapshotStrategy.class);
+
+ /** This decorator is used to apply compression per key-group for the written snapshot data. */
+ @Nonnull
+ private final StreamCompressionDecorator keyGroupCompressionDecorator;
+
+ public RocksFullSnapshotStrategy(
+ @Nonnull RocksDB db,
+ @Nonnull ResourceGuard rocksDBResourceGuard,
+ @Nonnull TypeSerializer<K> keySerializer,
+ @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int keyGroupPrefixBytes,
+ @Nonnull LocalRecoveryConfig localRecoveryConfig,
+ @Nonnull CloseableRegistry cancelStreamRegistry,
+ @Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
+ super(
+ db,
+ rocksDBResourceGuard,
+ keySerializer,
+ kvStateInformation,
+ keyGroupRange,
+ keyGroupPrefixBytes,
+ localRecoveryConfig,
+ cancelStreamRegistry);
+
+ this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
+ }
+
+ @Override
+ public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
+ long checkpointId,
+ long timestamp,
+ CheckpointStreamFactory primaryStreamFactory,
+ CheckpointOptions checkpointOptions) throws Exception {
+
+ long startTime = System.currentTimeMillis();
+
+ if (kvStateInformation.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
+ timestamp);
+ }
+
+ return DoneFuture.of(SnapshotResult.empty());
+ }
+
+ final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
+
+ localRecoveryConfig.isLocalRecoveryEnabled() &&
+ (CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?
+
+ () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
+ checkpointId,
+ CheckpointedStateScope.EXCLUSIVE,
+ primaryStreamFactory,
+ localRecoveryConfig.getLocalStateDirectoryProvider()) :
+
+ () -> CheckpointStreamWithResultProvider.createSimpleStream(
+ CheckpointedStateScope.EXCLUSIVE,
+ primaryStreamFactory);
+
+ final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
+
+ final RocksDBFullSnapshotCallable snapshotOperation =
+ new RocksDBFullSnapshotCallable(supplier, snapshotCloseableRegistry);
+
+ return new SnapshotTask(snapshotOperation);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // nothing to do.
+ }
+
+ /**
+ * Wrapping task to run a {@link RocksDBFullSnapshotCallable} and delegate cancellation.
+ */
+ private class SnapshotTask extends FutureTask<SnapshotResult<KeyedStateHandle>> {
+
+ /** Reference to the callable for cancellation. */
+ @Nonnull
+ private final AutoCloseable callableClose;
+
+ SnapshotTask(@Nonnull RocksDBFullSnapshotCallable callable) {
+ super(callable);
+ this.callableClose = callable;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ IOUtils.closeQuietly(callableClose);
+ return super.cancel(mayInterruptIfRunning);
+ }
+ }
+
+ /**
+ * Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend.
+ */
+ @VisibleForTesting
+ private class RocksDBFullSnapshotCallable implements Callable<SnapshotResult<KeyedStateHandle>>, AutoCloseable {
+
+ @Nonnull
+ private final KeyGroupRangeOffsets keyGroupRangeOffsets;
+
+ @Nonnull
+ private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
+
+ @Nonnull
+ private final CloseableRegistry snapshotCloseableRegistry;
+
+ @Nonnull
+ private final ResourceGuard.Lease dbLease;
+
+ @Nonnull
+ private final Snapshot snapshot;
+
+ @Nonnull
+ private final ReadOptions readOptions;
+
+ /**
+ * The state meta data.
+ */
+ @Nonnull
+ private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+ /**
+ * The copied column handle.
+ */
+ @Nonnull
+ private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> metaDataCopy;
+
+ private final AtomicBoolean ownedForCleanup;
+
+ RocksDBFullSnapshotCallable(
+ @Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier,
+ @Nonnull CloseableRegistry registry) throws IOException {
+
+ this.ownedForCleanup = new AtomicBoolean(false);
+ this.checkpointStreamSupplier = checkpointStreamSupplier;
+ this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange);
+ this.snapshotCloseableRegistry = registry;
+
+ this.stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
+ this.metaDataCopy = new ArrayList<>(kvStateInformation.size());
+ for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : kvStateInformation.values()) {
+ // snapshot meta info
+ this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
+ this.metaDataCopy.add(tuple2);
+ }
+
+ this.dbLease = rocksDBResourceGuard.acquireResource();
+
+ this.readOptions = new ReadOptions();
+ this.snapshot = db.getSnapshot();
+ this.readOptions.setSnapshot(snapshot);
+ }
+
+ @Override
+ public SnapshotResult<KeyedStateHandle> call() throws Exception {
+
+ if (!ownedForCleanup.compareAndSet(false, true)) {
+ throw new CancellationException("Snapshot task was already cancelled, stopping execution.");
+ }
+
+ final long startTime = System.currentTimeMillis();
+ final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators = new ArrayList<>(metaDataCopy.size());
+
+ try {
+
+ cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
+
+ final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider = checkpointStreamSupplier.get();
+ snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
+
+ final DataOutputView outputView =
+ new DataOutputViewStreamWrapper(checkpointStreamWithResultProvider.getCheckpointOutputStream());
+
+ writeKVStateMetaData(kvStateIterators, outputView);
+ writeKVStateData(kvStateIterators, checkpointStreamWithResultProvider);
+
+ final SnapshotResult<KeyedStateHandle> snapshotResult =
+ createStateHandlesFromStreamProvider(checkpointStreamWithResultProvider);
+
+ LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
+ checkpointStreamSupplier, Thread.currentThread(), (System.currentTimeMillis() - startTime));
+
+ return snapshotResult;
+
+ } finally {
+
+ for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) {
+ IOUtils.closeQuietly(kvStateIterator.f0);
+ }
+
+ cleanupSynchronousStepResources();
+ }
+ }
+
+ private void cleanupSynchronousStepResources() {
+ IOUtils.closeQuietly(readOptions);
+
+ db.releaseSnapshot(snapshot);
+ IOUtils.closeQuietly(snapshot);
+
+ IOUtils.closeQuietly(dbLease);
+
+ if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
+ try {
+ snapshotCloseableRegistry.close();
+ } catch (Exception ex) {
+ LOG.warn("Error closing local registry", ex);
+ }
+ }
+ }
+
+ private SnapshotResult<KeyedStateHandle> createStateHandlesFromStreamProvider(
+ CheckpointStreamWithResultProvider checkpointStreamWithResultProvider) throws IOException {
+ if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
+ return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(
+ checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(),
+ keyGroupRangeOffsets);
+ } else {
+ throw new IOException("Snapshot was already closed before completion.");
+ }
+ }
+
+ private void writeKVStateMetaData(
+ final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+ final DataOutputView outputView) throws IOException {
+
+ int kvStateId = 0;
+
+ for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : metaDataCopy) {
+
+ RocksIteratorWrapper rocksIteratorWrapper =
+ getRocksIterator(db, tuple2.f0, tuple2.f1, readOptions);
+
+ kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId));
+ ++kvStateId;
+ }
+
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(
+ // TODO: this code assumes that writing a serializer is threadsafe, we should support to
+ // get a serialized form already at state registration time in the future
+ keySerializer,
+ stateMetaInfoSnapshots,
+ !Objects.equals(
+ UncompressedStreamCompressionDecorator.INSTANCE,
+ keyGroupCompressionDecorator));
+
+ serializationProxy.write(outputView);
+ }
+
+ private void writeKVStateData(
+ final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+ final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider) throws IOException, InterruptedException {
+
+ byte[] previousKey = null;
+ byte[] previousValue = null;
+ DataOutputView kgOutView = null;
+ OutputStream kgOutStream = null;
+ CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
+ checkpointStreamWithResultProvider.getCheckpointOutputStream();
+
+ try {
+ // Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator
+ try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
+ kvStateIterators, keyGroupPrefixBytes)) {
+
+ //preamble: setup with first key-group as our lookahead
+ if (mergeIterator.isValid()) {
+ //begin first key-group by recording the offset
+ keyGroupRangeOffsets.setKeyGroupOffset(
+ mergeIterator.keyGroup(),
+ checkpointOutputStream.getPos());
+ //write the k/v-state id as metadata
+ kgOutStream = keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
+ kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutView.writeShort(mergeIterator.kvStateId());
+ previousKey = mergeIterator.key();
+ previousValue = mergeIterator.value();
+ mergeIterator.next();
+ }
+
+ //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
+ while (mergeIterator.isValid()) {
+
+ assert (!hasMetaDataFollowsFlag(previousKey));
+
+ //set signal in first key byte that meta data will follow in the stream after this k/v pair
+ if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
+
+ //be cooperative and check for interruption from time to time in the hot loop
+ checkInterrupted();
+
+ setMetaDataFollowsFlagInKey(previousKey);
+ }
+
+ writeKeyValuePair(previousKey, previousValue, kgOutView);
+
+ //write meta data if we have to
+ if (mergeIterator.isNewKeyGroup()) {
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+ // this will just close the outer stream
+ kgOutStream.close();
+ //begin new key-group
+ keyGroupRangeOffsets.setKeyGroupOffset(
+ mergeIterator.keyGroup(),
+ checkpointOutputStream.getPos());
+ //write the k/v-state
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutStream = keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
+ kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
+ kgOutView.writeShort(mergeIterator.kvStateId());
+ } else if (mergeIterator.isNewKeyValueState()) {
+ //write the k/v-state
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutView.writeShort(mergeIterator.kvStateId());
+ }
+
+ //request next k/v pair
+ previousKey = mergeIterator.key();
+ previousValue = mergeIterator.value();
+ mergeIterator.next();
+ }
+ }
+
+ //epilogue: write last key-group
+ if (previousKey != null) {
+ assert (!hasMetaDataFollowsFlag(previousKey));
+ setMetaDataFollowsFlagInKey(previousKey);
+ writeKeyValuePair(previousKey, previousValue, kgOutView);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+ // this will just close the outer stream
+ kgOutStream.close();
+ kgOutStream = null;
+ }
+
+ } finally {
+ // this will just close the outer stream
+ IOUtils.closeQuietly(kgOutStream);
+ }
+ }
+
+ private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
+ BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
+ BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
+ }
+
+ private void checkInterrupted() throws InterruptedException {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("RocksDB snapshot interrupted.");
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ if (ownedForCleanup.compareAndSet(false, true)) {
+ cleanupSynchronousStepResources();
+ }
+
+ if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
+ snapshotCloseableRegistry.close();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static RocksIteratorWrapper getRocksIterator(
+ RocksDB db,
+ ColumnFamilyHandle columnFamilyHandle,
+ RegisteredStateMetaInfoBase metaInfo,
+ ReadOptions readOptions) {
+ StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
+ if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
+ stateSnapshotTransformer = (StateSnapshotTransformer<byte[]>)
+ ((RegisteredKeyValueStateBackendMetaInfo<?, ?>) metaInfo).getSnapshotTransformer();
+ }
+ RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
+ return stateSnapshotTransformer == null ?
+ new RocksIteratorWrapper(rocksIterator) :
+ new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
new file mode 100644
index 0000000..3487fe6
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SnapshotDirectory;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+
+/**
+ * Snapshot strategy for {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend} that is based
+ * on RocksDB's native checkpoints and creates incremental snapshots.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class RocksIncrementalSnapshotStrategy<K> extends SnapshotStrategyBase<K> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class);
+
+ /** Base path of the RocksDB instance. */
+ @Nonnull
+ private final File instanceBasePath;
+
+ /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
+ @Nonnull
+ private final UUID backendUID;
+
+ /** Stores the materialized sstable files from all snapshots that build the incremental history. */
+ @Nonnull
+ private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+
+ /** The identifier of the last completed checkpoint. */
+ private long lastCompletedCheckpointId;
+
+ /** We delegate snapshots that are for savepoints to this. */
+ @Nonnull
+ private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;
+
+ public RocksIncrementalSnapshotStrategy(
+ @Nonnull RocksDB db,
+ @Nonnull ResourceGuard rocksDBResourceGuard,
+ @Nonnull TypeSerializer<K> keySerializer,
+ @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int keyGroupPrefixBytes,
+ @Nonnull LocalRecoveryConfig localRecoveryConfig,
+ @Nonnull CloseableRegistry cancelStreamRegistry,
+ @Nonnull File instanceBasePath,
+ @Nonnull UUID backendUID,
+ @Nonnull SortedMap<Long, Set<StateHandleID>> materializedSstFiles,
+ long lastCompletedCheckpointId,
+ @Nonnull SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate) {
+
+ super(
+ db,
+ rocksDBResourceGuard,
+ keySerializer,
+ kvStateInformation,
+ keyGroupRange,
+ keyGroupPrefixBytes,
+ localRecoveryConfig,
+ cancelStreamRegistry);
+
+ this.instanceBasePath = instanceBasePath;
+ this.backendUID = backendUID;
+ this.materializedSstFiles = materializedSstFiles;
+ this.lastCompletedCheckpointId = lastCompletedCheckpointId;
+ this.savepointDelegate = savepointDelegate;
+ }
+
+ @Override
+ public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
+ long checkpointId,
+ long checkpointTimestamp,
+ CheckpointStreamFactory checkpointStreamFactory,
+ CheckpointOptions checkpointOptions) throws Exception {
+
+ // for savepoints, we delegate to the full snapshot strategy because savepoints are always self-contained.
+ if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
+ return savepointDelegate.performSnapshot(
+ checkpointId,
+ checkpointTimestamp,
+ checkpointStreamFactory,
+ checkpointOptions);
+ }
+
+ if (kvStateInformation.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", checkpointTimestamp);
+ }
+ return DoneFuture.of(SnapshotResult.empty());
+ }
+
+ SnapshotDirectory snapshotDirectory;
+
+ if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+ // create a "permanent" snapshot directory for local recovery.
+ LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
+ File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
+
+ if (directory.exists()) {
+ FileUtils.deleteDirectory(directory);
+ }
+
+ if (!directory.mkdirs()) {
+ throw new IOException("Local state base directory for checkpoint " + checkpointId +
+ " already exists: " + directory);
+ }
+
+ // introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints.
+ File rdbSnapshotDir = new File(directory, "rocks_db");
+ Path path = new Path(rdbSnapshotDir.toURI());
+ // create a "permanent" snapshot directory because local recovery is active.
+ snapshotDirectory = SnapshotDirectory.permanent(path);
+ } else {
+ // create a "temporary" snapshot directory because local recovery is inactive.
+ Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+ snapshotDirectory = SnapshotDirectory.temporary(path);
+ }
+
+ final RocksDBIncrementalSnapshotOperation snapshotOperation =
+ new RocksDBIncrementalSnapshotOperation(
+ checkpointStreamFactory,
+ snapshotDirectory,
+ checkpointId);
+
+ try {
+ snapshotOperation.takeSnapshot();
+ } catch (Exception e) {
+ snapshotOperation.stop();
+ snapshotOperation.releaseResources(true);
+ throw e;
+ }
+
+ return new FutureTask<SnapshotResult<KeyedStateHandle>>(
+ snapshotOperation::runSnapshot
+ ) {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ snapshotOperation.stop();
+ return super.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ protected void done() {
+ snapshotOperation.releaseResources(isCancelled());
+ }
+ };
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long completedCheckpointId) {
+ synchronized (materializedSstFiles) {
+
+ if (completedCheckpointId < lastCompletedCheckpointId) {
+ return;
+ }
+
+ materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
+
+ lastCompletedCheckpointId = completedCheckpointId;
+ }
+ }
+
+ /**
+ * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
+ */
+ private final class RocksDBIncrementalSnapshotOperation {
+
+ /**
+ * Stream factory that creates the output streams to DFS.
+ */
+ private final CheckpointStreamFactory checkpointStreamFactory;
+
+ /**
+ * Id for the current checkpoint.
+ */
+ private final long checkpointId;
+
+ /**
+ * All sst files that were part of the last previously completed checkpoint.
+ */
+ private Set<StateHandleID> baseSstFiles;
+
+ /**
+ * The state meta data.
+ */
+ private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+ /**
+ * Local directory for the RocksDB native backup.
+ */
+ private SnapshotDirectory localBackupDirectory;
+
+ // Registry for all opened i/o streams
+ private final CloseableRegistry closeableRegistry;
+
+ // new sst files since the last completed checkpoint
+ private final Map<StateHandleID, StreamStateHandle> sstFiles;
+
+ // handles to the misc files in the current snapshot
+ private final Map<StateHandleID, StreamStateHandle> miscFiles;
+
+ // This lease protects from concurrent disposal of the native rocksdb instance.
+ private final ResourceGuard.Lease dbLease;
+
+ private SnapshotResult<StreamStateHandle> metaStateHandle;
+
+ private RocksDBIncrementalSnapshotOperation(
+ CheckpointStreamFactory checkpointStreamFactory,
+ SnapshotDirectory localBackupDirectory,
+ long checkpointId) throws IOException {
+
+ this.checkpointStreamFactory = checkpointStreamFactory;
+ this.checkpointId = checkpointId;
+ this.localBackupDirectory = localBackupDirectory;
+ this.stateMetaInfoSnapshots = new ArrayList<>();
+ this.closeableRegistry = new CloseableRegistry();
+ this.sstFiles = new HashMap<>();
+ this.miscFiles = new HashMap<>();
+ this.metaStateHandle = null;
+ this.dbLease = rocksDBResourceGuard.acquireResource();
+ }
+
+ private StreamStateHandle materializeStateData(Path filePath) throws Exception {
+ FSDataInputStream inputStream = null;
+ CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+
+ try {
+ final byte[] buffer = new byte[8 * 1024];
+
+ FileSystem backupFileSystem = localBackupDirectory.getFileSystem();
+ inputStream = backupFileSystem.open(filePath);
+ closeableRegistry.registerCloseable(inputStream);
+
+ outputStream = checkpointStreamFactory
+ .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
+ closeableRegistry.registerCloseable(outputStream);
+
+ while (true) {
+ int numBytes = inputStream.read(buffer);
+
+ if (numBytes == -1) {
+ break;
+ }
+
+ outputStream.write(buffer, 0, numBytes);
+ }
+
+ StreamStateHandle result = null;
+ if (closeableRegistry.unregisterCloseable(outputStream)) {
+ result = outputStream.closeAndGetHandle();
+ outputStream = null;
+ }
+ return result;
+
+ } finally {
+
+ if (closeableRegistry.unregisterCloseable(inputStream)) {
+ inputStream.close();
+ }
+
+ if (closeableRegistry.unregisterCloseable(outputStream)) {
+ outputStream.close();
+ }
+ }
+ }
+
+ @Nonnull
+ private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
+
+ CheckpointStreamWithResultProvider streamWithResultProvider =
+
+ localRecoveryConfig.isLocalRecoveryEnabled() ?
+
+ CheckpointStreamWithResultProvider.createDuplicatingStream(
+ checkpointId,
+ CheckpointedStateScope.EXCLUSIVE,
+ checkpointStreamFactory,
+ localRecoveryConfig.getLocalStateDirectoryProvider()) :
+
+ CheckpointStreamWithResultProvider.createSimpleStream(
+ CheckpointedStateScope.EXCLUSIVE,
+ checkpointStreamFactory);
+
+ try {
+ closeableRegistry.registerCloseable(streamWithResultProvider);
+
+ //no need for compression scheme support because sst-files are already compressed
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(
+ keySerializer,
+ stateMetaInfoSnapshots,
+ false);
+
+ DataOutputView out =
+ new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());
+
+ serializationProxy.write(out);
+
+ if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
+ SnapshotResult<StreamStateHandle> result =
+ streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
+ streamWithResultProvider = null;
+ return result;
+ } else {
+ throw new IOException("Stream already closed and cannot return a handle.");
+ }
+ } finally {
+ if (streamWithResultProvider != null) {
+ if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
+ IOUtils.closeQuietly(streamWithResultProvider);
+ }
+ }
+ }
+ }
+
+ void takeSnapshot() throws Exception {
+
+ final long lastCompletedCheckpoint;
+
+ // use the last completed checkpoint as the comparison base.
+ synchronized (materializedSstFiles) {
+ lastCompletedCheckpoint = lastCompletedCheckpointId;
+ baseSstFiles = materializedSstFiles.get(lastCompletedCheckpoint);
+ }
+
+ LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
+ "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
+ // save meta data
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry
+ : kvStateInformation.entrySet()) {
+ stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
+ }
+
+ LOG.trace("Local RocksDB checkpoint goes to backup path {}.", localBackupDirectory);
+
+ if (localBackupDirectory.exists()) {
+ throw new IllegalStateException("Unexpected existence of the backup directory.");
+ }
+
+ // create hard links of living files in the snapshot path
+ try (Checkpoint checkpoint = Checkpoint.create(db)) {
+ checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
+ }
+ }
+
+ @Nonnull
+ SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {
+
+ cancelStreamRegistry.registerCloseable(closeableRegistry);
+
+ // write meta data
+ metaStateHandle = materializeMetaData();
+
+ // sanity checks - they should never fail
+ Preconditions.checkNotNull(metaStateHandle,
+ "Metadata was not properly created.");
+ Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
+ "Metadata for job manager was not properly created.");
+
+ // write state data
+ Preconditions.checkState(localBackupDirectory.exists());
+
+ FileStatus[] fileStatuses = localBackupDirectory.listStatus();
+ if (fileStatuses != null) {
+ for (FileStatus fileStatus : fileStatuses) {
+ final Path filePath = fileStatus.getPath();
+ final String fileName = filePath.getName();
+ final StateHandleID stateHandleID = new StateHandleID(fileName);
+
+ if (fileName.endsWith(SST_FILE_SUFFIX)) {
+ final boolean existsAlready =
+ baseSstFiles != null && baseSstFiles.contains(stateHandleID);
+
+ if (existsAlready) {
+ // we introduce a placeholder state handle, that is replaced with the
+ // original from the shared state registry (created from a previous checkpoint)
+ sstFiles.put(
+ stateHandleID,
+ new PlaceholderStreamStateHandle());
+ } else {
+ sstFiles.put(stateHandleID, materializeStateData(filePath));
+ }
+ } else {
+ StreamStateHandle fileHandle = materializeStateData(filePath);
+ miscFiles.put(stateHandleID, fileHandle);
+ }
+ }
+ }
+
+ synchronized (materializedSstFiles) {
+ materializedSstFiles.put(checkpointId, sstFiles.keySet());
+ }
+
+ IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle(
+ backendUID,
+ keyGroupRange,
+ checkpointId,
+ sstFiles,
+ miscFiles,
+ metaStateHandle.getJobManagerOwnedSnapshot());
+
+ StreamStateHandle taskLocalSnapshotMetaDataStateHandle = metaStateHandle.getTaskLocalSnapshot();
+ DirectoryStateHandle directoryStateHandle = null;
+
+ try {
+
+ directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
+ } catch (IOException ex) {
+
+ Exception collector = ex;
+
+ try {
+ taskLocalSnapshotMetaDataStateHandle.discardState();
+ } catch (Exception discardEx) {
+ collector = ExceptionUtils.firstOrSuppressed(discardEx, collector);
+ }
+
+ LOG.warn("Problem with local state snapshot.", collector);
+ }
+
+ if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) {
+
+ IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
+ new IncrementalLocalKeyedStateHandle(
+ backendUID,
+ checkpointId,
+ directoryStateHandle,
+ keyGroupRange,
+ taskLocalSnapshotMetaDataStateHandle,
+ sstFiles.keySet());
+ return SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
+ } else {
+ return SnapshotResult.of(jmIncrementalKeyedStateHandle);
+ }
+ }
+
+ void stop() {
+
+ if (cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+ try {
+ closeableRegistry.close();
+ } catch (IOException e) {
+ LOG.warn("Could not properly close io streams.", e);
+ }
+ }
+ }
+
+ void releaseResources(boolean canceled) {
+
+ dbLease.close();
+
+ if (cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+ try {
+ closeableRegistry.close();
+ } catch (IOException e) {
+ LOG.warn("Exception on closing registry.", e);
+ }
+ }
+
+ try {
+ if (localBackupDirectory.exists()) {
+ LOG.trace("Running cleanup for local RocksDB backup directory {}.", localBackupDirectory);
+ boolean cleanupOk = localBackupDirectory.cleanup();
+
+ if (!cleanupOk) {
+ LOG.debug("Could not properly cleanup local RocksDB backup directory.");
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
+ }
+
+ if (canceled) {
+ Collection<StateObject> statesToDiscard =
+ new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
+
+ statesToDiscard.add(metaStateHandle);
+ statesToDiscard.addAll(miscFiles.values());
+ statesToDiscard.addAll(sstFiles.values());
+
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard states.", e);
+ }
+
+ if (localBackupDirectory.isSnapshotCompleted()) {
+ try {
+ DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
+ if (directoryStateHandle != null) {
+ directoryStateHandle.discardState();
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard local state.", e);
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
new file mode 100644
index 0000000..bf2bbdb
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+/**
+ * Utility methods and constants for creating and restoring RocksDB snapshots in
+ * {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
+ */
+public class RocksSnapshotUtil {
+
+ /** File suffix of sstable files. */
+ public static final String SST_FILE_SUFFIX = ".sst";
+
+ /** Mask for the highest bit in a key's first byte; flags that meta data follows the k/v pair in the stream. */
+ public static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
+
+ /** Marker value, written as a short, that terminates a key-group in the snapshot stream. */
+ public static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
+
+ public static void setMetaDataFollowsFlagInKey(byte[] key) {
+ key[0] |= FIRST_BIT_IN_BYTE_MASK;
+ }
+
+ public static void clearMetaDataFollowsFlag(byte[] key) {
+ key[0] &= (~FIRST_BIT_IN_BYTE_MASK);
+ }
+
+ public static boolean hasMetaDataFollowsFlag(byte[] key) {
+ return 0 != (key[0] & FIRST_BIT_IN_BYTE_MASK);
+ }
+
+ private RocksSnapshotUtil() {
+ throw new AssertionError();
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/SnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/SnapshotStrategyBase.java
new file mode 100644
index 0000000..efebe8c
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/SnapshotStrategyBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.LinkedHashMap;
+
+/**
+ * Base class for {@link SnapshotStrategy} implementations on RocksDB.
+ *
+ * @param <K> type of the backend keys.
+ */
+public abstract class SnapshotStrategyBase<K> implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
+
+ @Nonnull
+ protected final RocksDB db;
+
+ @Nonnull
+ protected final ResourceGuard rocksDBResourceGuard;
+
+ @Nonnull
+ protected final TypeSerializer<K> keySerializer;
+
+ @Nonnull
+ protected final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
+
+ @Nonnull
+ protected final KeyGroupRange keyGroupRange;
+
+ @Nonnegative
+ protected final int keyGroupPrefixBytes;
+
+ @Nonnull
+ protected final LocalRecoveryConfig localRecoveryConfig;
+
+ @Nonnull
+ protected final CloseableRegistry cancelStreamRegistry;
+
+ public SnapshotStrategyBase(
+ @Nonnull RocksDB db,
+ @Nonnull ResourceGuard rocksDBResourceGuard,
+ @Nonnull TypeSerializer<K> keySerializer,
+ @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int keyGroupPrefixBytes,
+ @Nonnull LocalRecoveryConfig localRecoveryConfig,
+ @Nonnull CloseableRegistry cancelStreamRegistry) {
+
+ this.db = db;
+ this.rocksDBResourceGuard = rocksDBResourceGuard;
+ this.keySerializer = keySerializer;
+ this.kvStateInformation = kvStateInformation;
+ this.keyGroupRange = keyGroupRange;
+ this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+ this.localRecoveryConfig = localRecoveryConfig;
+ this.cancelStreamRegistry = cancelStreamRegistry;
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index e344638..c872553 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -91,6 +91,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.FIRST_BIT_IN_BYTE_MASK;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.clearMetaDataFollowsFlag;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.setMetaDataFollowsFlagInKey;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
@@ -425,21 +430,19 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
@Test
public void testConsistentSnapshotSerializationFlagsAndMasks() {
- Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK);
- Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+ Assert.assertEquals(0xFFFF, END_OF_KEY_GROUP_MARK);
+ Assert.assertEquals(0x80, FIRST_BIT_IN_BYTE_MASK);
byte[] expectedKey = new byte[] {42, 42};
byte[] modKey = expectedKey.clone();
- Assert.assertFalse(
- RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+ Assert.assertFalse(hasMetaDataFollowsFlag(modKey));
- RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(modKey);
- Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+ setMetaDataFollowsFlagInKey(modKey);
+ Assert.assertTrue(hasMetaDataFollowsFlag(modKey));
- RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(modKey);
- Assert.assertFalse(
- RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+ clearMetaDataFollowsFlag(modKey);
+ Assert.assertFalse(hasMetaDataFollowsFlag(modKey));
Assert.assertTrue(Arrays.equals(expectedKey, modKey));
}
@@ -504,12 +507,12 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
@Nullable
@Override
- public StreamStateHandle closeAndGetHandle() throws IOException {
+ public StreamStateHandle closeAndGetHandle() {
throw new UnsupportedOperationException();
}
@Override
- public long getPos() throws IOException {
+ public long getPos() {
throw new UnsupportedOperationException();
}
@@ -529,7 +532,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
}
@Override
- public void close() throws IOException {
+ public void close() {
throw new UnsupportedOperationException();
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 0ea0d3f..4916251 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -191,6 +191,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
allCreatedCloseables = new ArrayList<>();
keyedStateBackend.db = spy(keyedStateBackend.db);
+ keyedStateBackend.initializeSnapshotStrategy(null);
doAnswer(new Answer<Object>() {