You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/24 16:20:58 UTC

[flink] 03/09: [refactor] Remove AbstractRocksDBRestoreOperation

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3ed5c1a26f53b9481d5616669c91c0f272bdc949
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Feb 8 16:32:25 2021 +0100

    [refactor] Remove AbstractRocksDBRestoreOperation
    
    So far both the RocksFullSnapshotRestoreOperation and
    RocksIncrementalRestoreOperation extended from
    AbstractRocksDBRestoreOperation in order to share some functions.
    However, it required unnecessary parameters to be passed just to
    fulfill the requirements of the base class. Moreover, a shared base
    class makes it harder to extend the two classes independently.
    
    This commit changes sharing the common code to use composition instead
    of inheritance.
---
 .../state/RocksDBKeyedStateBackendBuilder.java     |  18 +-
 .../state/restore/RocksDBFullRestoreOperation.java |  57 +++---
 ...sDBRestoreOperation.java => RocksDBHandle.java} | 201 +++++++++++----------
 .../RocksDBIncrementalRestoreOperation.java        | 191 +++++++++++---------
 .../state/restore/RocksDBNoneRestoreOperation.java |  58 +++---
 .../state/restore/RocksDBRestoreOperation.java     |   3 +-
 6 files changed, 261 insertions(+), 267 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 5f6426c..ce90d05 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -21,10 +21,10 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
+import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
@@ -250,7 +250,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
                 new LinkedHashMap<>();
         RocksDB db = null;
-        AbstractRocksDBRestoreOperation restoreOperation = null;
+        RocksDBRestoreOperation restoreOperation = null;
         RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
                 new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
 
@@ -393,7 +393,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                 writeBatchSize);
     }
 
