You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/24 16:20:55 UTC

[flink] branch master updated (9b84132 -> f519346)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9b84132  [hotfix][docs] reintroduce build_docs.sh script
     new 2f16bff  [hotfix] Remove unnecessary if in RocksIncrementalSnapshotStrategy
     new 7f3aa39  [hotfix] Cleanup raw types around PriorityQueueSetFactory
     new 3ed5c1a  [refactor] Remove AbstractRocksDBRestoreOperation
     new f5fbb64  [refactor] Extract common interface for a single Rocks state
     new be628c6  [hotfix] Fix RocksIncrementalCheckpointRescalingTest
     new a9fef44  [FLINK-21344] Handle heap timers in Rocks state
     new 8006618  [FLINK-21344] Do not store heap timers in raw operator state for a savepoint
     new 6ad54a5  [hotfix] Fix possible null pointer exception in RocksStatesPerKeyGroupMergeIterator
     new f519346  [FLINK-21344] Test legacy heap timers

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/state/AbstractKeyedStateBackend.java   |   3 +-
 .../runtime/state/HeapPriorityQueuesManager.java   | 110 +++++++++
 .../runtime/state/PriorityQueueSetFactory.java     |   2 +-
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  75 +-----
 .../state/heap/HeapMetaInfoRestoreOperation.java   |   5 +-
 .../state/heap/HeapPriorityQueueSetFactory.java    |   2 +-
 .../HeapPriorityQueueSnapshotRestoreWrapper.java   |   5 +-
 .../state/heap/HeapPriorityQueueStateSnapshot.java |   5 +
 .../state/heap/HeapSavepointRestoreOperation.java  |   6 +-
 .../state/ttl/mock/MockKeyedStateBackend.java      |   5 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  34 ++-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  55 +++--
 .../state/RocksDBPriorityQueueSetFactory.java      |   2 +-
 .../state/iterator/RocksQueueIterator.java         | 141 ++++++++++++
 .../state/iterator/RocksSingleStateIterator.java   |  29 ++-
 .../RocksStatesPerKeyGroupMergeIterator.java       |  66 +++---
 .../state/iterator/SingleStateIterator.java        |  19 +-
 .../state/restore/RocksDBFullRestoreOperation.java |  85 +++----
 ...sDBRestoreOperation.java => RocksDBHandle.java} | 201 ++++++++--------
 .../RocksDBHeapTimersFullRestoreOperation.java     | 255 +++++++++++++++++++++
 .../RocksDBIncrementalRestoreOperation.java        | 191 +++++++--------
 .../state/restore/RocksDBNoneRestoreOperation.java |  58 ++---
 .../state/restore/RocksDBRestoreOperation.java     |   3 +-
 .../snapshot/RocksDBFullSnapshotResources.java     |  26 ++-
 .../state/snapshot/RocksFullSnapshotStrategy.java  |  17 ++
 .../snapshot/RocksIncrementalSnapshotStrategy.java |   8 +-
 .../state/HeapTimersSnapshottingTest.java          | 103 +++++++++
 .../contrib/streaming/state/RocksDBTestUtils.java  |  11 +-
 .../RocksIncrementalCheckpointRescalingTest.java   |  42 ----
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../api/operators/InternalTimeServiceManager.java  |  12 +-
 .../operators/InternalTimeServiceManagerImpl.java  |  25 +-
 .../api/operators/StreamOperatorStateHandler.java  |   9 +-
 .../BatchExecutionInternalTimeServiceManager.java  |   5 -
 .../state/BatchExecutionKeyedStateBackend.java     |   4 +-
 .../util/AbstractStreamOperatorTestHarness.java    |  16 +-
 .../flink/table/runtime/util/StateConfigUtil.java  |   3 +-
 .../test/checkpointing/TimersSavepointITCase.java  | 229 ++++++++++++++++++
 .../flink/test/state/BackendSwitchSpecs.java       |  16 +-
 .../RocksSavepointStateBackendSwitchTest.java      |  22 +-
 .../_metadata                                      | Bin 0 -> 5391 bytes
 41 files changed, 1395 insertions(+), 516 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapPriorityQueuesManager.java
 create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksQueueIterator.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java => flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java (71%)
 rename flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/{AbstractRocksDBRestoreOperation.java => RocksDBHandle.java} (55%)
 create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java
 create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimersSavepointITCase.java
 create mode 100644 flink-tests/src/test/resources/legacy-raw-state-heap-timers-rocks-db-1.12/_metadata


[flink] 02/09: [hotfix] Cleanup raw types around PriorityQueueSetFactory

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7f3aa390892bed6e00ab254e311f6a46c623a1d5
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Feb 2 17:37:54 2021 +0100

    [hotfix] Cleanup raw types around PriorityQueueSetFactory
---
 .../org/apache/flink/runtime/state/PriorityQueueSetFactory.java   | 2 +-
 .../apache/flink/runtime/state/heap/HeapKeyedStateBackend.java    | 8 ++++----
 .../flink/runtime/state/heap/HeapPriorityQueueSetFactory.java     | 2 +-
 .../flink/runtime/state/ttl/mock/MockKeyedStateBackend.java       | 2 +-
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java   | 2 +-
 .../contrib/streaming/state/RocksDBPriorityQueueSetFactory.java   | 2 +-
 .../operators/sorted/state/BatchExecutionKeyedStateBackend.java   | 4 ++--
 7 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