-    private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
+    private RocksDBRestoreOperation getRocksDBRestoreOperation(
             int keyGroupPrefixBytes,
             CloseableRegistry cancelStreamRegistry,
             LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
@@ -401,20 +401,12 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         DBOptions dbOptions = optionsContainer.getDbOptions();
         if (restoreStateHandles.isEmpty()) {
             return new RocksDBNoneRestoreOperation<>(
-                    keyGroupRange,
-                    keyGroupPrefixBytes,
-                    numberOfTransferingThreads,
-                    cancelStreamRegistry,
-                    userCodeClassLoader,
                     kvStateInformation,
-                    keySerializerProvider,
-                    instanceBasePath,
                     instanceRocksDBPath,
                     dbOptions,
                     columnFamilyOptionsFactory,
                     nativeMetricOptions,
                     metricGroup,
-                    restoreStateHandles,
                     ttlCompactFiltersManager,
                     optionsContainer.getWriteBufferManagerCapacity());
         }
@@ -442,13 +434,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         } else {
             return new RocksDBFullRestoreOperation<>(
                     keyGroupRange,
-                    keyGroupPrefixBytes,
-                    numberOfTransferingThreads,
-                    cancelStreamRegistry,
                     userCodeClassLoader,
                     kvStateInformation,
                     keySerializerProvider,
-                    instanceBasePath,
                     instanceRocksDBPath,
                     dbOptions,
                     columnFamilyOptionsFactory,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 6c98d9b..7b5608a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -22,7 +22,6 @@ import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDb
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
-import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -51,23 +50,19 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /** Encapsulates the process of restoring a RocksDB instance from a full snapshot. */
-public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {
+public class RocksDBFullRestoreOperation<K> implements RocksDBRestoreOperation {
     private final FullSnapshotRestoreOperation<K> savepointRestoreOperation;
     /** Write batch size used in {@link RocksDBWriteBatchWrapper}. */
     private final long writeBatchSize;
 
+    private final RocksDBHandle rocksHandle;
+
     public RocksDBFullRestoreOperation(
             KeyGroupRange keyGroupRange,
-            int keyGroupPrefixBytes,
-            int numberOfTransferringThreads,
-            CloseableRegistry cancelStreamRegistry,
             ClassLoader userCodeClassLoader,
             Map<String, RocksDbKvStateInfo> kvStateInformation,
             StateSerializerProvider<K> keySerializerProvider,
-            File instanceBasePath,
             File instanceRocksDBPath,
             DBOptions dbOptions,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@@ -77,25 +72,17 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
             Long writeBufferManagerCapacity) {
-        super(
-                keyGroupRange,
-                keyGroupPrefixBytes,
-                numberOfTransferringThreads,
-                cancelStreamRegistry,
-                userCodeClassLoader,
-                kvStateInformation,
-                keySerializerProvider,
-                instanceBasePath,
-                instanceRocksDBPath,
-                dbOptions,
-                columnFamilyOptionsFactory,
-                nativeMetricOptions,
-                metricGroup,
-                restoreStateHandles,
-                ttlCompactFiltersManager,
-                writeBufferManagerCapacity);
-        checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
         this.writeBatchSize = writeBatchSize;
+        this.rocksHandle =
+                new RocksDBHandle(
+                        kvStateInformation,
+                        instanceRocksDBPath,
+                        dbOptions,
+                        columnFamilyOptionsFactory,
+                        nativeMetricOptions,
+                        metricGroup,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity);
         this.savepointRestoreOperation =
                 new FullSnapshotRestoreOperation<>(
                         keyGroupRange,
@@ -108,7 +95,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
     @Override
     public RocksDBRestoreResult restore()
             throws IOException, StateMigrationException, RocksDBException {
-        openDB();
+        rocksHandle.openDB();
         try (ThrowingIterator<SavepointRestoreResult> restore =
                 savepointRestoreOperation.restore()) {
             while (restore.hasNext()) {
@@ -116,7 +103,12 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             }
         }
         return new RocksDBRestoreResult(
-                this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1, null, null);
+                this.rocksHandle.getDb(),
+                this.rocksHandle.getDefaultColumnFamilyHandle(),
+                this.rocksHandle.getNativeMetricMonitor(),
+                -1,
+                null,
+                null);
     }
 
     private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
@@ -128,7 +120,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
                         .map(
                                 stateMetaInfoSnapshot -> {
                                     RocksDbKvStateInfo registeredStateCFHandle =
-                                            getOrRegisterStateColumnFamilyHandle(
+                                            this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
                                                     null, stateMetaInfoSnapshot);
                                     return registeredStateCFHandle.columnFamilyHandle;
                                 })
@@ -147,7 +139,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             throws IOException, RocksDBException, StateMigrationException {
         // for all key-groups in the current state handle...
         try (RocksDBWriteBatchWrapper writeBatchWrapper =
-                new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
+                new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
             while (keyGroups.hasNext()) {
                 KeyGroup keyGroup = keyGroups.next();
                 try (ThrowingIterator<KeyGroupEntry> groupEntries = keyGroup.getKeyGroupEntries()) {
@@ -160,4 +152,9 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             }
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        this.rocksHandle.close();
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
similarity index 55%
rename from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
rename to flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
index a670dc1..5a37db0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,16 @@
 
 package org.apache.flink.contrib.streaming.state.restore;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -49,39 +41,34 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+
 /**
- * Base implementation of RocksDB restore operation.
- *
- * @param <K> The data type that the serializer serializes.
+ * Utility for creating a RocksDB instance either from scratch or from restored local state. This
+ * will also register {@link RocksDbKvStateInfo} when using {@link #openDB(List, List, Path)}.
  */
-public abstract class AbstractRocksDBRestoreOperation<K>
-        implements RocksDBRestoreOperation, AutoCloseable {
+class RocksDBHandle implements AutoCloseable {
+
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
-    protected final KeyGroupRange keyGroupRange;
-    protected final int keyGroupPrefixBytes;
-    protected final int numberOfTransferringThreads;
-    protected final CloseableRegistry cancelStreamRegistry;
-    protected final ClassLoader userCodeClassLoader;
-    protected final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
-    protected final DBOptions dbOptions;
-    protected final Map<String, RocksDbKvStateInfo> kvStateInformation;
-    protected final File instanceBasePath;
-    protected final File instanceRocksDBPath;
-    protected final String dbPath;
-    protected List<ColumnFamilyHandle> columnFamilyHandles;
-    protected List<ColumnFamilyDescriptor> columnFamilyDescriptors;
-    protected final StateSerializerProvider<K> keySerializerProvider;
-    protected final RocksDBNativeMetricOptions nativeMetricOptions;
-    protected final MetricGroup metricGroup;
-    protected final Collection<KeyedStateHandle> restoreStateHandles;
+    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
+    private final DBOptions dbOptions;
+    private final Map<String, RocksDbKvStateInfo> kvStateInformation;
+    private final String dbPath;
+    private List<ColumnFamilyHandle> columnFamilyHandles;
+    private List<ColumnFamilyDescriptor> columnFamilyDescriptors;
+    private final RocksDBNativeMetricOptions nativeMetricOptions;
+    private final MetricGroup metricGroup;
     // Current places to set compact filter into column family options:
     // - Incremental restore
     //   - restore with rescaling
@@ -94,46 +81,28 @@ public abstract class AbstractRocksDBRestoreOperation<K>
     // - Full restore
     //   - data ingestion after db open: #getOrRegisterStateColumnFamilyHandle before creating
     // column family
-    protected final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager;
-
-    protected RocksDB db;
-    protected ColumnFamilyHandle defaultColumnFamilyHandle;
-    protected RocksDBNativeMetricMonitor nativeMetricMonitor;
-    protected boolean isKeySerializerCompatibilityChecked;
-    protected final Long writeBufferManagerCapacity;
-
-    protected AbstractRocksDBRestoreOperation(
-            KeyGroupRange keyGroupRange,
-            int keyGroupPrefixBytes,
-            int numberOfTransferringThreads,
-            CloseableRegistry cancelStreamRegistry,
-            ClassLoader userCodeClassLoader,
+    private final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager;
+
+    private RocksDB db;
+    private ColumnFamilyHandle defaultColumnFamilyHandle;
+    private RocksDBNativeMetricMonitor nativeMetricMonitor;
+    private final Long writeBufferManagerCapacity;
+
+    protected RocksDBHandle(
             Map<String, RocksDbKvStateInfo> kvStateInformation,
-            StateSerializerProvider<K> keySerializerProvider,
-            File instanceBasePath,
             File instanceRocksDBPath,
             DBOptions dbOptions,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             RocksDBNativeMetricOptions nativeMetricOptions,
             MetricGroup metricGroup,
-            @Nonnull Collection<KeyedStateHandle> stateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             Long writeBufferManagerCapacity) {
-        this.keyGroupRange = keyGroupRange;
-        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
-        this.numberOfTransferringThreads = numberOfTransferringThreads;
-        this.cancelStreamRegistry = cancelStreamRegistry;
-        this.userCodeClassLoader = userCodeClassLoader;
         this.kvStateInformation = kvStateInformation;
-        this.keySerializerProvider = keySerializerProvider;
-        this.instanceBasePath = instanceBasePath;
-        this.instanceRocksDBPath = instanceRocksDBPath;
         this.dbPath = instanceRocksDBPath.getAbsolutePath();
         this.dbOptions = dbOptions;
         this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
         this.nativeMetricOptions = nativeMetricOptions;
         this.metricGroup = metricGroup;
-        this.restoreStateHandles = stateHandles;
         this.ttlCompactFiltersManager = ttlCompactFiltersManager;
         this.columnFamilyHandles = new ArrayList<>(1);
         this.columnFamilyDescriptors = Collections.emptyList();
@@ -141,6 +110,26 @@ public abstract class AbstractRocksDBRestoreOperation<K>
     }
 
     void openDB() throws IOException {
+        loadDb();
+    }
+
+    void openDB(
+            @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
+            @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            @Nonnull Path restoreSourcePath)
+            throws IOException {
+        this.columnFamilyDescriptors = columnFamilyDescriptors;
+        this.columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
+        restoreInstanceDirectoryFromPath(restoreSourcePath);
+        loadDb();
+        // Register CF handlers
+        for (int i = 0; i < stateMetaInfoSnapshots.size(); i++) {
+            getOrRegisterStateColumnFamilyHandle(
+                    columnFamilyHandles.get(i), stateMetaInfoSnapshots.get(i));
+        }
+    }
+
+    private void loadDb() throws IOException {
         db =
                 RocksDBOperationUtils.openDB(
                         dbPath,
@@ -158,10 +147,6 @@ public abstract class AbstractRocksDBRestoreOperation<K>
                         : null;
     }
 
-    public RocksDB getDb() {
-        return this.db;
-    }
-
     RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
             ColumnFamilyHandle columnFamilyHandle, StateMetaInfoSnapshot stateMetaInfoSnapshot) {
 
@@ -199,51 +184,71 @@ public abstract class AbstractRocksDBRestoreOperation<K>
         return registeredStateMetaInfoEntry;
     }
 
-    KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView)
-            throws IOException, StateMigrationException {
-        // isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
-        // deserialization of state happens lazily during runtime; we depend on the fact
-        // that the new serializer for states could be compatible, and therefore the restore can
-        // continue
-        // without old serializers required to be present.
-        KeyedBackendSerializationProxy<K> serializationProxy =
-                new KeyedBackendSerializationProxy<>(userCodeClassLoader);
-        serializationProxy.read(dataInputView);
-        if (!isKeySerializerCompatibilityChecked) {
-            // fetch current serializer now because if it is incompatible, we can't access
-            // it anymore to improve the error message
-            TypeSerializer<K> currentSerializer = keySerializerProvider.currentSchemaSerializer();
-            // check for key serializer compatibility; this also reconfigures the
-            // key serializer to be compatible, if it is required and is possible
-            TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
-                    keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(
-                            serializationProxy.getKeySerializerSnapshot());
-            if (keySerializerSchemaCompat.isCompatibleAfterMigration()
-                    || keySerializerSchemaCompat.isIncompatible()) {
-                throw new StateMigrationException(
-                        "The new key serializer ("
-                                + currentSerializer
-                                + ") must be compatible with the previous key serializer ("
-                                + keySerializerProvider.previousSchemaSerializer()
-                                + ").");
-            }
+    /**
+     * This recreates the new working directory of the recovered RocksDB instance and links/copies
+     * the contents from a local state.
+     */
+    private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
+        final Path instanceRocksDBDirectory = Paths.get(dbPath);
+        final Path[] files = FileUtils.listDirectory(source);
+
+        if (!new File(dbPath).mkdirs()) {
+            String errMsg = "Could not create RocksDB data directory: " + dbPath;
+            logger.error(errMsg);
+            throw new IOException(errMsg);
+        }
 
-            isKeySerializerCompatibilityChecked = true;
+        for (Path file : files) {
+            final String fileName = file.getFileName().toString();
+            final Path targetFile = instanceRocksDBDirectory.resolve(fileName);
+            if (fileName.endsWith(SST_FILE_SUFFIX)) {
+                // hardlink'ing the immutable sst-files.
+                Files.createLink(targetFile, file);
+            } else {
+                // true copy for all other files.
+                Files.copy(file, targetFile, StandardCopyOption.REPLACE_EXISTING);
+            }
         }
+    }
 
-        return serializationProxy;
+    public RocksDB getDb() {
+        return db;
+    }
+
+    public RocksDBNativeMetricMonitor getNativeMetricMonitor() {
+        return nativeMetricMonitor;
+    }
+
+    public ColumnFamilyHandle getDefaultColumnFamilyHandle() {
+        return defaultColumnFamilyHandle;
+    }
+
+    public List<ColumnFamilyHandle> getColumnFamilyHandles() {
+        return columnFamilyHandles;
+    }
+
+    public RocksDbTtlCompactFiltersManager getTtlCompactFiltersManager() {
+        return ttlCompactFiltersManager;
+    }
+
+    public Long getWriteBufferManagerCapacity() {
+        return writeBufferManagerCapacity;
+    }
+
+    public Function<String, ColumnFamilyOptions> getColumnFamilyOptionsFactory() {
+        return columnFamilyOptionsFactory;
+    }
+
+    public DBOptions getDbOptions() {
+        return dbOptions;
     }
 
-    /** Necessary clean up iff restore operation failed. */
     @Override
-    public void close() {
+    public void close() throws Exception {
         IOUtils.closeQuietly(defaultColumnFamilyHandle);
         IOUtils.closeQuietly(nativeMetricMonitor);
         IOUtils.closeQuietly(db);
         // Making sure the already created column family options will be closed
         columnFamilyDescriptors.forEach((cfd) -> IOUtils.closeQuietly(cfd.getOptions()));
     }
-
-    @Override
-    public abstract RocksDBRestoreResult restore() throws Exception;
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 6b66853..7caf934d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.contrib.streaming.state.restore;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
@@ -46,6 +48,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -54,6 +57,8 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -61,10 +66,7 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -76,19 +78,31 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.Function;
 
-import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
 import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;
-import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Encapsulates the process of restoring a RocksDB instance from an incremental snapshot. */
-public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {
+public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOperation {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);
 
     private final String operatorIdentifier;
     private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
+    private final RocksDBHandle rocksHandle;
+    private final Collection<KeyedStateHandle> restoreStateHandles;
+    private final CloseableRegistry cancelStreamRegistry;
+    private final KeyGroupRange keyGroupRange;
+    private final File instanceBasePath;
+    private final int numberOfTransferringThreads;
+    private final int keyGroupPrefixBytes;
+    private final StateSerializerProvider<K> keySerializerProvider;
+    private final ClassLoader userCodeClassLoader;
     private long lastCompletedCheckpointId;
     private UUID backendUID;
     private final long writeBatchSize;
 
+    private boolean isKeySerializerCompatibilityChecked;
+
     public RocksDBIncrementalRestoreOperation(
             String operatorIdentifier,
             KeyGroupRange keyGroupRange,
@@ -108,29 +122,29 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
             Long writeBufferManagerCapacity) {
-        super(
-                keyGroupRange,
-                keyGroupPrefixBytes,
-                numberOfTransferringThreads,
-                cancelStreamRegistry,
-                userCodeClassLoader,
-                kvStateInformation,
-                keySerializerProvider,
-                instanceBasePath,
-                instanceRocksDBPath,
-                dbOptions,
-                columnFamilyOptionsFactory,
-                nativeMetricOptions,
-                metricGroup,
-                restoreStateHandles,
-                ttlCompactFiltersManager,
-                writeBufferManagerCapacity);
+        this.rocksHandle =
+                new RocksDBHandle(
+                        kvStateInformation,
+                        instanceRocksDBPath,
+                        dbOptions,
+                        columnFamilyOptionsFactory,
+                        nativeMetricOptions,
+                        metricGroup,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity);
         this.operatorIdentifier = operatorIdentifier;
         this.restoredSstFiles = new TreeMap<>();
         this.lastCompletedCheckpointId = -1L;
         this.backendUID = UUID.randomUUID();
-        checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
         this.writeBatchSize = writeBatchSize;
+        this.restoreStateHandles = restoreStateHandles;
+        this.cancelStreamRegistry = cancelStreamRegistry;
+        this.keyGroupRange = keyGroupRange;
+        this.instanceBasePath = instanceBasePath;
+        this.numberOfTransferringThreads = numberOfTransferringThreads;
+        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+        this.keySerializerProvider = keySerializerProvider;
+        this.userCodeClassLoader = userCodeClassLoader;
     }
 
     /** Root method that branches for different implementations of {@link KeyedStateHandle}. */
@@ -153,9 +167,9 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             restoreWithoutRescaling(theFirstStateHandle);
         }
         return new RocksDBRestoreResult(
-                this.db,
-                defaultColumnFamilyHandle,
-                nativeMetricMonitor,
+                this.rocksHandle.getDb(),
+                this.rocksHandle.getDefaultColumnFamilyHandle(),
+                this.rocksHandle.getNativeMetricMonitor(),
                 lastCompletedCheckpointId,
                 backendUID,
                 restoredSstFiles);
@@ -216,10 +230,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                 readMetaData(localKeyedStateHandle.getMetaDataState());
         List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
                 serializationProxy.getStateMetaInfoSnapshots();
-        columnFamilyDescriptors =
-                createAndRegisterColumnFamilyDescriptors(
-                        stateMetaInfoSnapshots, true, writeBufferManagerCapacity);
-        columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
 
         Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
 
@@ -228,19 +238,10 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                 operatorIdentifier,
                 backendUID);
 
-        if (!instanceRocksDBPath.mkdirs()) {
-            String errMsg =
-                    "Could not create RocksDB data directory: "
-                            + instanceBasePath.getAbsolutePath();
-            logger.error(errMsg);
-            throw new IOException(errMsg);
-        }
-
-        restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
-
-        openDB();
-
-        registerColumnFamilyHandles(stateMetaInfoSnapshots);
+        this.rocksHandle.openDB(
+                createColumnFamilyDescriptors(stateMetaInfoSnapshots, true),
+                stateMetaInfoSnapshots,
+                restoreSourcePath);
     }
 
     private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
@@ -273,14 +274,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
         }
     }
 
-    private void registerColumnFamilyHandles(List<StateMetaInfoSnapshot> metaInfoSnapshots) {
-        // Register CF handlers
-        for (int i = 0; i < metaInfoSnapshots.size(); ++i) {
-            getOrRegisterStateColumnFamilyHandle(
-                    columnFamilyHandles.get(i), metaInfoSnapshots.get(i));
-        }
-    }
-
     /**
      * Recovery from multi incremental states with rescaling. For rescaling, this method creates a
      * temporary RocksDB instance for a key-groups shard. All contents from the temporary instance
@@ -299,7 +292,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             restoreStateHandles.remove(initialHandle);
             initDBWithRescaling(initialHandle);
         } else {
-            openDB();
+            this.rocksHandle.openDB();
         }
 
         // Transfer remaining key-groups from temporary instance into base DB
@@ -330,7 +323,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                                     (IncrementalRemoteKeyedStateHandle) rawStateHandle,
                                     temporaryRestoreInstancePath);
                     RocksDBWriteBatchWrapper writeBatchWrapper =
-                            new RocksDBWriteBatchWrapper(this.db, writeBatchSize)) {
+                            new RocksDBWriteBatchWrapper(
+                                    this.rocksHandle.getDb(), writeBatchSize)) {
 
                 List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
                         tmpRestoreDBInfo.columnFamilyDescriptors;
@@ -343,7 +337,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                     ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
 
                     ColumnFamilyHandle targetColumnFamilyHandle =
-                            getOrRegisterStateColumnFamilyHandle(
+                            this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
                                             null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
                                     .columnFamilyHandle;
 
@@ -390,8 +384,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
         // 2. Clip the base DB instance
         try {
             RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
-                    db,
-                    columnFamilyHandles,
+                    this.rocksHandle.getDb(),
+                    this.rocksHandle.getColumnFamilyHandles(),
                     keyGroupRange,
                     initialHandle.getKeyGroupRange(),
                     keyGroupPrefixBytes,
@@ -463,8 +457,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                 serializationProxy.getStateMetaInfoSnapshots();
 
         List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-                createAndRegisterColumnFamilyDescriptors(
-                        stateMetaInfoSnapshots, false, writeBufferManagerCapacity);
+                createColumnFamilyDescriptors(stateMetaInfoSnapshots, false);
 
         List<ColumnFamilyHandle> columnFamilyHandles =
                 new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
@@ -475,8 +468,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                         columnFamilyDescriptors,
                         columnFamilyHandles,
                         RocksDBOperationUtils.createColumnFamilyOptions(
-                                columnFamilyOptionsFactory, "default"),
-                        dbOptions);
+                                this.rocksHandle.getColumnFamilyOptionsFactory(), "default"),
+                        this.rocksHandle.getDbOptions());
 
         return new RestoredDBInstance(
                 restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots);
@@ -486,10 +479,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
      * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state
      * meta data snapshot.
      */
-    private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
-            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
-            boolean registerTtlCompactFilter,
-            Long writeBufferManagerCapacity) {
+    private List<ColumnFamilyDescriptor> createColumnFamilyDescriptors(
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter) {
 
         List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                 new ArrayList<>(stateMetaInfoSnapshots.size());
@@ -500,37 +491,17 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             ColumnFamilyDescriptor columnFamilyDescriptor =
                     RocksDBOperationUtils.createColumnFamilyDescriptor(
                             metaInfoBase,
-                            columnFamilyOptionsFactory,
-                            registerTtlCompactFilter ? ttlCompactFiltersManager : null,
-                            writeBufferManagerCapacity);
+                            this.rocksHandle.getColumnFamilyOptionsFactory(),
+                            registerTtlCompactFilter
+                                    ? this.rocksHandle.getTtlCompactFiltersManager()
+                                    : null,
+                            this.rocksHandle.getWriteBufferManagerCapacity());
 
             columnFamilyDescriptors.add(columnFamilyDescriptor);
         }
         return columnFamilyDescriptors;
     }
 
-    /**
-     * This recreates the new working directory of the recovered RocksDB instance and links/copies
-     * the contents from a local state.
-     */
-    private void restoreInstanceDirectoryFromPath(Path source, String instanceRocksDBPath)
-            throws IOException {
-        final Path instanceRocksDBDirectory = Paths.get(instanceRocksDBPath);
-        final Path[] files = FileUtils.listDirectory(source);
-
-        for (Path file : files) {
-            final String fileName = file.getFileName().toString();
-            final Path targetFile = instanceRocksDBDirectory.resolve(fileName);
-            if (fileName.endsWith(SST_FILE_SUFFIX)) {
-                // hardlink'ing the immutable sst-files.
-                Files.createLink(targetFile, file);
-            } else {
-                // true copy for all other files.
-                Files.copy(file, targetFile, StandardCopyOption.REPLACE_EXISTING);
-            }
-        }
-    }
-
     /** Reads Flink's state meta data file from the state handle. */
     private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle)
             throws Exception {
@@ -548,4 +519,44 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             }
         }
     }
+
+    KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView)
+            throws IOException, StateMigrationException {
+        // isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
+        // deserialization of state happens lazily during runtime; we depend on the fact
+        // that the new serializer for states could be compatible, and therefore the restore can
+        // continue
+        // without old serializers required to be present.
+        KeyedBackendSerializationProxy<K> serializationProxy =
+                new KeyedBackendSerializationProxy<>(userCodeClassLoader);
+        serializationProxy.read(dataInputView);
+        if (!isKeySerializerCompatibilityChecked) {
+            // fetch current serializer now because if it is incompatible, we can't access
+            // it anymore to improve the error message
+            TypeSerializer<K> currentSerializer = keySerializerProvider.currentSchemaSerializer();
+            // check for key serializer compatibility; this also reconfigures the
+            // key serializer to be compatible, if it is required and is possible
+            TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
+                    keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(
+                            serializationProxy.getKeySerializerSnapshot());
+            if (keySerializerSchemaCompat.isCompatibleAfterMigration()
+                    || keySerializerSchemaCompat.isIncompatible()) {
+                throw new StateMigrationException(
+                        "The new key serializer ("
+                                + currentSerializer
+                                + ") must be compatible with the previous key serializer ("
+                                + keySerializerProvider.previousSchemaSerializer()
+                                + ").");
+            }
+
+            isKeySerializerCompatibilityChecked = true;
+        }
+
+        return serializationProxy;
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.rocksHandle.close();
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
index c50e553..4202c89 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
@@ -21,11 +21,7 @@ package org.apache.flink.contrib.streaming.state.restore;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
-import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.StateSerializerProvider;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
@@ -33,52 +29,48 @@ import org.rocksdb.DBOptions;
 import javax.annotation.Nonnull;
 
 import java.io.File;
-import java.util.Collection;
 import java.util.Map;
 import java.util.function.Function;
 
 /** Encapsulates the process of initiating a RocksDB instance without restore. */
-public class RocksDBNoneRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {
+public class RocksDBNoneRestoreOperation<K> implements RocksDBRestoreOperation {
+    private final RocksDBHandle rocksHandle;
+
     public RocksDBNoneRestoreOperation(
-            KeyGroupRange keyGroupRange,
-            int keyGroupPrefixBytes,
-            int numberOfTransferringThreads,
-            CloseableRegistry cancelStreamRegistry,
-            ClassLoader userCodeClassLoader,
             Map<String, RocksDbKvStateInfo> kvStateInformation,
-            StateSerializerProvider<K> keySerializerProvider,
-            File instanceBasePath,
             File instanceRocksDBPath,
             DBOptions dbOptions,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             RocksDBNativeMetricOptions nativeMetricOptions,
             MetricGroup metricGroup,
-            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             Long writeBufferManagerCapacity) {
-        super(
-                keyGroupRange,
-                keyGroupPrefixBytes,
-                numberOfTransferringThreads,
-                cancelStreamRegistry,
-                userCodeClassLoader,
-                kvStateInformation,
-                keySerializerProvider,
-                instanceBasePath,
-                instanceRocksDBPath,
-                dbOptions,
-                columnFamilyOptionsFactory,
-                nativeMetricOptions,
-                metricGroup,
-                restoreStateHandles,
-                ttlCompactFiltersManager,
-                writeBufferManagerCapacity);
+        this.rocksHandle =
+                new RocksDBHandle(
+                        kvStateInformation,
+                        instanceRocksDBPath,
+                        dbOptions,
+                        columnFamilyOptionsFactory,
+                        nativeMetricOptions,
+                        metricGroup,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity);
     }
 
     @Override
     public RocksDBRestoreResult restore() throws Exception {
-        openDB();
+        this.rocksHandle.openDB();
         return new RocksDBRestoreResult(
-                this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1, null, null);
+                this.rocksHandle.getDb(),
+                this.rocksHandle.getDefaultColumnFamilyHandle(),
+                this.rocksHandle.getNativeMetricMonitor(),
+                -1,
+                null,
+                null);
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.rocksHandle.close();
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreOperation.java
index 56d14ea..b70df73 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreOperation.java
@@ -21,7 +21,8 @@ package org.apache.flink.contrib.streaming.state.restore;
 import org.apache.flink.runtime.state.RestoreOperation;
 
 /** Interface for RocksDB restore. */
-public interface RocksDBRestoreOperation extends RestoreOperation<RocksDBRestoreResult> {
+public interface RocksDBRestoreOperation
+        extends RestoreOperation<RocksDBRestoreResult>, AutoCloseable {
     /** Restores state that was previously snapshot-ed from the provided state handles. */
     RocksDBRestoreResult restore() throws Exception;
 }