index baeb591..96ce98b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
@@ -36,7 +36,7 @@ public interface PriorityQueueSetFactory {
      * @return the queue with the specified unique name.
      */
     @Nonnull
-    <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 0b42a32..8e6c356 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -157,13 +157,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     @SuppressWarnings("unchecked")
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
 
-        final HeapPriorityQueueSnapshotRestoreWrapper existingState =
-                registeredPQStates.get(stateName);
+        final HeapPriorityQueueSnapshotRestoreWrapper<T> existingState =
+                (HeapPriorityQueueSnapshotRestoreWrapper<T>) registeredPQStates.get(stateName);
 
         if (existingState != null) {
             // TODO we implement the simple way of supporting the current functionality, mimicking
@@ -197,7 +197,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     }
 
     @Nonnull
-    private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    private <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> createInternal(
                     RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
index 8074c1a..6646d5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
@@ -50,7 +50,7 @@ public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
 
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             HeapPriorityQueueSet<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index d3d3757..c946365 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -278,7 +278,7 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d6c7cff..8aab8b8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -455,7 +455,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 717c2b8..fb063a7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -98,7 +98,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
index 0bf5a5a..128abb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
@@ -244,8 +244,8 @@ class BatchExecutionKeyedStateBackend<K> implements CheckpointableKeyedStateBack
 
     @Nonnull
     @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    @SuppressWarnings({"unchecked"})
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {


[flink] 07/09: [FLINK-21344] Do not store heap timers in raw operator state for a savepoint

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80066185648a243853b532350393ce93952f49b3
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Feb 8 18:56:27 2021 +0100

    [FLINK-21344] Do not store heap timers in raw operator state for a
    savepoint
    
    We no longer serialize the heap timers in the RocksDB state backend when
    taking a savepoint. We still do it for checkpoints, though.
    
    There is one gotcha in the PR: StateConfigUtil#isStateImmutableInStateBackend
    assumes that checkpoints behave differently from savepoints with respect
    to heap timers.
    
    This closes #14913
---
 .../runtime/state/AbstractKeyedStateBackend.java   |   3 +-
 .../state/ttl/mock/MockKeyedStateBackend.java      |   3 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |   6 +-
 .../state/HeapTimersSnapshottingTest.java          | 103 +++++++++++++++++++++
 .../contrib/streaming/state/RocksDBTestUtils.java  |  11 ++-
 .../api/operators/InternalTimeServiceManager.java  |  12 +--
 .../operators/InternalTimeServiceManagerImpl.java  |  25 +----
 .../api/operators/StreamOperatorStateHandler.java  |   9 +-
 .../BatchExecutionInternalTimeServiceManager.java  |   5 -
 .../util/AbstractStreamOperatorTestHarness.java    |  16 +++-
 .../flink/table/runtime/util/StateConfigUtil.java  |   3 +-
 11 files changed, 148 insertions(+), 48 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 1ded0dc..6ba970a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -348,7 +349,7 @@ public abstract class AbstractKeyedStateBackend<K>
     }
 
     // TODO remove this once heap-based timers are working with RocksDB incremental snapshots!
-    public boolean requiresLegacySynchronousTimerSnapshots() {
+    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointOptions) {
         return false;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index c946365..d995ba3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -181,7 +182,7 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     }
 
     @Override
-    public boolean requiresLegacySynchronousTimerSnapshots() {
+    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointOptions) {
         return false;
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0f53955..9dbc5a6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -40,6 +40,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -853,8 +854,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     }
 
     @Override
-    public boolean requiresLegacySynchronousTimerSnapshots() {
-        return priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
+    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointType) {
+        return priorityQueueFactory instanceof HeapPriorityQueueSetFactory
+                && checkpointType == CheckpointType.CHECKPOINT;
     }
 
     /** Rocks DB specific information about the k/v states. */
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
new file mode 100644
index 0000000..94d82fd
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * The tests verify that {@link PriorityQueueStateType#HEAP heap timers} are not serialized into raw
+ * keyed operator state when taking a savepoint, but they are serialized for checkpoints. The heap
+ * timers still need to be serialized into the raw operator state because of RocksDB incremental
+ * checkpoints.
+ */
+public class HeapTimersSnapshottingTest {
+
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testNotSerializingTimersInRawStateForSavepoints() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+            backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
+            testHarness.setStateBackend(backend);
+            testHarness.open();
+            testHarness.processElement(0, 0L);
+
+            OperatorSubtaskState state =
+                    testHarness
+                            .snapshotWithLocalState(0L, 1L, CheckpointType.SAVEPOINT)
+                            .getJobManagerOwnedState();
+            assertThat(state.getRawKeyedState().isEmpty(), equalTo(true));
+        }
+    }
+
+    @Test
+    public void testSerializingTimersInRawStateForCheckpoints() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+            backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
+            testHarness.setStateBackend(backend);
+            testHarness.open();
+            testHarness.processElement(0, 0L);
+
+            OperatorSubtaskState state =
+                    testHarness
+                            .snapshotWithLocalState(0L, 1L, CheckpointType.CHECKPOINT)
+                            .getJobManagerOwnedState();
+            assertThat(state.getRawKeyedState().isEmpty(), equalTo(false));
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> getTestHarness()
+            throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(
+                        new KeyedProcessFunction<Integer, Integer, Integer>() {
+                            @Override
+                            public void processElement(
+                                    Integer value, Context ctx, Collector<Integer> out) {
+                                ctx.timerService().registerEventTimeTimer(0L);
+                            }
+                        }),
+                (KeySelector<Integer, Integer>) value -> value,
+                BasicTypeInfo.INT_TYPE_INFO,
+                1,
+                1,
+                0);
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index db7dd71..7702f55 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -45,6 +45,15 @@ public final class RocksDBTestUtils {
     public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
             File instanceBasePath, TypeSerializer<K> keySerializer) {
 
+        return builderForTestDefaults(
+                instanceBasePath, keySerializer, RocksDBStateBackend.PriorityQueueStateType.HEAP);
+    }
+
+    public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
+            File instanceBasePath,
+            TypeSerializer<K> keySerializer,
+            RocksDBStateBackend.PriorityQueueStateType queueStateType) {
+
         final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();
 
         return new RocksDBKeyedStateBackendBuilder<>(
@@ -59,7 +68,7 @@ public final class RocksDBTestUtils {
                 new KeyGroupRange(0, 1),
                 new ExecutionConfig(),
                 TestLocalRecoveryConfig.disabled(),
-                RocksDBStateBackend.PriorityQueueStateType.HEAP,
+                queueStateType,
                 TtlTimeProvider.DEFAULT,
                 new UnregisteredMetricsGroup(),
                 Collections.emptyList(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 6f89b02..e374fdb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -56,8 +56,7 @@ public interface InternalTimeServiceManager<K> {
     void advanceWatermark(Watermark watermark) throws Exception;
 
     /**
-     * Snapshots the timers to raw keyed state. This should only be called iff {@link
-     * #isUsingLegacyRawKeyedStateSnapshots()} returns {@code true}.
+     * Snapshots the timers to raw keyed state.
      *
      * <p><b>TODO:</b> This can be removed once heap-based timers are integrated with RocksDB
      * incremental snapshots.
@@ -67,15 +66,6 @@ public interface InternalTimeServiceManager<K> {
             throws Exception;
 
     /**
-     * Flag indicating whether or not the internal timer services should be checkpointed with legacy
-     * synchronous snapshots.
-     *
-     * <p><b>TODO:</b> This can be removed once heap-based timers are integrated with RocksDB
-     * incremental snapshots.
-     */
-    boolean isUsingLegacyRawKeyedStateSnapshots();
-
-    /**
      * A provider pattern for creating an instance of a {@link InternalTimeServiceManager}. Allows
      * substituting the manager that will be used at the runtime.
      */
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
index 14e9d4f..9288e94 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -45,7 +44,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An entity keeping all the time-related services. Right now, this is only a {@link
@@ -75,20 +73,16 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan
 
     private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
 
-    private final boolean useLegacySynchronousSnapshots;
-
     private InternalTimeServiceManagerImpl(
             KeyGroupRange localKeyGroupRange,
             KeyContext keyContext,
             PriorityQueueSetFactory priorityQueueSetFactory,
-            ProcessingTimeService processingTimeService,
-            boolean useLegacySynchronousSnapshots) {
+            ProcessingTimeService processingTimeService) {
 
         this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
         this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
         this.keyContext = Preconditions.checkNotNull(keyContext);
         this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
-        this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;
 
         this.timerServices = new HashMap<>();
     }
@@ -106,18 +100,10 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan
             Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates)
             throws Exception {
         final KeyGroupRange keyGroupRange = keyedStatedBackend.getKeyGroupRange();
-        final boolean requiresSnapshotLegacyTimers =
-                keyedStatedBackend instanceof AbstractKeyedStateBackend
-                        && ((AbstractKeyedStateBackend<K>) keyedStatedBackend)
-                                .requiresLegacySynchronousTimerSnapshots();
 
         final InternalTimeServiceManagerImpl<K> timeServiceManager =
                 new InternalTimeServiceManagerImpl<>(
-                        keyGroupRange,
-                        keyContext,
-                        keyedStatedBackend,
-                        processingTimeService,
-                        requiresSnapshotLegacyTimers);
+                        keyGroupRange, keyContext, keyedStatedBackend, processingTimeService);
 
         // and then initialize the timer services
         for (KeyGroupStatePartitionStreamProvider streamProvider : rawKeyedStates) {
@@ -198,15 +184,8 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan
     //////////////////				Fault Tolerance Methods				///////////////////
 
     @Override
-    public boolean isUsingLegacyRawKeyedStateSnapshots() {
-        return useLegacySynchronousSnapshots;
-    }
-
-    @Override
     public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream out, String operatorName)
             throws Exception {
-        checkState(useLegacySynchronousSnapshots);
-
         try {
             KeyGroupsList allKeyGroups = out.getKeyGroupList();
             for (int keyGroupIdx : allKeyGroups) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
index b784382..f45b00c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.DefaultKeyedStateStore;
@@ -194,7 +195,13 @@ public class StreamOperatorStateHandler {
                         "keyedStateBackend should be available with timeServiceManager");
                 final InternalTimeServiceManager<?> manager = timeServiceManager.get();
 
-                if (manager.isUsingLegacyRawKeyedStateSnapshots()) {
+                boolean requiresLegacyRawKeyedStateSnapshots =
+                        keyedStateBackend instanceof AbstractKeyedStateBackend
+                                && ((AbstractKeyedStateBackend<?>) keyedStateBackend)
+                                        .requiresLegacySynchronousTimerSnapshots(
+                                                checkpointOptions.getCheckpointType());
+
+                if (requiresLegacyRawKeyedStateSnapshots) {
                     checkState(
                             !isUsingCustomRawKeyedState,
                             "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
index 6338fd4..8666f05 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
@@ -83,11 +83,6 @@ public class BatchExecutionInternalTimeServiceManager<K>
         throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution");
     }
 
-    @Override
-    public boolean isUsingLegacyRawKeyedStateSnapshots() {
-        throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution");
-    }
-
     public static <K> InternalTimeServiceManager<K> create(
             CheckpointableKeyedStateBackend<K> keyedStatedBackend,
             ClassLoader userClassloader,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index d82ed51..be0e6b3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -654,14 +655,25 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
      */
     public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp)
             throws Exception {
+        return snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT);
+    }
+
+    /**
+     * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions,
+     * org.apache.flink.runtime.state.CheckpointStreamFactory)}.
+     */
+    public OperatorSnapshotFinalizer snapshotWithLocalState(
+            long checkpointId, long timestamp, CheckpointType checkpointType) throws Exception {
 
+        CheckpointStorageLocationReference locationReference =
+                CheckpointStorageLocationReference.getDefault();
         OperatorSnapshotFutures operatorStateResult =
                 operator.snapshotState(
                         checkpointId,
                         timestamp,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
+                        new CheckpointOptions(checkpointType, locationReference),
                         checkpointStorageAccess.resolveCheckpointStorageLocation(
-                                checkpointId, CheckpointStorageLocationReference.getDefault()));
+                                checkpointId, locationReference));
 
         return new OperatorSnapshotFinalizer(operatorStateResult);
     }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
index 8be9d55..79e9a63 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.util;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 
@@ -56,7 +57,7 @@ public class StateConfigUtil {
             // indicates the underlying uses heap-based timer
             isHeapTimer =
                     ((AbstractKeyedStateBackend<?>) stateBackend)
-                            .requiresLegacySynchronousTimerSnapshots();
+                            .requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT);
         }
         return isRocksDbState && !isHeapTimer;
     }


[flink] 08/09: [hotfix] Fix possible null pointer exception in RocksStatesPerKeyGroupMergeIterator

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ad54a5352ba4f30cd94f4f97b53f51da834109e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Feb 9 18:47:30 2021 +0100

    [hotfix] Fix possible null pointer exception in RocksStatesPerKeyGroupMergeIterator
---
 .../streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
index ed8cc0d..2f062e5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
@@ -236,6 +236,8 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
     public void close() {
         IOUtils.closeQuietly(closeableRegistry);
 
-        heap.clear();
+        if (heap != null) {
+            heap.clear();
+        }
     }
 }


[flink] 09/09: [FLINK-21344] Test legacy heap timers

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5193466c5c9b92b52e3b4e81d0dffe27d351b34
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Feb 9 19:57:30 2021 +0100

    [FLINK-21344] Test legacy heap timers
---
 .../test/checkpointing/TimersSavepointITCase.java  | 229 +++++++++++++++++++++
 .../_metadata                                      | Bin 0 -> 5391 bytes
 2 files changed, 229 insertions(+)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimersSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimersSavepointITCase.java
new file mode 100644
index 0000000..d1d9e83
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimersSavepointITCase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for restoring {@link PriorityQueueStateType#HEAP} timers stored in raw operator state. */
+public class TimersSavepointITCase {
+    private static final int PARALLELISM = 4;
+
+    private static final OneShotLatch savepointLatch = new OneShotLatch();
+    private static final OneShotLatch resultLatch = new OneShotLatch();
+
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+    // We use a single past Flink version as we verify heap timers stored in raw state
+    // Starting from 1.13 we do not store heap timers in raw state, but we keep them in
+    // managed state
+    public static final String SAVEPOINT_FILE_NAME = "legacy-raw-state-heap-timers-rocks-db-1.12";
+
+    /**
+     * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we
+     * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the
+     * checking functions.
+     */
+    public enum ExecutionMode {
+        PERFORM_SAVEPOINT,
+        VERIFY_SAVEPOINT
+    }
+
+    // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+    // TODO Note: You should generate the savepoint based on the release branch instead of the
+    // master.
+    private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    @Test(timeout = 60_000)
+    public void testSavepointWithTimers() throws Exception {
+        try (ClusterClient<?> client = miniClusterResource.getClusterClient()) {
+            if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
+                takeSavepoint("src/test/resources/" + SAVEPOINT_FILE_NAME, client);
+            } else if (executionMode == ExecutionMode.VERIFY_SAVEPOINT) {
+                verifySavepoint(getResourceFilename(SAVEPOINT_FILE_NAME), client);
+            } else {
+                throw new IllegalStateException("Unknown ExecutionMode " + executionMode);
+            }
+        }
+    }
+
+    private void verifySavepoint(String savepointPath, ClusterClient<?> client)
+            throws IOException, InterruptedException, java.util.concurrent.ExecutionException {
+        JobGraph jobGraph;
+
+        jobGraph = getJobGraph(PriorityQueueStateType.HEAP);
+        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+        client.submitJob(jobGraph).get();
+        resultLatch.await();
+    }
+
+    private void takeSavepoint(String savepointPath, ClusterClient<?> client) throws Exception {
+        JobGraph jobGraph = getJobGraph(PriorityQueueStateType.ROCKSDB);
+        client.submitJob(jobGraph).get();
+        savepointLatch.await();
+        CompletableFuture<String> savepointPathFuture =
+                client.triggerSavepoint(jobGraph.getJobID(), null);
+
+        String jobmanagerSavepointPath = savepointPathFuture.get(2, TimeUnit.SECONDS);
+
+        File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
+        // savepoints were changed to be directories in Flink 1.3
+        FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath));
+    }
+
+    public JobGraph getJobGraph(PriorityQueueStateType priorityQueueStateType) throws IOException {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.addSource(new Source())
+                .assignTimestampsAndWatermarks(
+                        WatermarkStrategy.<Integer>forMonotonousTimestamps()
+                                .withTimestampAssigner((i, p) -> i))
+                .keyBy(i -> i)
+                .process(new TimersProcessFunction())
+                .addSink(new DiscardingSink<>());
+
+        final Configuration config = new Configuration();
+        config.set(CheckpointingOptions.STATE_BACKEND, "rocksdb");
+        config.set(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                TMP_FOLDER.newFolder().toURI().toString());
+        config.set(
+                CheckpointingOptions.SAVEPOINT_DIRECTORY,
+                TMP_FOLDER.newFolder().toURI().toString());
+        config.set(RocksDBOptions.TIMER_SERVICE_FACTORY, priorityQueueStateType);
+        env.configure(config, this.getClass().getClassLoader());
+        return env.getStreamGraph("Test", false).getJobGraph();
+    }
+
+    private static String getResourceFilename(String filename) {
+        ClassLoader cl = TimersSavepointITCase.class.getClassLoader();
+        URL resource = cl.getResource(filename);
+        if (resource == null) {
+            throw new NullPointerException("Missing snapshot resource.");
+        }
+        return resource.getFile();
+    }
+
+    private static class Source implements SourceFunction<Integer>, CheckpointedFunction {
+
+        private volatile boolean running = true;
+        private int emittedCount;
+        private ListState<Integer> state;
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            while (running) {
+                synchronized (ctx.getCheckpointLock()) {
+                    if (emittedCount == 0) {
+                        ctx.collect(0);
+                        emittedCount = 1;
+                    } else if (emittedCount == 1) {
+                        ctx.collect(emittedCount);
+                    } else {
+                        ctx.collect(emittedCount++);
+                    }
+                }
+                Thread.sleep(1);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            this.running = false;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            state.add(emittedCount);
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            state =
+                    context.getOperatorStateStore()
+                            .getListState(
+                                    new ListStateDescriptor<>(
+                                            "emittedCount", IntSerializer.INSTANCE));
+            if (context.isRestored()) {
+                this.emittedCount = 2;
+            }
+        }
+    }
+
+    private static class TimersProcessFunction
+            extends KeyedProcessFunction<Integer, Integer, Integer> {
+
+        @Override
+        public void processElement(Integer value, Context ctx, Collector<Integer> out)
+                throws Exception {
+            if (value == 0) {
+                ctx.timerService().registerEventTimeTimer(2L);
+                savepointLatch.trigger();
+            }
+        }
+
+        @Override
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
+                throws Exception {
+            out.collect(1);
+            resultLatch.trigger();
+        }
+    }
+}
diff --git a/flink-tests/src/test/resources/legacy-raw-state-heap-timers-rocks-db-1.12/_metadata b/flink-tests/src/test/resources/legacy-raw-state-heap-timers-rocks-db-1.12/_metadata
new file mode 100644
index 0000000..007c518
Binary files /dev/null and b/flink-tests/src/test/resources/legacy-raw-state-heap-timers-rocks-db-1.12/_metadata differ


[flink] 06/09: [FLINK-21344] Handle heap timers in Rocks state

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9fef44654b0c154af573f5c27398e27d3351cf9
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Feb 8 17:09:19 2021 +0100

    [FLINK-21344] Handle heap timers in Rocks state
    
    We serialize the heap timers into the same format as if they were
    actually stored in RocksDB instead of storing them in a raw operator
    state. It lets users change between using heap and RocksDB timers.
---
 .../runtime/state/HeapPriorityQueuesManager.java   | 110 +++++++++
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  73 +-----
 .../state/heap/HeapMetaInfoRestoreOperation.java   |   5 +-
 .../HeapPriorityQueueSnapshotRestoreWrapper.java   |   5 +-
 .../state/heap/HeapPriorityQueueStateSnapshot.java |   5 +
 .../state/heap/HeapSavepointRestoreOperation.java  |   6 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  26 ++-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  37 ++-
 .../state/iterator/RocksQueueIterator.java         | 141 ++++++++++++
 .../RocksStatesPerKeyGroupMergeIterator.java       |  23 +-
 .../state/restore/RocksDBFullRestoreOperation.java |  30 +--
 .../RocksDBHeapTimersFullRestoreOperation.java     | 255 +++++++++++++++++++++
 .../snapshot/RocksDBFullSnapshotResources.java     |  26 ++-
 .../state/snapshot/RocksFullSnapshotStrategy.java  |  17 ++
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../flink/test/state/BackendSwitchSpecs.java       |  16 +-
 .../RocksSavepointStateBackendSwitchTest.java      |  22 +-
 17 files changed, 696 insertions(+), 107 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapPriorityQueuesManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapPriorityQueuesManager.java
new file mode 100644
index 0000000..27d500d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapPriorityQueuesManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StateMigrationException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+
+/** Manages creating heap priority queues along with their counterpart meta info. */
+@Internal
+public class HeapPriorityQueuesManager {
+
+    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
+    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
+    private final KeyGroupRange keyGroupRange;
+    private final int numberOfKeyGroups;
+
+    public HeapPriorityQueuesManager(
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
+            HeapPriorityQueueSetFactory priorityQueueSetFactory,
+            KeyGroupRange keyGroupRange,
+            int numberOfKeyGroups) {
+        this.registeredPQStates = registeredPQStates;
+        this.priorityQueueSetFactory = priorityQueueSetFactory;
+        this.keyGroupRange = keyGroupRange;
+        this.numberOfKeyGroups = numberOfKeyGroups;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Nonnull
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> createOrUpdate(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+        final HeapPriorityQueueSnapshotRestoreWrapper<T> existingState =
+                (HeapPriorityQueueSnapshotRestoreWrapper<T>) registeredPQStates.get(stateName);
+
+        if (existingState != null) {
+            TypeSerializerSchemaCompatibility<T> compatibilityResult =
+                    existingState
+                            .getMetaInfo()
+                            .updateElementSerializer(byteOrderedElementSerializer);
+
+            if (compatibilityResult.isIncompatible()) {
+                throw new FlinkRuntimeException(
+                        new StateMigrationException(
+                                "For heap backends, the new priority queue serializer must not be incompatible."));
+            } else {
+                registeredPQStates.put(
+                        stateName,
+                        existingState.forUpdatedSerializer(byteOrderedElementSerializer));
+            }
+
+            return existingState.getPriorityQueue();
+        } else {
+            final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
+                    new RegisteredPriorityQueueStateBackendMetaInfo<>(
+                            stateName, byteOrderedElementSerializer);
+            return createInternal(metaInfo);
+        }
+    }
+
+    @Nonnull
+    private <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> createInternal(
+                    RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
+
+        final String stateName = metaInfo.getName();
+        final HeapPriorityQueueSet<T> priorityQueue =
+                priorityQueueSetFactory.create(stateName, metaInfo.getElementSerializer());
+
+        HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper =
+                new HeapPriorityQueueSnapshotRestoreWrapper<>(
+                        priorityQueue,
+                        metaInfo,
+                        KeyExtractorFunction.forKeyedObjects(),
+                        keyGroupRange,
+                        numberOfKeyGroups);
+
+        registeredPQStates.put(stateName, wrapper);
+        return priorityQueue;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 8e6c356..c728fea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.HeapPriorityQueuesManager;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedStateFunction;
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategyRunner;
 import org.apache.flink.runtime.state.StateSnapshotRestore;
@@ -96,9 +95,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     /** Map of registered Key/Value states. */
     private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
 
-    /** Map of registered priority queue set states. */
-    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
-
     /** The configuration for local recovery. */
     private final LocalRecoveryConfig localRecoveryConfig;
 
@@ -113,7 +109,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     private final StateTableFactory<K> stateTableFactory;
 
     /** Factory for state that is organized as priority queue. */
-    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
+    private final HeapPriorityQueuesManager priorityQueuesManager;
 
     public HeapKeyedStateBackend(
             TaskKvStateRegistry kvStateRegistry,
@@ -141,12 +137,16 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                 keyGroupCompressionDecorator,
                 keyContext);
         this.registeredKVStates = registeredKVStates;
-        this.registeredPQStates = registeredPQStates;
         this.localRecoveryConfig = localRecoveryConfig;
-        this.priorityQueueSetFactory = priorityQueueSetFactory;
         this.checkpointStrategyRunner = checkpointStrategyRunner;
         this.savepointStrategyRunner = savepointStrategyRunner;
         this.stateTableFactory = stateTableFactory;
+        this.priorityQueuesManager =
+                new HeapPriorityQueuesManager(
+                        registeredPQStates,
+                        priorityQueueSetFactory,
+                        keyContext.getKeyGroupRange(),
+                        keyContext.getNumberOfKeyGroups());
         LOG.info("Initializing heap keyed state backend with stream factory.");
     }
 
@@ -154,67 +154,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     //  state backend operations
     // ------------------------------------------------------------------------
 
-    @SuppressWarnings("unchecked")
     @Nonnull
     @Override
     public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
-
-        final HeapPriorityQueueSnapshotRestoreWrapper<T> existingState =
-                (HeapPriorityQueueSnapshotRestoreWrapper<T>) registeredPQStates.get(stateName);
-
-        if (existingState != null) {
-            // TODO we implement the simple way of supporting the current functionality, mimicking
-            // keyed state
-            // because this should be reworked in FLINK-9376 and then we should have a common
-            // algorithm over
-            // StateMetaInfoSnapshot that avoids this code duplication.
-
-            TypeSerializerSchemaCompatibility<T> compatibilityResult =
-                    existingState
-                            .getMetaInfo()
-                            .updateElementSerializer(byteOrderedElementSerializer);
-
-            if (compatibilityResult.isIncompatible()) {
-                throw new FlinkRuntimeException(
-                        new StateMigrationException(
-                                "For heap backends, the new priority queue serializer must not be incompatible."));
-            } else {
-                registeredPQStates.put(
-                        stateName,
-                        existingState.forUpdatedSerializer(byteOrderedElementSerializer));
-            }
-
-            return existingState.getPriorityQueue();
-        } else {
-            final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
-                    new RegisteredPriorityQueueStateBackendMetaInfo<>(
-                            stateName, byteOrderedElementSerializer);
-            return createInternal(metaInfo);
-        }
-    }
-
-    @Nonnull
-    private <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
-            KeyGroupedInternalPriorityQueue<T> createInternal(
-                    RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
-
-        final String stateName = metaInfo.getName();
-        final HeapPriorityQueueSet<T> priorityQueue =
-                priorityQueueSetFactory.create(stateName, metaInfo.getElementSerializer());
-
-        HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper =
-                new HeapPriorityQueueSnapshotRestoreWrapper<>(
-                        priorityQueue,
-                        metaInfo,
-                        KeyExtractorFunction.forKeyedObjects(),
-                        keyGroupRange,
-                        numberOfKeyGroups);
-
-        registeredPQStates.put(stateName, wrapper);
-        return priorityQueue;
+        return priorityQueuesManager.createOrUpdate(stateName, byteOrderedElementSerializer);
     }
 
     private <N, V> StateTable<K, N, V> tryRegisterStateTable(
@@ -409,7 +355,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
     /** Returns the total number of state entries across all keys/namespaces. */
     @VisibleForTesting
-    @SuppressWarnings("unchecked")
     @Override
     public int numKeyValueStateEntries() {
         int sum = 0;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
index aecc44b..8badfd2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java
@@ -115,9 +115,10 @@ class HeapMetaInfoRestoreOperation<K> {
         return kvStatesById;
     }
 
-    private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             HeapPriorityQueueSnapshotRestoreWrapper<T> createInternal(
-                    RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
+                    RegisteredPriorityQueueStateBackendMetaInfo metaInfo) {
 
         final String stateName = metaInfo.getName();
         final HeapPriorityQueueSet<T> priorityQueue =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
index 8b44c72..8564c15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupPartitioner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
 import org.apache.flink.runtime.state.StateSnapshotRestore;
 
@@ -61,10 +60,10 @@ public class HeapPriorityQueueSnapshotRestoreWrapper<T extends HeapPriorityQueue
     @SuppressWarnings("unchecked")
     @Nonnull
     @Override
-    public StateSnapshot stateSnapshot() {
+    public HeapPriorityQueueStateSnapshot<T> stateSnapshot() {
         final T[] queueDump =
                 (T[]) priorityQueue.toArray(new HeapPriorityQueueElement[priorityQueue.size()]);
-        return new HeapPriorityQueueStateSnapshot<>(
+        return new HeapPriorityQueueStateSnapshot<T>(
                 queueDump,
                 keyExtractorFunction,
                 metaInfo.deepCopy(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
index e183085..fb597cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
@@ -117,6 +117,11 @@ public class HeapPriorityQueueStateSnapshot<T> implements StateSnapshot {
         return metaInfo.snapshot();
     }
 
+    @Nonnull
+    public RegisteredPriorityQueueStateBackendMetaInfo<T> getMetaInfo() {
+        return metaInfo;
+    }
+
     @Override
     public void release() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
index 13fdf8f..d52b709 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java
@@ -164,8 +164,8 @@ public class HeapSavepointRestoreOperation<K> implements RestoreOperation<Void>
     @SuppressWarnings("unchecked")
     private void readPriorityQueue(KeyGroupEntry groupEntry, StateMetaInfoSnapshot infoSnapshot)
             throws IOException {
-        DataInputDeserializer keyDeserializer = new DataInputDeserializer(groupEntry.getKey());
-        keyDeserializer.skipBytesToRead(keyGroupPrefixBytes);
+        entryKeyDeserializer.setBuffer(groupEntry.getKey());
+        entryKeyDeserializer.skipBytesToRead(keyGroupPrefixBytes);
         HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement>
                 priorityQueueSnapshotRestoreWrapper =
                         (HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement>)
@@ -174,7 +174,7 @@ public class HeapSavepointRestoreOperation<K> implements RestoreOperation<Void>
                 priorityQueueSnapshotRestoreWrapper
                         .getMetaInfo()
                         .getElementSerializer()
-                        .deserialize(keyDeserializer);
+                        .deserialize(entryKeyDeserializer);
         HeapPriorityQueueSet<HeapPriorityQueueElement> priorityQueue =
                 priorityQueueSnapshotRestoreWrapper.getPriorityQueue();
         priorityQueue.add(timer);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 8aab8b8..0f53955 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.HeapPriorityQueuesManager;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -58,6 +59,7 @@ import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTran
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FileUtils;
@@ -96,7 +98,6 @@ import java.util.stream.StreamSupport;
 
 import static org.apache.flink.contrib.streaming.state.RocksDBSnapshotTransformFactoryAdaptor.wrapStateSnapshotTransformFactory;
 import static org.apache.flink.runtime.state.SnapshotStrategyRunner.ExecutionType.ASYNCHRONOUS;
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -182,6 +183,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
      */
     private final LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
 
+    private final HeapPriorityQueuesManager heapPriorityQueuesManager;
+
     /** Number of bytes required to prefix the key groups. */
     private final int keyGroupPrefixBytes;
 
@@ -240,6 +243,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             TtlTimeProvider ttlTimeProvider,
             RocksDB db,
             LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             int keyGroupPrefixBytes,
             CloseableRegistry cancelStreamRegistry,
             StreamCompressionDecorator keyGroupCompressionDecorator,
@@ -279,7 +283,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
         this.writeOptions = optionsContainer.getWriteOptions();
         this.readOptions = optionsContainer.getReadOptions();
-        checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
         this.writeBatchSize = writeBatchSize;
         this.db = db;
         this.rocksDBResourceGuard = rocksDBResourceGuard;
@@ -290,6 +293,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         this.nativeMetricMonitor = nativeMetricMonitor;
         this.sharedRocksKeyBuilder = sharedRocksKeyBuilder;
         this.priorityQueueFactory = priorityQueueFactory;
+        if (priorityQueueFactory instanceof HeapPriorityQueueSetFactory) {
+            this.heapPriorityQueuesManager =
+                    new HeapPriorityQueuesManager(
+                            registeredPQStates,
+                            (HeapPriorityQueueSetFactory) priorityQueueFactory,
+                            keyContext.getKeyGroupRange(),
+                            keyContext.getNumberOfKeyGroups());
+        } else {
+            this.heapPriorityQueuesManager = null;
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -459,7 +472,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
-        return priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
+        if (this.heapPriorityQueuesManager != null) {
+            return this.heapPriorityQueuesManager.createOrUpdate(
+                    stateName, byteOrderedElementSerializer);
+        } else {
+            return priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
+        }
     }
 
     private void cleanInstanceBasePath() {
@@ -640,7 +658,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                     TypeSerializer<SV> stateSerializer)
                     throws Exception {
 
-        @SuppressWarnings("unchecked")
         RegisteredKeyValueStateBackendMetaInfo<N, SV> restoredKvStateMetaInfo = oldStateInfo.f1;
 
         // fetch current serializer now because if it is incompatible, we can't access
@@ -814,7 +831,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     }
 
     @VisibleForTesting
-    @SuppressWarnings("unchecked")
     @Override
     public int numKeyValueStateEntries() {
         int count = 0;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index ce90d05..382a185 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
+import org.apache.flink.contrib.streaming.state.restore.RocksDBHeapTimersFullRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -249,6 +251,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
         LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
                 new LinkedHashMap<>();
+        LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates =
+                new LinkedHashMap<>();
         RocksDB db = null;
         RocksDBRestoreOperation restoreOperation = null;
         RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
@@ -262,6 +266,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         int keyGroupPrefixBytes =
                 CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
                         numberOfKeyGroups);
+
         try {
             // Variables for snapshot strategy when incremental checkpoint is enabled
             UUID backendUID = UUID.randomUUID();
@@ -282,6 +287,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                                 keyGroupPrefixBytes,
                                 cancelStreamRegistry,
                                 kvStateInformation,
+                                registeredPQStates,
                                 ttlCompactFiltersManager);
                 RocksDBRestoreResult restoreResult = restoreOperation.restore();
                 db = restoreResult.getDb();
@@ -297,6 +303,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             writeBatchWrapper =
                     new RocksDBWriteBatchWrapper(
                             db, optionsContainer.getWriteOptions(), writeBatchSize);
+
             // it is important that we only create the key builder after the restore, and not
             // before;
             // restore operations may reconfigure the key serializer, so accessing the key
@@ -313,6 +320,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                             cancelStreamRegistryForBackend,
                             rocksDBResourceGuard,
                             kvStateInformation,
+                            registeredPQStates,
                             keyGroupPrefixBytes,
                             db,
                             backendUID,
@@ -377,6 +385,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                 this.ttlTimeProvider,
                 db,
                 kvStateInformation,
+                registeredPQStates,
                 keyGroupPrefixBytes,
                 cancelStreamRegistryForBackend,
                 this.keyGroupCompressionDecorator,
@@ -397,6 +406,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             int keyGroupPrefixBytes,
             CloseableRegistry cancelStreamRegistry,
             LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
+            LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
         DBOptions dbOptions = optionsContainer.getDbOptions();
         if (restoreStateHandles.isEmpty()) {
@@ -431,6 +441,24 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     ttlCompactFiltersManager,
                     writeBatchSize,
                     optionsContainer.getWriteBufferManagerCapacity());
+        } else if (priorityQueueStateType == RocksDBStateBackend.PriorityQueueStateType.HEAP) {
+            return new RocksDBHeapTimersFullRestoreOperation<>(
+                    keyGroupRange,
+                    numberOfKeyGroups,
+                    userCodeClassLoader,
+                    kvStateInformation,
+                    registeredPQStates,
+                    createHeapQueueFactory(),
+                    keySerializerProvider,
+                    instanceRocksDBPath,
+                    dbOptions,
+                    columnFamilyOptionsFactory,
+                    nativeMetricOptions,
+                    metricGroup,
+                    restoreStateHandles,
+                    ttlCompactFiltersManager,
+                    writeBatchSize,
+                    optionsContainer.getWriteBufferManagerCapacity());
         } else {
             return new RocksDBFullRestoreOperation<>(
                     keyGroupRange,
@@ -453,6 +481,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             CloseableRegistry cancelStreamRegistry,
             ResourceGuard rocksDBResourceGuard,
             LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
+            LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             int keyGroupPrefixBytes,
             RocksDB db,
             UUID backendUID,
@@ -464,6 +493,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                         rocksDBResourceGuard,
                         keySerializerProvider.currentSchemaSerializer(),
                         kvStateInformation,
+                        registeredPQStates,
                         keyGroupRange,
                         keyGroupPrefixBytes,
                         localRecoveryConfig,
@@ -502,8 +532,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         PriorityQueueSetFactory priorityQueueFactory;
         switch (priorityQueueStateType) {
             case HEAP:
-                priorityQueueFactory =
-                        new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
+                priorityQueueFactory = createHeapQueueFactory();
                 break;
             case ROCKSDB:
                 priorityQueueFactory =
@@ -526,6 +555,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         return priorityQueueFactory;
     }
 
+    private HeapPriorityQueueSetFactory createHeapQueueFactory() {
+        return new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
+    }
+
     private void prepareDirectories() throws IOException {
         checkAndCreateDirectory(instanceBasePath);
         if (instanceRocksDBPath.exists()) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksQueueIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksQueueIterator.java
new file mode 100644
index 0000000..b4948cc
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksQueueIterator.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueStateSnapshot;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/** An iterator over heap timers that produces a RocksDB-compatible binary key format. */
+public final class RocksQueueIterator implements SingleStateIterator {
+
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+    private final DataOutputSerializer keyOut = new DataOutputSerializer(128);
+    private final HeapPriorityQueueStateSnapshot<?> queueSnapshot;
+    private final Iterator<Integer> keyGroupRangeIterator;
+    private final int kvStateId;
+    private final int keyGroupPrefixBytes;
+    private final TypeSerializer<Object> elementSerializer;
+
+    private Iterator<Object> elementsForKeyGroup;
+    private int afterKeyMark = 0;
+
+    private boolean isValid;
+    private byte[] currentKey;
+
+    public RocksQueueIterator(
+            HeapPriorityQueueStateSnapshot<?> queuesSnapshot,
+            KeyGroupRange keyGroupRange,
+            int keyGroupPrefixBytes,
+            int kvStateId) {
+        this.queueSnapshot = queuesSnapshot;
+        this.elementSerializer = castToType(queuesSnapshot.getMetaInfo().getElementSerializer());
+        this.keyGroupRangeIterator = keyGroupRange.iterator();
+        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+        this.kvStateId = kvStateId;
+        if (keyGroupRangeIterator.hasNext()) {
+            try {
+                if (moveToNextNonEmptyKeyGroup()) {
+                    isValid = true;
+                    next();
+                } else {
+                    isValid = false;
+                }
+            } catch (IOException e) {
+                throw new FlinkRuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public void next() {
+        try {
+            if (!elementsForKeyGroup.hasNext()) {
+                boolean hasElement = moveToNextNonEmptyKeyGroup();
+                if (!hasElement) {
+                    isValid = false;
+                    return;
+                }
+            }
+            keyOut.setPosition(afterKeyMark);
+            elementSerializer.serialize(elementsForKeyGroup.next(), keyOut);
+            this.currentKey = keyOut.getCopyOfBuffer();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    private boolean moveToNextNonEmptyKeyGroup() throws IOException {
+        while (keyGroupRangeIterator.hasNext()) {
+            Integer keyGroupId = keyGroupRangeIterator.next();
+            elementsForKeyGroup = castToType(queueSnapshot.getIteratorForKeyGroup(keyGroupId));
+            if (elementsForKeyGroup.hasNext()) {
+                writeKeyGroupId(keyGroupId);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void writeKeyGroupId(Integer keyGroupId) throws IOException {
+        keyOut.clear();
+        CompositeKeySerializationUtils.writeKeyGroup(keyGroupId, keyGroupPrefixBytes, keyOut);
+        afterKeyMark = keyOut.length();
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> TypeSerializer<T> castToType(TypeSerializer<?> typeSerializer) {
+        return (TypeSerializer<T>) typeSerializer;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> Iterator<T> castToType(Iterator<?> iterator) {
+        return (Iterator<T>) iterator;
+    }
+
+    @Override
+    public boolean isValid() {
+        return isValid;
+    }
+
+    @Override
+    public byte[] key() {
+        return currentKey;
+    }
+
+    @Override
+    public byte[] value() {
+        return EMPTY_BYTE_ARRAY;
+    }
+
+    @Override
+    public int getKvStateId() {
+        return kvStateId;
+    }
+
+    @Override
+    public void close() {}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
index 613d181..ed8cc0d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
@@ -73,6 +73,7 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
     public RocksStatesPerKeyGroupMergeIterator(
             final CloseableRegistry closeableRegistry,
             List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+            List<SingleStateIterator> heapPriorityQueueIterators,
             final int keyGroupPrefixByteCount)
             throws IOException {
         Preconditions.checkNotNull(closeableRegistry);
@@ -82,8 +83,8 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
         this.closeableRegistry = closeableRegistry;
         this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
 
-        if (kvStateIterators.size() > 0) {
-            this.heap = buildIteratorHeap(kvStateIterators);
+        if (kvStateIterators.size() > 0 || heapPriorityQueueIterators.size() > 0) {
+            this.heap = buildIteratorHeap(kvStateIterators, heapPriorityQueueIterators);
             this.valid = !heap.isEmpty();
             this.currentSubIterator = heap.poll();
             kvStateIterators.clear();
@@ -129,13 +130,17 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
     }
 
     private PriorityQueue<SingleStateIterator> buildIteratorHeap(
-            List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators) throws IOException {
+            List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+            List<SingleStateIterator> heapPriorityQueueIterators)
+            throws IOException {
 
         Comparator<SingleStateIterator> iteratorComparator =
                 COMPARATORS.get(keyGroupPrefixByteCount - 1);
 
         PriorityQueue<SingleStateIterator> iteratorPriorityQueue =
-                new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+                new PriorityQueue<>(
+                        kvStateIterators.size() + heapPriorityQueueIterators.size(),
+                        iteratorComparator);
 
         for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
             final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
@@ -152,6 +157,16 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
                 }
             }
         }
+
+        for (SingleStateIterator heapQueueIterator : heapPriorityQueueIterators) {
+            if (heapQueueIterator.isValid()) {
+                iteratorPriorityQueue.offer(heapQueueIterator);
+                closeableRegistry.registerCloseable(heapQueueIterator);
+            } else {
+                IOUtils.closeQuietly(heapQueueIterator);
+            }
+        }
+
         return iteratorPriorityQueue;
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 7b5608a..4005add 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -45,10 +45,10 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /** Encapsulates the process of restoring a RocksDB instance from a full snapshot. */
 public class RocksDBFullRestoreOperation<K> implements RocksDBRestoreOperation {
@@ -115,16 +115,14 @@ public class RocksDBFullRestoreOperation<K> implements RocksDBRestoreOperation {
             throws IOException, RocksDBException, StateMigrationException {
         List<StateMetaInfoSnapshot> restoredMetaInfos =
                 savepointRestoreResult.getStateMetaInfoSnapshots();
-        List<ColumnFamilyHandle> columnFamilyHandles =
-                restoredMetaInfos.stream()
-                        .map(
-                                stateMetaInfoSnapshot -> {
-                                    RocksDbKvStateInfo registeredStateCFHandle =
-                                            this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
-                                                    null, stateMetaInfoSnapshot);
-                                    return registeredStateCFHandle.columnFamilyHandle;
-                                })
-                        .collect(Collectors.toList());
+        Map<Integer, ColumnFamilyHandle> columnFamilyHandles = new HashMap<>();
+        for (int i = 0; i < restoredMetaInfos.size(); i++) {
+            StateMetaInfoSnapshot restoredMetaInfo = restoredMetaInfos.get(i);
+            RocksDbKvStateInfo registeredStateCFHandle =
+                    this.rocksHandle.getOrRegisterStateColumnFamilyHandle(null, restoredMetaInfo);
+            columnFamilyHandles.put(i, registeredStateCFHandle.columnFamilyHandle);
+        }
+
         try (ThrowingIterator<KeyGroup> keyGroups = savepointRestoreResult.getRestoredKeyGroups()) {
             restoreKVStateData(keyGroups, columnFamilyHandles);
         }
@@ -135,17 +133,23 @@ public class RocksDBFullRestoreOperation<K> implements RocksDBRestoreOperation {
      * handle.
      */
     private void restoreKVStateData(
-            ThrowingIterator<KeyGroup> keyGroups, List<ColumnFamilyHandle> columnFamilies)
+            ThrowingIterator<KeyGroup> keyGroups, Map<Integer, ColumnFamilyHandle> columnFamilies)
             throws IOException, RocksDBException, StateMigrationException {
         // for all key-groups in the current state handle...
         try (RocksDBWriteBatchWrapper writeBatchWrapper =
                 new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
+            ColumnFamilyHandle handle = null;
             while (keyGroups.hasNext()) {
                 KeyGroup keyGroup = keyGroups.next();
                 try (ThrowingIterator<KeyGroupEntry> groupEntries = keyGroup.getKeyGroupEntries()) {
+                    int oldKvStateId = -1;
                     while (groupEntries.hasNext()) {
                         KeyGroupEntry groupEntry = groupEntries.next();
-                        ColumnFamilyHandle handle = columnFamilies.get(groupEntry.getKvStateId());
+                        int kvStateId = groupEntry.getKvStateId();
+                        if (kvStateId != oldKvStateId) {
+                            oldKvStateId = kvStateId;
+                            handle = columnFamilies.get(kvStateId);
+                        }
                         writeBatchWrapper.put(handle, groupEntry.getKey(), groupEntry.getValue());
                     }
                 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java
new file mode 100644
index 0000000..0c859e8
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.restore;
+
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
+import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
+import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSerializerProvider;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
+import org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation;
+import org.apache.flink.runtime.state.restore.KeyGroup;
+import org.apache.flink.runtime.state.restore.KeyGroupEntry;
+import org.apache.flink.runtime.state.restore.SavepointRestoreResult;
+import org.apache.flink.runtime.state.restore.ThrowingIterator;
+import org.apache.flink.util.StateMigrationException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** Encapsulates restoring a RocksDB instance from a full snapshot, restoring timers into heap-based priority queues. */
+public class RocksDBHeapTimersFullRestoreOperation<K> implements RocksDBRestoreOperation {
+    private final FullSnapshotRestoreOperation<K> savepointRestoreOperation;
+    /** Write batch size used in {@link RocksDBWriteBatchWrapper}. */
+    private final long writeBatchSize;
+
+    private final LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>>
+            registeredPQStates;
+    private final HeapPriorityQueueSetFactory priorityQueueFactory;
+    private final int numberOfKeyGroups;
+    private final DataInputDeserializer deserializer = new DataInputDeserializer();
+
+    private final RocksDBHandle rocksHandle;
+    private final KeyGroupRange keyGroupRange;
+    private final int keyGroupPrefixBytes;
+
+    public RocksDBHeapTimersFullRestoreOperation(
+            KeyGroupRange keyGroupRange,
+            int numberOfKeyGroups,
+            ClassLoader userCodeClassLoader,
+            Map<String, RocksDbKvStateInfo> kvStateInformation,
+            LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
+            HeapPriorityQueueSetFactory priorityQueueFactory,
+            StateSerializerProvider<K> keySerializerProvider,
+            File instanceRocksDBPath,
+            DBOptions dbOptions,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
+            RocksDBNativeMetricOptions nativeMetricOptions,
+            MetricGroup metricGroup,
+            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
+            @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
+            @Nonnegative long writeBatchSize,
+            Long writeBufferManagerCapacity) {
+        this.writeBatchSize = writeBatchSize;
+        this.rocksHandle =
+                new RocksDBHandle(
+                        kvStateInformation,
+                        instanceRocksDBPath,
+                        dbOptions,
+                        columnFamilyOptionsFactory,
+                        nativeMetricOptions,
+                        metricGroup,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity);
+        this.savepointRestoreOperation =
+                new FullSnapshotRestoreOperation<>(
+                        keyGroupRange,
+                        userCodeClassLoader,
+                        restoreStateHandles,
+                        keySerializerProvider);
+        this.registeredPQStates = registeredPQStates;
+        this.priorityQueueFactory = priorityQueueFactory;
+        this.numberOfKeyGroups = numberOfKeyGroups;
+        this.keyGroupRange = keyGroupRange;
+        this.keyGroupPrefixBytes =
+                CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
+                        numberOfKeyGroups);
+    }
+
+    /** Restores all key-groups data that is referenced by the passed state handles. */
+    @Override
+    public RocksDBRestoreResult restore()
+            throws IOException, StateMigrationException, RocksDBException {
+        rocksHandle.openDB();
+        try (ThrowingIterator<SavepointRestoreResult> restore =
+                savepointRestoreOperation.restore()) {
+            while (restore.hasNext()) {
+                applyRestoreResult(restore.next());
+            }
+        }
+        return new RocksDBRestoreResult(
+                this.rocksHandle.getDb(),
+                this.rocksHandle.getDefaultColumnFamilyHandle(),
+                this.rocksHandle.getNativeMetricMonitor(),
+                -1,
+                null,
+                null);
+    }
+
+    private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
+            throws IOException, RocksDBException, StateMigrationException {
+        List<StateMetaInfoSnapshot> restoredMetaInfos =
+                savepointRestoreResult.getStateMetaInfoSnapshots();
+        Map<Integer, ColumnFamilyHandle> columnFamilyHandles = new HashMap<>();
+        Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates = new HashMap<>();
+        for (int i = 0; i < restoredMetaInfos.size(); i++) {
+            StateMetaInfoSnapshot restoredMetaInfo = restoredMetaInfos.get(i);
+            if (restoredMetaInfo.getBackendStateType() == BackendStateType.PRIORITY_QUEUE) {
+                String stateName = restoredMetaInfo.getName();
+                HeapPriorityQueueSnapshotRestoreWrapper<?> queueWrapper =
+                        registeredPQStates.computeIfAbsent(
+                                stateName,
+                                key ->
+                                        createInternal(
+                                                new RegisteredPriorityQueueStateBackendMetaInfo<>(
+                                                        restoredMetaInfo)));
+                restoredPQStates.put(i, queueWrapper);
+            } else {
+                RocksDbKvStateInfo registeredStateCFHandle =
+                        this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+                                null, restoredMetaInfo);
+                columnFamilyHandles.put(i, registeredStateCFHandle.columnFamilyHandle);
+            }
+        }
+
+        try (ThrowingIterator<KeyGroup> keyGroups = savepointRestoreResult.getRestoredKeyGroups()) {
+            restoreKVStateData(keyGroups, columnFamilyHandles, restoredPQStates);
+        }
+    }
+
+    /**
+     * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state
+     * handle.
+     */
+    private void restoreKVStateData(
+            ThrowingIterator<KeyGroup> keyGroups,
+            Map<Integer, ColumnFamilyHandle> columnFamilies,
+            Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates)
+            throws IOException, RocksDBException, StateMigrationException {
+        // for all key-groups in the current state handle...
+        try (RocksDBWriteBatchWrapper writeBatchWrapper =
+                new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
+            HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement> restoredPQ = null;
+            ColumnFamilyHandle handle = null;
+            while (keyGroups.hasNext()) {
+                KeyGroup keyGroup = keyGroups.next();
+                try (ThrowingIterator<KeyGroupEntry> groupEntries = keyGroup.getKeyGroupEntries()) {
+                    int oldKvStateId = -1;
+                    while (groupEntries.hasNext()) {
+                        KeyGroupEntry groupEntry = groupEntries.next();
+                        int kvStateId = groupEntry.getKvStateId();
+                        if (kvStateId != oldKvStateId) {
+                            oldKvStateId = kvStateId;
+                            handle = columnFamilies.get(kvStateId);
+                            restoredPQ = getRestoredPQ(restoredPQStates, kvStateId);
+                        }
+                        if (restoredPQ != null) {
+                            restoreQueueElement(restoredPQ, groupEntry);
+                        } else if (handle != null) {
+                            writeBatchWrapper.put(
+                                    handle, groupEntry.getKey(), groupEntry.getValue());
+                        } else {
+                            throw new IllegalStateException("Unknown state id: " + kvStateId);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void restoreQueueElement(
+            HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement> restoredPQ,
+            KeyGroupEntry groupEntry)
+            throws IOException {
+        deserializer.setBuffer(groupEntry.getKey());
+        deserializer.skipBytesToRead(keyGroupPrefixBytes);
+        HeapPriorityQueueElement queueElement =
+                restoredPQ.getMetaInfo().getElementSerializer().deserialize(deserializer);
+        restoredPQ.getPriorityQueue().add(queueElement);
+    }
+
+    @SuppressWarnings("unchecked")
+    private HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement> getRestoredPQ(
+            Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates,
+            int kvStateId) {
+        return (HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement>)
+                restoredPQStates.get(kvStateId);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            HeapPriorityQueueSnapshotRestoreWrapper<T> createInternal(
+                    RegisteredPriorityQueueStateBackendMetaInfo metaInfo) {
+
+        final String stateName = metaInfo.getName();
+        final HeapPriorityQueueSet<T> priorityQueue =
+                priorityQueueFactory.create(stateName, metaInfo.getElementSerializer());
+
+        return new HeapPriorityQueueSnapshotRestoreWrapper<>(
+                priorityQueue,
+                metaInfo,
+                KeyExtractorFunction.forKeyedObjects(),
+                keyGroupRange,
+                numberOfKeyGroups);
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.rocksHandle.close();
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBFullSnapshotResources.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBFullSnapshotResources.java
index eff0e8a..09937d6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBFullSnapshotResources.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBFullSnapshotResources.java
@@ -22,8 +22,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.RocksQueueIterator;
 import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
 import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.SingleStateIterator;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.state.FullSnapshotResources;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -31,6 +33,7 @@ import org.apache.flink.runtime.state.KeyValueStateIterator;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueStateSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.ResourceGuard;
@@ -61,11 +64,13 @@ class RocksDBFullSnapshotResources<K> implements FullSnapshotResources<K> {
     private final KeyGroupRange keyGroupRange;
     private final TypeSerializer<K> keySerializer;
     private final StreamCompressionDecorator streamCompressionDecorator;
+    private final List<HeapPriorityQueueStateSnapshot<?>> heapPriorityQueuesSnapshots;
 
     public RocksDBFullSnapshotResources(
             ResourceGuard.Lease lease,
             Snapshot snapshot,
             List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> metaDataCopy,
+            List<HeapPriorityQueueStateSnapshot<?>> heapPriorityQueuesSnapshots,
             List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
             RocksDB db,
             int keyGroupPrefixBytes,
@@ -75,6 +80,7 @@ class RocksDBFullSnapshotResources<K> implements FullSnapshotResources<K> {
         this.lease = lease;
         this.snapshot = snapshot;
         this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+        this.heapPriorityQueuesSnapshots = heapPriorityQueuesSnapshots;
         this.db = db;
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.keyGroupRange = keyGroupRange;
@@ -115,10 +121,16 @@ class RocksDBFullSnapshotResources<K> implements FullSnapshotResources<K> {
             List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
                     createKVStateIterators(closeableRegistry, readOptions);
 
+            List<SingleStateIterator> heapPriorityQueueIterators =
+                    createHeapPriorityQueueIterators();
+
             // Here we transfer ownership of the required resources to the
             // RocksStatesPerKeyGroupMergeIterator
             return new RocksStatesPerKeyGroupMergeIterator(
-                    closeableRegistry, new ArrayList<>(kvStateIterators), keyGroupPrefixBytes);
+                    closeableRegistry,
+                    kvStateIterators,
+                    heapPriorityQueueIterators,
+                    keyGroupPrefixBytes);
         } catch (Throwable t) {
             // If anything goes wrong, clean up our stuff. If things went smoothly the
             // merging iterator is now responsible for closing the resources
@@ -127,6 +139,18 @@ class RocksDBFullSnapshotResources<K> implements FullSnapshotResources<K> {
         }
     }
 
+    private List<SingleStateIterator> createHeapPriorityQueueIterators() {
+        int kvStateId = metaData.size();
+        List<SingleStateIterator> queuesIterators =
+                new ArrayList<>(heapPriorityQueuesSnapshots.size());
+        for (HeapPriorityQueueStateSnapshot<?> queuesSnapshot : heapPriorityQueuesSnapshots) {
+            queuesIterators.add(
+                    new RocksQueueIterator(
+                            queuesSnapshot, keyGroupRange, keyGroupPrefixBytes, kvStateId++));
+        }
+        return queuesIterators;
+    }
+
     private List<Tuple2<RocksIteratorWrapper, Integer>> createKVStateIterators(
             CloseableRegistry closeableRegistry, ReadOptions readOptions) throws IOException {
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
index 0b83bd8..b3318f2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
@@ -31,6 +31,8 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueStateSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.ResourceGuard;
 import org.apache.flink.util.function.SupplierWithException;
@@ -64,11 +66,17 @@ public class RocksFullSnapshotStrategy<K>
     /** This decorator is used to apply compression per key-group for the written snapshot data. */
     @Nonnull private final StreamCompressionDecorator keyGroupCompressionDecorator;
 
+    private final LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>>
+            registeredPQStates;
+
     public RocksFullSnapshotStrategy(
             @Nonnull RocksDB db,
             @Nonnull ResourceGuard rocksDBResourceGuard,
             @Nonnull TypeSerializer<K> keySerializer,
             @Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
+            @Nonnull
+                    LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>>
+                            registeredPQStates,
             @Nonnull KeyGroupRange keyGroupRange,
             @Nonnegative int keyGroupPrefixBytes,
             @Nonnull LocalRecoveryConfig localRecoveryConfig,
@@ -84,6 +92,7 @@ public class RocksFullSnapshotStrategy<K>
                 localRecoveryConfig);
 
         this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
+        this.registeredPQStates = registeredPQStates;
     }
 
     @Override
@@ -99,6 +108,13 @@ public class RocksFullSnapshotStrategy<K>
             metaDataCopy.add(stateInfo);
         }
 
+        List<HeapPriorityQueueStateSnapshot<?>> heapPriorityQueuesSnapshots =
+                new ArrayList<>(registeredPQStates.size());
+        for (HeapPriorityQueueSnapshotRestoreWrapper<?> stateInfo : registeredPQStates.values()) {
+            stateMetaInfoSnapshots.add(stateInfo.getMetaInfo().snapshot());
+            heapPriorityQueuesSnapshots.add(stateInfo.stateSnapshot());
+        }
+
         final ResourceGuard.Lease lease = rocksDBResourceGuard.acquireResource();
         final Snapshot snapshot = db.getSnapshot();
 
@@ -106,6 +122,7 @@ public class RocksFullSnapshotStrategy<K>
                 lease,
                 snapshot,
                 metaDataCopy,
+                heapPriorityQueuesSnapshots,
                 stateMetaInfoSnapshots,
                 db,
                 keyGroupPrefixBytes,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 6361abc..cac9d61 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -60,7 +60,10 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
     public void testEmptyMergeIterator() throws Exception {
         RocksStatesPerKeyGroupMergeIterator emptyIterator =
                 new RocksStatesPerKeyGroupMergeIterator(
-                        new CloseableRegistry(), Collections.emptyList(), 2);
+                        new CloseableRegistry(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        2);
         Assert.assertFalse(emptyIterator.isValid());
     }
 
@@ -134,6 +137,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
                     new RocksStatesPerKeyGroupMergeIterator(
                             closeableRegistry,
                             rocksIteratorsWithKVStateId,
+                            Collections.emptyList(),
                             maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) {
 
                 int prevKVState = -1;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
index 2cccafd..a72ca84 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder;
 import org.apache.flink.contrib.streaming.state.RocksDBResourceContainer;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -66,7 +66,10 @@ public final class BackendSwitchSpecs {
     }
 
     /** Specification for a {@link RocksDBKeyedStateBackend}. */
-    static final BackendSwitchSpec ROCKS = new RocksSpec();
+    static final BackendSwitchSpec ROCKS = new RocksSpec(PriorityQueueStateType.ROCKSDB);
+
+    /** Specification for a {@link RocksDBKeyedStateBackend} which stores its timers on heap. */
+    static final BackendSwitchSpec ROCKS_HEAP_TIMERS = new RocksSpec(PriorityQueueStateType.HEAP);
 
     /** Specification for a {@link HeapKeyedStateBackend}. */
     static final BackendSwitchSpec HEAP = new HeapSpec();
@@ -74,6 +77,11 @@ public final class BackendSwitchSpecs {
     private static final class RocksSpec implements BackendSwitchSpec {
 
         private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        private final PriorityQueueStateType queueStateType;
+
+        public RocksSpec(PriorityQueueStateType queueStateType) {
+            this.queueStateType = queueStateType;
+        }
 
         @Override
         public CheckpointableKeyedStateBackend<String> createBackend(
@@ -97,7 +105,7 @@ public final class BackendSwitchSpecs {
                             keyGroupRange,
                             new ExecutionConfig(),
                             TestLocalRecoveryConfig.disabled(),
-                            RocksDBStateBackend.PriorityQueueStateType.ROCKSDB,
+                            queueStateType,
                             TtlTimeProvider.DEFAULT,
                             new UnregisteredMetricsGroup(),
                             stateHandles,
@@ -113,7 +121,7 @@ public final class BackendSwitchSpecs {
 
         @Override
         public String toString() {
-            return "ROCKS";
+            return "ROCKS(" + queueStateType + ")";
         }
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/RocksSavepointStateBackendSwitchTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/RocksSavepointStateBackendSwitchTest.java
index d7f7e2d..7a52c55 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/RocksSavepointStateBackendSwitchTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/RocksSavepointStateBackendSwitchTest.java
@@ -25,16 +25,28 @@ import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /** Tests for switching a RocksDB state backend to a different one. */
 @RunWith(Parameterized.class)
 public class RocksSavepointStateBackendSwitchTest extends SavepointStateBackendSwitchTestBase {
-    public RocksSavepointStateBackendSwitchTest(BackendSwitchSpec toBackend) {
-        super(BackendSwitchSpecs.ROCKS, toBackend);
+    public RocksSavepointStateBackendSwitchTest(
+            BackendSwitchSpec fromBackend, BackendSwitchSpec toBackend) {
+        super(fromBackend, toBackend);
     }
 
-    @Parameterized.Parameters(name = "to: {0}")
-    public static Collection<BackendSwitchSpec> targetBackends() {
-        return Arrays.asList(BackendSwitchSpecs.HEAP, BackendSwitchSpecs.ROCKS);
+    @Parameterized.Parameters(name = "from: {0} to: {1}")
+    public static Collection<BackendSwitchSpec[]> targetBackends() {
+        List<BackendSwitchSpec> fromBackends =
+                Arrays.asList(BackendSwitchSpecs.ROCKS_HEAP_TIMERS, BackendSwitchSpecs.ROCKS);
+        List<BackendSwitchSpec> toBackends =
+                Arrays.asList(
+                        BackendSwitchSpecs.HEAP,
+                        BackendSwitchSpecs.ROCKS,
+                        BackendSwitchSpecs.ROCKS_HEAP_TIMERS);
+        return fromBackends.stream()
+                .flatMap(from -> toBackends.stream().map(to -> new BackendSwitchSpec[] {from, to}))
+                .collect(Collectors.toList());
     }
 }


[flink] 04/09: [refactor] Extract common interface for a single Rocks state

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5fbb64dbfc0d872d5574a10cb7ae035f5d5405a
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Feb 5 16:43:41 2021 +0100

    [refactor] Extract common interface for a single Rocks state
    
    This commit introduces an interface for iterating over a single state in
    the RocksDB state backend. This is a prerequisite for storing heap timers
    along with other states from RocksDB.
---
 .../state/iterator/RocksSingleStateIterator.java   | 29 ++++++++++------
 .../RocksStatesPerKeyGroupMergeIterator.java       | 39 ++++++++++------------
 .../state/iterator/SingleStateIterator.java        | 37 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 32 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
index 3c0aa82..4608acb 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
@@ -23,13 +23,11 @@ import org.apache.flink.util.IOUtils;
 
 import javax.annotation.Nonnull;
 
-import java.io.Closeable;
-
 /**
  * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to
  * the iterator. Used by {@link RocksStatesPerKeyGroupMergeIterator}.
  */
-class RocksSingleStateIterator implements Closeable {
+class RocksSingleStateIterator implements SingleStateIterator {
 
     /**
      * @param iterator underlying {@link RocksIteratorWrapper}
@@ -45,19 +43,30 @@ class RocksSingleStateIterator implements Closeable {
     private byte[] currentKey;
     private final int kvStateId;
 
-    public byte[] getCurrentKey() {
-        return currentKey;
+    @Override
+    public void next() {
+        iterator.next();
+        if (iterator.isValid()) {
+            currentKey = iterator.key();
+        }
+    }
+
+    @Override
+    public boolean isValid() {
+        return iterator.isValid();
     }
 
-    public void setCurrentKey(byte[] currentKey) {
-        this.currentKey = currentKey;
+    @Override
+    public byte[] key() {
+        return currentKey;
     }
 
-    @Nonnull
-    public RocksIteratorWrapper getIterator() {
-        return iterator;
+    @Override
+    public byte[] value() {
+        return iterator.value();
     }
 
+    @Override
     public int getKvStateId() {
         return kvStateId;
     }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
index 2f970c9..613d181 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
@@ -40,14 +40,14 @@ import java.util.PriorityQueue;
 public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterator {
 
     private final CloseableRegistry closeableRegistry;
-    private final PriorityQueue<RocksSingleStateIterator> heap;
+    private final PriorityQueue<SingleStateIterator> heap;
     private final int keyGroupPrefixByteCount;
     private boolean newKeyGroup;
     private boolean newKVState;
     private boolean valid;
-    private RocksSingleStateIterator currentSubIterator;
+    private SingleStateIterator currentSubIterator;
 
-    private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+    private static final List<Comparator<SingleStateIterator>> COMPARATORS;
 
     static {
         int maxBytes = 2;
@@ -57,8 +57,7 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
             COMPARATORS.add(
                     (o1, o2) -> {
                         int arrayCmpRes =
-                                compareKeyGroupsForByteArrays(
-                                        o1.getCurrentKey(), o2.getCurrentKey(), currentBytes);
+                                compareKeyGroupsForByteArrays(o1.key(), o2.key(), currentBytes);
                         return arrayCmpRes == 0
                                 ? o1.getKvStateId() - o2.getKvStateId()
                                 : arrayCmpRes;
@@ -103,18 +102,14 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
         newKeyGroup = false;
         newKVState = false;
 
-        final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
-        rocksIterator.next();
-
-        byte[] oldKey = currentSubIterator.getCurrentKey();
-        if (rocksIterator.isValid()) {
-
-            currentSubIterator.setCurrentKey(rocksIterator.key());
-
-            if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+        byte[] oldKey = currentSubIterator.key();
+        currentSubIterator.next();
+        if (currentSubIterator.isValid()) {
+            if (isDifferentKeyGroup(oldKey, currentSubIterator.key())) {
+                SingleStateIterator oldIterator = currentSubIterator;
                 heap.offer(currentSubIterator);
                 currentSubIterator = heap.remove();
-                newKVState = currentSubIterator.getIterator() != rocksIterator;
+                newKVState = currentSubIterator != oldIterator;
                 detectNewKeyGroup(oldKey);
             }
         } else {
@@ -133,13 +128,13 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
         }
     }
 
-    private PriorityQueue<RocksSingleStateIterator> buildIteratorHeap(
+    private PriorityQueue<SingleStateIterator> buildIteratorHeap(
             List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators) throws IOException {
 
-        Comparator<RocksSingleStateIterator> iteratorComparator =
+        Comparator<SingleStateIterator> iteratorComparator =
                 COMPARATORS.get(keyGroupPrefixByteCount - 1);
 
-        PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue =
+        PriorityQueue<SingleStateIterator> iteratorPriorityQueue =
                 new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
 
         for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
@@ -165,14 +160,14 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
     }
 
     private void detectNewKeyGroup(byte[] oldKey) {
-        if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+        if (isDifferentKeyGroup(oldKey, currentSubIterator.key())) {
             newKeyGroup = true;
         }
     }
 
     @Override
     public int keyGroup() {
-        final byte[] currentKey = currentSubIterator.getCurrentKey();
+        final byte[] currentKey = currentSubIterator.key();
         int result = 0;
         // big endian decode
         for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
@@ -184,12 +179,12 @@ public class RocksStatesPerKeyGroupMergeIterator implements KeyValueStateIterato
 
     @Override
     public byte[] key() {
-        return currentSubIterator.getCurrentKey();
+        return currentSubIterator.key();
     }
 
     @Override
     public byte[] value() {
-        return currentSubIterator.getIterator().value();
+        return currentSubIterator.value();
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java
new file mode 100644
index 0000000..0238279
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/SingleStateIterator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import java.io.Closeable;
+
+/** An interface for iterating over a single state in a RocksDB state backend. */
+public interface SingleStateIterator extends Closeable {
+    void next();
+
+    boolean isValid();
+
+    byte[] key();
+
+    byte[] value();
+
+    int getKvStateId();
+
+    @Override
+    void close();
+}


[flink] 01/09: [hotfix] Remove unnecessary if in RocksIncrementalSnapshotStrategy

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f16bff7547c81539f9f34eff1ae380e20efea13
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Feb 2 13:28:54 2021 +0100

    [hotfix] Remove unnecessary if in RocksIncrementalSnapshotStrategy
---
 .../state/snapshot/RocksIncrementalSnapshotStrategy.java          | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 0921924..682a3f7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -177,18 +177,12 @@ public class RocksIncrementalSnapshotStrategy<K>
             return registry -> SnapshotResult.empty();
         }
 
-        List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
-                snapshotResources.stateMetaInfoSnapshots;
-        if (stateMetaInfoSnapshots.isEmpty()) {
-            return snapshotCloseableRegistry -> SnapshotResult.empty();
-        }
-
         return new RocksDBIncrementalSnapshotOperation(
                 checkpointId,
                 checkpointStreamFactory,
                 snapshotResources.snapshotDirectory,
                 snapshotResources.baseSstFiles,
-                stateMetaInfoSnapshots);
+                snapshotResources.stateMetaInfoSnapshots);
     }
 
     @Override


[flink] 03/09: [refactor] Remove AbstractRocksDBRestoreOperation

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3ed5c1a26f53b9481d5616669c91c0f272bdc949
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Feb 8 16:32:25 2021 +0100

    [refactor] Remove AbstractRocksDBRestoreOperation
    
    So far both the RocksFullSnapshotRestoreOperation and
    RocksIncrementalRestoreOperation extended
    AbstractRocksDBRestoreOperation in order to share some functions.
    However, this required unnecessary parameters to be passed just to
    fulfill the requirements of the base class. Moreover, a base class makes
    it harder to extend the classes independently.
    
    This commit changes the sharing of the common code to use composition
    instead of inheritance.
---
 .../state/RocksDBKeyedStateBackendBuilder.java     |  18 +-
 .../state/restore/RocksDBFullRestoreOperation.java |  57 +++---
 ...sDBRestoreOperation.java => RocksDBHandle.java} | 201 +++++++++++----------
 .../RocksDBIncrementalRestoreOperation.java        | 191 +++++++++++---------
 .../state/restore/RocksDBNoneRestoreOperation.java |  58 +++---
 .../state/restore/RocksDBRestoreOperation.java     |   3 +-
 6 files changed, 261 insertions(+), 267 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 5f6426c..ce90d05 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -21,10 +21,10 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
+import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
@@ -250,7 +250,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
                 new LinkedHashMap<>();
         RocksDB db = null;
-        AbstractRocksDBRestoreOperation restoreOperation = null;
+        RocksDBRestoreOperation restoreOperation = null;
         RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
                 new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
 
@@ -393,7 +393,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                 writeBatchSize);
     }
 
-    private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
+    private RocksDBRestoreOperation getRocksDBRestoreOperation(
             int keyGroupPrefixBytes,
             CloseableRegistry cancelStreamRegistry,
             LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
@@ -401,20 +401,12 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         DBOptions dbOptions = optionsContainer.getDbOptions();
         if (restoreStateHandles.isEmpty()) {
             return new RocksDBNoneRestoreOperation<>(
-                    keyGroupRange,
-                    keyGroupPrefixBytes,
-                    numberOfTransferingThreads,
-                    cancelStreamRegistry,
-                    userCodeClassLoader,
                     kvStateInformation,
-                    keySerializerProvider,
-                    instanceBasePath,
                     instanceRocksDBPath,
                     dbOptions,
                     columnFamilyOptionsFactory,
                     nativeMetricOptions,
                     metricGroup,
-                    restoreStateHandles,
                     ttlCompactFiltersManager,
                     optionsContainer.getWriteBufferManagerCapacity());
         }
@@ -442,13 +434,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         } else {
             return new RocksDBFullRestoreOperation<>(
                     keyGroupRange,
-                    keyGroupPrefixBytes,
-                    numberOfTransferingThreads,
-                    cancelStreamRegistry,
                     userCodeClassLoader,
                     kvStateInformation,
                     keySerializerProvider,
-                    instanceBasePath,
                     instanceRocksDBPath,
                     dbOptions,
                     columnFamilyOptionsFactory,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 6c98d9b..7b5608a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -22,7 +22,6 @@ import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDb
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
-import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -51,23 +50,19 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /** Encapsulates the process of restoring a RocksDB instance from a full snapshot. */
-public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {
+public class RocksDBFullRestoreOperation<K> implements RocksDBRestoreOperation {
     private final FullSnapshotRestoreOperation<K> savepointRestoreOperation;
     /** Write batch size used in {@link RocksDBWriteBatchWrapper}. */
     private final long writeBatchSize;
 
+    private final RocksDBHandle rocksHandle;
+
     public RocksDBFullRestoreOperation(
             KeyGroupRange keyGroupRange,
-            int keyGroupPrefixBytes,
-            int numberOfTransferringThreads,
-            CloseableRegistry cancelStreamRegistry,
             ClassLoader userCodeClassLoader,
             Map<String, RocksDbKvStateInfo> kvStateInformation,
             StateSerializerProvider<K> keySerializerProvider,
-            File instanceBasePath,
             File instanceRocksDBPath,
             DBOptions dbOptions,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@@ -77,25 +72,17 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
             Long writeBufferManagerCapacity) {
-        super(
-                keyGroupRange,
-                keyGroupPrefixBytes,
-                numberOfTransferringThreads,
-                cancelStreamRegistry,
-                userCodeClassLoader,
-                kvStateInformation,
-                keySerializerProvider,
-                instanceBasePath,
-                instanceRocksDBPath,
-                dbOptions,
-                columnFamilyOptionsFactory,
-                nativeMetricOptions,
-                metricGroup,
-                restoreStateHandles,
-                ttlCompactFiltersManager,
-                writeBufferManagerCapacity);
-        checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
         this.writeBatchSize = writeBatchSize;
+        this.rocksHandle =
+                new RocksDBHandle(
+                        kvStateInformation,
+                        instanceRocksDBPath,
+                        dbOptions,
+                        columnFamilyOptionsFactory,
+                        nativeMetricOptions,
+                        metricGroup,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity);
         this.savepointRestoreOperation =
                 new FullSnapshotRestoreOperation<>(
                         keyGroupRange,
@@ -108,7 +95,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
     @Override
     public RocksDBRestoreResult restore()
             throws IOException, StateMigrationException, RocksDBException {
-        openDB();
+        rocksHandle.openDB();
         try (ThrowingIterator<SavepointRestoreResult> restore =
                 savepointRestoreOperation.restore()) {
             while (restore.hasNext()) {
@@ -116,7 +103,12 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             }
         }
         return new RocksDBRestoreResult(
-                this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1, null, null);
+                this.rocksHandle.getDb(),
+                this.rocksHandle.getDefaultColumnFamilyHandle(),
+                this.rocksHandle.getNativeMetricMonitor(),
+                -1,
+                null,
+                null);
     }
 
     private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
@@ -128,7 +120,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
                         .map(
                                 stateMetaInfoSnapshot -> {
                                     RocksDbKvStateInfo registeredStateCFHandle =
-                                            getOrRegisterStateColumnFamilyHandle(
+                                            this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
                                                     null, stateMetaInfoSnapshot);
                                     return registeredStateCFHandle.columnFamilyHandle;
                                 })
@@ -147,7 +139,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             throws IOException, RocksDBException, StateMigrationException {
         // for all key-groups in the current state handle...
         try (RocksDBWriteBatchWrapper writeBatchWrapper =
-                new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
+                new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
             while (keyGroups.hasNext()) {
                 KeyGroup keyGroup = keyGroups.next();
                 try (ThrowingIterator<KeyGroupEntry> groupEntries = keyGroup.getKeyGroupEntries()) {
@@ -160,4 +152,9 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             }
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        this.rocksHandle.close();
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
similarity index 55%
rename from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
rename to flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
index a670dc1..5a37db0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,16 @@
 
 package org.apache.flink.contrib.streaming.state.restore;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -49,39 +41,34 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+
 /**
- * Base implementation of RocksDB restore operation.
- *
- * @param <K> The data type that the serializer serializes.
+ * Utility for creating a RocksDB instance either from scratch or from restored local state. This
+ * will also register {@link RocksDbKvStateInfo} when using {@link #openDB(List, List, Path)}.
  */
-public abstract class AbstractRocksDBRestoreOperation<K>
-        implements RocksDBRestoreOperation, AutoCloseable {
+class RocksDBHandle implements AutoCloseable {
+
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
-    protected final KeyGroupRange keyGroupRange;
-    protected final int keyGroupPrefixBytes;
-    protected final int numberOfTransferringThreads;
-    protected final CloseableRegistry cancelStreamRegistry;
-    protected final ClassLoader userCodeClassLoader;
-    protected final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
-    protected final DBOptions dbOptions;
-    protected final Map<String, RocksDbKvStateInfo> kvStateInformation;
-    protected final File instanceBasePath;
-    protected final File instanceRocksDBPath;
-    protected final String dbPath;
-    protected List<ColumnFamilyHandle> columnFamilyHandles;
-    protected List<ColumnFamilyDescriptor> columnFamilyDescriptors;
-    protected final StateSerializerProvider<K> keySerializerProvider;
-    protected final RocksDBNativeMetricOptions nativeMetricOptions;
-    protected final MetricGroup metricGroup;
-    protected final Collection<KeyedStateHandle> restoreStateHandles;
+    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
+    private final DBOptions dbOptions;
+    private final Map<String, RocksDbKvStateInfo> kvStateInformation;
+    private final String dbPath;
+    private List<ColumnFamilyHandle> columnFamilyHandles;
+    private List<ColumnFamilyDescriptor> columnFamilyDescriptors;
+    private final RocksDBNativeMetricOptions nativeMetricOptions;
+    private final MetricGroup metricGroup;
     // Current places to set compact filter into column family options:
     // - Incremental restore
     //   - restore with rescaling
@@ -94,46 +81,28 @@ public abstract class AbstractRocksDBRestoreOperation<K>
     // - Full restore
     //   - data ingestion after db open: #getOrRegisterStateColumnFamilyHandle before creating
     // column family
-    protected final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager;
-
-    protected RocksDB db;
-    protected ColumnFamilyHandle defaultColumnFamilyHandle;
-    protected RocksDBNativeMetricMonitor nativeMetricMonitor;
-    protected boolean isKeySerializerCompatibilityChecked;
-    protected final Long writeBufferManagerCapacity;
-
-    protected AbstractRocksDBRestoreOperation(
-            KeyGroupRange keyGroupRange,
-            int keyGroupPrefixBytes,
-            int numberOfTransferringThreads,
-            CloseableRegistry cancelStreamRegistry,
-            ClassLoader userCodeClassLoader,
+    private final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager;
+
+    private RocksDB db;
+    private ColumnFamilyHandle defaultColumnFamilyHandle;
+    private RocksDBNativeMetricMonitor nativeMetricMonitor;
+    private final Long writeBufferManagerCapacity;
+
+    protected RocksDBHandle(
             Map<String, RocksDbKvStateInfo> kvStateInformation,
-            StateSerializerProvider<K> keySerializerProvider,
-            File instanceBasePath,
             File instanceRocksDBPath,
             DBOptions dbOptions,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             RocksDBNativeMetricOptions nativeMetricOptions,
             MetricGroup metricGroup,
-            @Nonnull Collection<KeyedStateHandle> stateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             Long writeBufferManagerCapacity) {
-        this.keyGroupRange = keyGroupRange;
-        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
-        this.numberOfTransferringThreads = numberOfTransferringThreads;
-        this.cancelStreamRegistry = cancelStreamRegistry;
-        this.userCodeClassLoader = userCodeClassLoader;
         this.kvStateInformation = kvStateInformation;
-        this.keySerializerProvider = keySerializerProvider;
-        this.instanceBasePath = instanceBasePath;
-        this.instanceRocksDBPath = instanceRocksDBPath;
         this.dbPath = instanceRocksDBPath.getAbsolutePath();
         this.dbOptions = dbOptions;
         this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
         this.nativeMetricOptions = nativeMetricOptions;
         this.metricGroup = metricGroup;
-        this.restoreStateHandles = stateHandles;
         this.ttlCompactFiltersManager = ttlCompactFiltersManager;
         this.columnFamilyHandles = new ArrayList<>(1);
         this.columnFamilyDescriptors = Collections.emptyList();
@@ -141,6 +110,26 @@ public abstract class AbstractRocksDBRestoreOperation<K>
     }
 
     void openDB() throws IOException {
+        loadDb();
+    }
+
+    void openDB(
+            @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
+            @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            @Nonnull Path restoreSourcePath)
+            throws IOException {
+        this.columnFamilyDescriptors = columnFamilyDescriptors;
+        this.columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
+        restoreInstanceDirectoryFromPath(restoreSourcePath);
+        loadDb();
+        // Register CF handlers
+        for (int i = 0; i < stateMetaInfoSnapshots.size(); i++) {
+            getOrRegisterStateColumnFamilyHandle(
+                    columnFamilyHandles.get(i), stateMetaInfoSnapshots.get(i));
+        }
+    }
+
+    private void loadDb() throws IOException {
         db =
                 RocksDBOperationUtils.openDB(
                         dbPath,
@@ -158,10 +147,6 @@ public abstract class AbstractRocksDBRestoreOperation<K>
                         : null;
     }
 
-    public RocksDB getDb() {
-        return this.db;
-    }
-
     RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
             ColumnFamilyHandle columnFamilyHandle, StateMetaInfoSnapshot stateMetaInfoSnapshot) {
 
@@ -199,51 +184,71 @@ public abstract class AbstractRocksDBRestoreOperation<K>
         return registeredStateMetaInfoEntry;
     }
 
-    KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView)
-            throws IOException, StateMigrationException {
-        // isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
-        // deserialization of state happens lazily during runtime; we depend on the fact
-        // that the new serializer for states could be compatible, and therefore the restore can
-        // continue
-        // without old serializers required to be present.
-        KeyedBackendSerializationProxy<K> serializationProxy =
-                new KeyedBackendSerializationProxy<>(userCodeClassLoader);
-        serializationProxy.read(dataInputView);
-        if (!isKeySerializerCompatibilityChecked) {
-            // fetch current serializer now because if it is incompatible, we can't access
-            // it anymore to improve the error message
-            TypeSerializer<K> currentSerializer = keySerializerProvider.currentSchemaSerializer();
-            // check for key serializer compatibility; this also reconfigures the
-            // key serializer to be compatible, if it is required and is possible
-            TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
-                    keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(
-                            serializationProxy.getKeySerializerSnapshot());
-            if (keySerializerSchemaCompat.isCompatibleAfterMigration()
-                    || keySerializerSchemaCompat.isIncompatible()) {
-                throw new StateMigrationException(
-                        "The new key serializer ("
-                                + currentSerializer
-                                + ") must be compatible with the previous key serializer ("
-                                + keySerializerProvider.previousSchemaSerializer()
-                                + ").");
-            }
+    /**
+     * This recreates the new working directory of the recovered RocksDB instance and links/copies
+     * the contents from a local state.
+     */
+    private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
+        final Path instanceRocksDBDirectory = Paths.get(dbPath);
+        final Path[] files = FileUtils.listDirectory(source);
+
+        if (!new File(dbPath).mkdirs()) {
+            String errMsg = "Could not create RocksDB data directory: " + dbPath;
+            logger.error(errMsg);
+            throw new IOException(errMsg);
+        }
 
-            isKeySerializerCompatibilityChecked = true;
+        for (Path file : files) {
+            final String fileName = file.getFileName().toString();
+            final Path targetFile = instanceRocksDBDirectory.resolve(fileName);
+            if (fileName.endsWith(SST_FILE_SUFFIX)) {
+                // hardlink'ing the immutable sst-files.
+                Files.createLink(targetFile, file);
+            } else {
+                // true copy for all other files.
+                Files.copy(file, targetFile, StandardCopyOption.REPLACE_EXISTING);
+            }
         }
+    }
 
-        return serializationProxy;
+    public RocksDB getDb() {
+        return db;
+    }
+
+    public RocksDBNativeMetricMonitor getNativeMetricMonitor() {
+        return nativeMetricMonitor;
+    }
+
+    public ColumnFamilyHandle getDefaultColumnFamilyHandle() {
+        return defaultColumnFamilyHandle;
+    }
+
+    public List<ColumnFamilyHandle> getColumnFamilyHandles() {
+        return columnFamilyHandles;
+    }
+
+    public RocksDbTtlCompactFiltersManager getTtlCompactFiltersManager() {
+        return ttlCompactFiltersManager;
+    }
+
+    public Long getWriteBufferManagerCapacity() {
+        return writeBufferManagerCapacity;
+    }
+
+    public Function<String, ColumnFamilyOptions> getColumnFamilyOptionsFactory() {
+        return columnFamilyOptionsFactory;
+    }
+
+    public DBOptions getDbOptions() {
+        return dbOptions;
     }
 
-    /** Necessary clean up iff restore operation failed. */
     @Override
-    public void close() {
+    public void close() throws Exception {
         IOUtils.closeQuietly(defaultColumnFamilyHandle);
         IOUtils.closeQuietly(nativeMetricMonitor);
         IOUtils.closeQuietly(db);
         // Making sure the already created column family options will be closed
         columnFamilyDescriptors.forEach((cfd) -> IOUtils.closeQuietly(cfd.getOptions()));
     }
-
-    @Override
-    public abstract RocksDBRestoreResult restore() throws Exception;
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 6b66853..7caf934d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.contrib.streaming.state.restore;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
@@ -46,6 +48,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -54,6 +57,8 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -61,10 +66,7 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -76,19 +78,31 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.Function;
 
-import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
 import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;
-import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Encapsulates the process of restoring a RocksDB instance from an incremental snapshot. */
-public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {
+public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOperation {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);
 
     private final String operatorIdentifier;
     private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
+    private final RocksDBHandle rocksHandle;
+    private final Collection<KeyedStateHandle> restoreStateHandles;
+    private final CloseableRegistry cancelStreamRegistry;
+    private final KeyGroupRange keyGroupRange;
+    private final File instanceBasePath;
+    private final int numberOfTransferringThreads;
+    private final int keyGroupPrefixBytes;
+    private final StateSerializerProvider<K> keySerializerProvider;
+    private final ClassLoader userCodeClassLoader;
     private long lastCompletedCheckpointId;
     private UUID backendUID;
     private final long writeBatchSize;
 
+    private boolean isKeySerializerCompatibilityChecked;
+
     public RocksDBIncrementalRestoreOperation(
             String operatorIdentifier,
             KeyGroupRange keyGroupRange,
@@ -108,29 +122,29 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
             Long writeBufferManagerCapacity) {
-        super(
-                keyGroupRange,
-                keyGroupPrefixBytes,
-                numberOfTransferringThreads,
-                cancelStreamRegistry,
-                userCodeClassLoader,
-                kvStateInformation,
-                keySerializerProvider,
-                instanceBasePath,
-                instanceRocksDBPath,
-                dbOptions,
-                columnFamilyOptionsFactory,
-                nativeMetricOptions,
-                metricGroup,
-                restoreStateHandles,
-                ttlCompactFiltersManager,
-                writeBufferManagerCapacity);
+        this.rocksHandle =
+                new RocksDBHandle(
+                        kvStateInformation,
+                        instanceRocksDBPath,
+                        dbOptions,
+                        columnFamilyOptionsFactory,
+                        nativeMetricOptions,
+                        metricGroup,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity);
         this.operatorIdentifier = operatorIdentifier;
         this.restoredSstFiles = new TreeMap<>();
         this.lastCompletedCheckpointId = -1L;
         this.backendUID = UUID.randomUUID();
-        checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
         this.writeBatchSize = writeBatchSize;
+        this.restoreStateHandles = restoreStateHandles;
+        this.cancelStreamRegistry = cancelStreamRegistry;
+        this.keyGroupRange = keyGroupRange;
+        this.instanceBasePath = instanceBasePath;
+        this.numberOfTransferringThreads = numberOfTransferringThreads;
+        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+        this.keySerializerProvider = keySerializerProvider;
+        this.userCodeClassLoader = userCodeClassLoader;
     }
 
     /** Root method that branches for different implementations of {@link KeyedStateHandle}. */
@@ -153,9 +167,9 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             restoreWithoutRescaling(theFirstStateHandle);
         }
         return new RocksDBRestoreResult(
-                this.db,
-                defaultColumnFamilyHandle,
-                nativeMetricMonitor,
+                this.rocksHandle.getDb(),
+                this.rocksHandle.getDefaultColumnFamilyHandle(),
+                this.rocksHandle.getNativeMetricMonitor(),
                 lastCompletedCheckpointId,
                 backendUID,
                 restoredSstFiles);
@@ -216,10 +230,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                 readMetaData(localKeyedStateHandle.getMetaDataState());
         List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
                 serializationProxy.getStateMetaInfoSnapshots();
-        columnFamilyDescriptors =
-                createAndRegisterColumnFamilyDescriptors(
-                        stateMetaInfoSnapshots, true, writeBufferManagerCapacity);
-        columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
 
         Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
 
@@ -228,19 +238,10 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                 operatorIdentifier,
                 backendUID);
 
-        if (!instanceRocksDBPath.mkdirs()) {
-            String errMsg =
-                    "Could not create RocksDB data directory: "
-                            + instanceBasePath.getAbsolutePath();
-            logger.error(errMsg);
-            throw new IOException(errMsg);
-        }
-
-        restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
-
-        openDB();
-
-        registerColumnFamilyHandles(stateMetaInfoSnapshots);
+        this.rocksHandle.openDB(
+                createlumnFamilyDescriptors(stateMetaInfoSnapshots, true),
+                stateMetaInfoSnapshots,
+                restoreSourcePath);
     }
 
     private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
@@ -273,14 +274,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
         }
     }
 
-    private void registerColumnFamilyHandles(List<StateMetaInfoSnapshot> metaInfoSnapshots) {
-        // Register CF handlers
-        for (int i = 0; i < metaInfoSnapshots.size(); ++i) {
-            getOrRegisterStateColumnFamilyHandle(
-                    columnFamilyHandles.get(i), metaInfoSnapshots.get(i));
-        }
-    }
-
     /**
      * Recovery from multi incremental states with rescaling. For rescaling, this method creates a
      * temporary RocksDB instance for a key-groups shard. All contents from the temporary instance
@@ -299,7 +292,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             restoreStateHandles.remove(initialHandle);
             initDBWithRescaling(initialHandle);
         } else {
-            openDB();
+            this.rocksHandle.openDB();
         }
 
         // Transfer remaining key-groups from temporary instance into base DB
@@ -330,7 +323,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                                     (IncrementalRemoteKeyedStateHandle) rawStateHandle,
                                     temporaryRestoreInstancePath);
                     RocksDBWriteBatchWrapper writeBatchWrapper =
-                            new RocksDBWriteBatchWrapper(this.db, writeBatchSize)) {
+                            new RocksDBWriteBatchWrapper(
+                                    this.rocksHandle.getDb(), writeBatchSize)) {
 
                 List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
                         tmpRestoreDBInfo.columnFamilyDescriptors;
@@ -343,7 +337,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                     ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
 
                     ColumnFamilyHandle targetColumnFamilyHandle =
-                            getOrRegisterStateColumnFamilyHandle(
+                            this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
                                             null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
                                     .columnFamilyHandle;
 
@@ -390,8 +384,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
         // 2. Clip the base DB instance
         try {
             RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
-                    db,
-                    columnFamilyHandles,
+                    this.rocksHandle.getDb(),
+                    this.rocksHandle.getColumnFamilyHandles(),
                     keyGroupRange,
                     initialHandle.getKeyGroupRange(),
                     keyGroupPrefixBytes,
@@ -463,8 +457,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                 serializationProxy.getStateMetaInfoSnapshots();
 
         List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-                createAndRegisterColumnFamilyDescriptors(
-                        stateMetaInfoSnapshots, false, writeBufferManagerCapacity);
+                createlumnFamilyDescriptors(stateMetaInfoSnapshots, false);
 
         List<ColumnFamilyHandle> columnFamilyHandles =
                 new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
@@ -475,8 +468,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
                         columnFamilyDescriptors,
                         columnFamilyHandles,
                         RocksDBOperationUtils.createColumnFamilyOptions(
-                                columnFamilyOptionsFactory, "default"),
-                        dbOptions);
+                                this.rocksHandle.getColumnFamilyOptionsFactory(), "default"),
+                        this.rocksHandle.getDbOptions());
 
         return new RestoredDBInstance(
                 restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots);
@@ -486,10 +479,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
      * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state
      * meta data snapshot.
      */
-    private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
-            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
-            boolean registerTtlCompactFilter,
-            Long writeBufferManagerCapacity) {
+    private List<ColumnFamilyDescriptor> createlumnFamilyDescriptors(
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter) {
 
         List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                 new ArrayList<>(stateMetaInfoSnapshots.size());
@@ -500,37 +491,17 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             ColumnFamilyDescriptor columnFamilyDescriptor =
                     RocksDBOperationUtils.createColumnFamilyDescriptor(
                             metaInfoBase,
-                            columnFamilyOptionsFactory,
-                            registerTtlCompactFilter ? ttlCompactFiltersManager : null,
-                            writeBufferManagerCapacity);
+                            this.rocksHandle.getColumnFamilyOptionsFactory(),
+                            registerTtlCompactFilter
+                                    ? this.rocksHandle.getTtlCompactFiltersManager()
+                                    : null,
+                            this.rocksHandle.getWriteBufferManagerCapacity());
 
             columnFamilyDescriptors.add(columnFamilyDescriptor);
         }
         return columnFamilyDescriptors;
     }
 
-    /**
-     * This recreates the new working directory of the recovered RocksDB instance and links/copies
-     * the contents from a local state.
-     */
-    private void restoreInstanceDirectoryFromPath(Path source, String instanceRocksDBPath)
-            throws IOException {
-        final Path instanceRocksDBDirectory = Paths.get(instanceRocksDBPath);
-        final Path[] files = FileUtils.listDirectory(source);
-
-        for (Path file : files) {
-            final String fileName = file.getFileName().toString();
-            final Path targetFile = instanceRocksDBDirectory.resolve(fileName);
-            if (fileName.endsWith(SST_FILE_SUFFIX)) {
-                // hardlink'ing the immutable sst-files.
-                Files.createLink(targetFile, file);
-            } else {
-                // true copy for all other files.
-                Files.copy(file, targetFile, StandardCopyOption.REPLACE_EXISTING);
-            }
-        }
-    }
-
     /** Reads Flink's state meta data file from the state handle. */
     private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle)
             throws Exception {
@@ -548,4 +519,44 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             }
         }
     }
+
+    KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView)
+            throws IOException, StateMigrationException {
+        // isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
+        // deserialization of state happens lazily during runtime; we depend on the fact
+        // that the new serializer for states could be compatible, and therefore the restore can
+        // continue
+        // without old serializers required to be present.
+        KeyedBackendSerializationProxy<K> serializationProxy =
+                new KeyedBackendSerializationProxy<>(userCodeClassLoader);
+        serializationProxy.read(dataInputView);
+        if (!isKeySerializerCompatibilityChecked) {
+            // fetch current serializer now because if it is incompatible, we can't access
+            // it anymore to improve the error message
+            TypeSerializer<K> currentSerializer = keySerializerProvider.currentSchemaSerializer();
+            // check for key serializer compatibility; this also reconfigures the
+            // key serializer to be compatible, if it is required and is possible
+            TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
+                    keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(
+                            serializationProxy.getKeySerializerSnapshot());
+            if (keySerializerSchemaCompat.isCompatibleAfterMigration()
+                    || keySerializerSchemaCompat.isIncompatible()) {
+                throw new StateMigrationException(
+                        "The new key serializer ("
+                                + currentSerializer
+                                + ") must be compatible with the previous key serializer ("
+                                + keySerializerProvider.previousSchemaSerializer()
+                                + ").");
+            }
+
+            isKeySerializerCompatibilityChecked = true;
+        }
+
+        return serializationProxy;
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.rocksHandle.close();
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
index c50e553..4202c89 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
@@ -21,11 +21,7 @@ package org.apache.flink.contrib.streaming.state.restore;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
-import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.StateSerializerProvider;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
@@ -33,52 +29,48 @@ import org.rocksdb.DBOptions;
 import javax.annotation.Nonnull;
 
 import java.io.File;
-import java.util.Collection;
 import java.util.Map;
 import java.util.function.Function;
 
 /** Encapsulates the process of initiating a RocksDB instance without restore. */
-public class RocksDBNoneRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {
+public class RocksDBNoneRestoreOperation<K> implements RocksDBRestoreOperation {
+    private final RocksDBHandle rocksHandle;
+
     public RocksDBNoneRestoreOperation(
-            KeyGroupRange keyGroupRange,
-            int keyGroupPrefixBytes,
-            int numberOfTransferringThreads,
-            CloseableRegistry cancelStreamRegistry,
-            ClassLoader userCodeClassLoader,
             Map<String, RocksDbKvStateInfo> kvStateInformation,
-            StateSerializerProvider<K> keySerializerProvider,
-            File instanceBasePath,
             File instanceRocksDBPath,
             DBOptions dbOptions,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             RocksDBNativeMetricOptions nativeMetricOptions,
             MetricGroup metricGroup,
-            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             Long writeBufferManagerCapacity) {
-        super(
-                keyGroupRange,
-                keyGroupPrefixBytes,
-                numberOfTransferringThreads,
-                cancelStreamRegistry,
-                userCodeClassLoader,
-                kvStateInformation,
-                keySerializerProvider,
-                instanceBasePath,
-                instanceRocksDBPath,
-                dbOptions,
-                columnFamilyOptionsFactory,
-                nativeMetricOptions,
-                metricGroup,
-                restoreStateHandles,
-                ttlCompactFiltersManager,
-                writeBufferManagerCapacity);
+        this.rocksHandle =
+                new RocksDBHandle(
+                        kvStateInformation,
+                        instanceRocksDBPath,
+                        dbOptions,
+                        columnFamilyOptionsFactory,
+                        nativeMetricOptions,
+                        metricGroup,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity);
     }
 
     @Override
     public RocksDBRestoreResult restore() throws Exception {
-        openDB();
+        this.rocksHandle.openDB();
         return new RocksDBRestoreResult(
-                this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1, null, null);
+                this.rocksHandle.getDb(),
+                this.rocksHandle.getDefaultColumnFamilyHandle(),
+                this.rocksHandle.getNativeMetricMonitor(),
+                -1,
+                null,
+                null);
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.rocksHandle.close();
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreOperation.java
index 56d14ea..b70df73 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreOperation.java
@@ -21,7 +21,8 @@ package org.apache.flink.contrib.streaming.state.restore;
 import org.apache.flink.runtime.state.RestoreOperation;
 
 /** Interface for RocksDB restore. */
-public interface RocksDBRestoreOperation extends RestoreOperation<RocksDBRestoreResult> {
+public interface RocksDBRestoreOperation
+        extends RestoreOperation<RocksDBRestoreResult>, AutoCloseable {
     /** Restores state that was previously snapshot-ed from the provided state handles. */
     RocksDBRestoreResult restore() throws Exception;
 }


[flink] 05/09: [hotfix] Fix RocksIncrementalCheckpointRescalingTest

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be628c67b150b03b316b55f09a7292939de21c0c
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Feb 4 09:09:11 2021 +0100

    [hotfix] Fix RocksIncrementalCheckpointRescalingTest
    
    A few cases that were checked in the test are actually illegal
    combinations. They were testing keys that should never end up in a given
    subtask, as they do not belong to a key group owned by the task.
---
 .../RocksIncrementalCheckpointRescalingTest.java   | 42 ----------------------
 1 file changed, 42 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
index 580c35b..baf418f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
@@ -187,12 +187,6 @@ public class RocksIncrementalCheckpointRescalingTest extends TestLogger {
             snapshot2 =
                     AbstractStreamOperatorTestHarness.repackageState(
                             harness2[0].snapshot(0, 0), harness2[1].snapshot(0, 0));
-
-            validHarnessResult(
-                    harness2[0], 1, records[5], records[6], records[7], records[8], records[9]);
-
-            validHarnessResult(
-                    harness2[1], 1, records[0], records[1], records[2], records[3], records[4]);
         } finally {
             closeHarness(harness2);
         }
@@ -253,36 +247,6 @@ public class RocksIncrementalCheckpointRescalingTest extends TestLogger {
             validHarnessResult(harness3[0], 3, records[0], records[1], records[2], records[3]);
             validHarnessResult(harness3[1], 3, records[4], records[5], records[6]);
             validHarnessResult(harness3[2], 3, records[7], records[8], records[9]);
-
-            validHarnessResult(
-                    harness3[0],
-                    1,
-                    records[4],
-                    records[5],
-                    records[6],
-                    records[7],
-                    records[8],
-                    records[9]);
-            validHarnessResult(
-                    harness3[1],
-                    1,
-                    records[0],
-                    records[1],
-                    records[2],
-                    records[3],
-                    records[7],
-                    records[8],
-                    records[9]);
-            validHarnessResult(
-                    harness3[2],
-                    1,
-                    records[0],
-                    records[1],
-                    records[2],
-                    records[3],
-                    records[4],
-                    records[5],
-                    records[6]);
         } finally {
             closeHarness(harness3);
         }
@@ -390,12 +354,6 @@ public class RocksIncrementalCheckpointRescalingTest extends TestLogger {
             snapshot2 =
                     AbstractStreamOperatorTestHarness.repackageState(
                             harness2[0].snapshot(0, 0), harness2[1].snapshot(0, 0));
-
-            validHarnessResult(
-                    harness2[0], 1, records[5], records[6], records[7], records[8], records[9]);
-
-            validHarnessResult(
-                    harness2[1], 1, records[0], records[1], records[2], records[3], records[4]);
         } finally {
             closeHarness(harness2);
         }