Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/26 08:15:39 UTC

[flink] branch release-1.12 updated: [FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new f00bfaf  [FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled
f00bfaf is described below

commit f00bfaf0b346c3cc0b1895c1707ee43d5d3c00ba
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Feb 22 16:49:43 2021 +0100

    [FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled
    
    This closes #15021
---
 .../state/RocksDBKeyedStateBackendBuilder.java     |  11 +-
 .../streaming/state/restore/PriorityQueueFlag.java |  29 +++++
 .../state/restore/RocksDBFullRestoreOperation.java |  13 +-
 .../RocksDBIncrementalRestoreOperation.java        |  16 ++-
 .../state/HeapTimersSnapshottingTest.java          | 135 +++++++++++++++++++++
 .../util/AbstractStreamOperatorTestHarness.java    |  16 ++-
 6 files changed, 213 insertions(+), 7 deletions(-)
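
For context, the user-facing combination this commit now rejects at restore time.
A minimal sketch, not part of the commit, assuming a job whose savepoint was taken
with RocksDB-based timers:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The savepoint was taken with PriorityQueueStateType.ROCKSDB; switching to
    // HEAP before restoring now fails fast with a StateMigrationException
    // instead of silently ignoring the RocksDB-based timers.
    RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/checkpoints");
    backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.HEAP);
    env.setStateBackend(backend);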

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index d2226a8..0b7e526 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
+import org.apache.flink.contrib.streaming.state.restore.PriorityQueueFlag;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
 import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
@@ -417,6 +418,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     optionsContainer.getWriteBufferManagerCapacity());
         }
         KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
+        PriorityQueueFlag queueRestoreEnabled =
+                priorityQueueStateType == RocksDBStateBackend.PriorityQueueStateType.HEAP
+                        ? PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE
+                        : PriorityQueueFlag.RESTORE_PRIORITY_QUEUE;
         if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
             return new RocksDBIncrementalRestoreOperation<>(
                     operatorIdentifier,
@@ -436,7 +441,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     restoreStateHandles,
                     ttlCompactFiltersManager,
                     writeBatchSize,
-                    optionsContainer.getWriteBufferManagerCapacity());
+                    optionsContainer.getWriteBufferManagerCapacity(),
+                    queueRestoreEnabled);
         } else {
             return new RocksDBFullRestoreOperation<>(
                     keyGroupRange,
@@ -455,7 +461,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     restoreStateHandles,
                     ttlCompactFiltersManager,
                     writeBatchSize,
-                    optionsContainer.getWriteBufferManagerCapacity());
+                    optionsContainer.getWriteBufferManagerCapacity(),
+                    queueRestoreEnabled);
         }
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java
new file mode 100644
index 0000000..3fb2fa1
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.restore;
+
+/**
+ * Flag indicating what should be done when priority queue (timer) state is encountered while
+ * restoring a RocksDB snapshot. Prior to Flink 1.13 it was not possible to switch from RocksDB
+ * timers to Heap timers, so in that case an exception is thrown.
+ */
+public enum PriorityQueueFlag {
+    THROW_ON_PRIORITY_QUEUE,
+    RESTORE_PRIORITY_QUEUE
+}
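
The priority queue state type that drives this flag is set either programmatically
(as in the tests below) or via configuration. A hedged sketch of the configuration
route, assuming the option key defined by RocksDBOptions.TIMER_SERVICE_FACTORY:

    import org.apache.flink.configuration.Configuration;

    Configuration config = new Configuration();
    // Assumed equivalent of backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP).
    config.setString("state.backend.rocksdb.timer-service.factory", "HEAP");
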
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 7945b84..5e4a897 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -87,6 +88,8 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
     /** Write batch size used in {@link RocksDBWriteBatchWrapper}. */
     private final long writeBatchSize;
 
+    private final PriorityQueueFlag queueRestoreEnabled;
+
     public RocksDBFullRestoreOperation(
             KeyGroupRange keyGroupRange,
             int keyGroupPrefixBytes,
@@ -104,7 +107,8 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
             @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
-            Long writeBufferManagerCapacity) {
+            Long writeBufferManagerCapacity,
+            PriorityQueueFlag queueRestoreEnabled) {
         super(
                 keyGroupRange,
                 keyGroupPrefixBytes,
@@ -124,6 +128,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
                 writeBufferManagerCapacity);
         checkArgument(writeBatchSize >= 0, "Write batch size has to be non-negative.");
         this.writeBatchSize = writeBatchSize;
+        this.queueRestoreEnabled = queueRestoreEnabled;
     }
 
     /** Restores all key-groups data that is referenced by the passed state handles. */
@@ -182,6 +187,12 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
         currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
 
         for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
+            if (restoredMetaInfo.getBackendStateType() == BackendStateType.PRIORITY_QUEUE
+                    && queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
+                throw new StateMigrationException(
+                        "Cannot restore a savepoint taken with RocksDB timers when Heap timers are enabled!");
+            }
+
             RocksDbKvStateInfo registeredStateCFHandle =
                     getOrRegisterStateColumnFamilyHandle(null, restoredMetaInfo);
             currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle);
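
Since switching from RocksDB timers to Heap timers is unsupported before Flink 1.13,
the way to restore such a savepoint is to keep RocksDB-based timers. A sketch, not
part of the commit, reusing the backend setup from above:

    // Restoring succeeds when the backend keeps the RocksDB-based timer
    // service that the savepoint was taken with.
    RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/checkpoints");
    backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
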
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index a17552d..814874d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -85,6 +86,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 
     private final String operatorIdentifier;
     private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
+    private final PriorityQueueFlag queueRestoreEnabled;
     private long lastCompletedCheckpointId;
     private UUID backendUID;
     private final long writeBatchSize;
@@ -107,7 +109,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
             @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
-            Long writeBufferManagerCapacity) {
+            Long writeBufferManagerCapacity,
+            PriorityQueueFlag queueRestoreEnabled) {
         super(
                 keyGroupRange,
                 keyGroupPrefixBytes,
@@ -129,6 +132,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
         this.restoredSstFiles = new TreeMap<>();
         this.lastCompletedCheckpointId = -1L;
         this.backendUID = UUID.randomUUID();
+        this.queueRestoreEnabled = queueRestoreEnabled;
         checkArgument(writeBatchSize >= 0, "Write batch size has to be non-negative.");
         this.writeBatchSize = writeBatchSize;
     }
@@ -489,12 +493,20 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
     private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
             List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
             boolean registerTtlCompactFilter,
-            Long writeBufferManagerCapacity) {
+            Long writeBufferManagerCapacity)
+            throws StateMigrationException {
 
         List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                 new ArrayList<>(stateMetaInfoSnapshots.size());
 
         for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
+            if (stateMetaInfoSnapshot.getBackendStateType()
+                            == StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE
+                    && queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
+                throw new StateMigrationException(
+                        "Cannot restore a savepoint taken with RocksDB timers when Heap timers are enabled!");
+            }
+
             RegisteredStateMetaInfoBase metaInfoBase =
                     RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
             ColumnFamilyDescriptor columnFamilyDescriptor =
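
Both restore paths now perform the same guard. A condensed, self-contained
restatement for readers; the helper class is hypothetical, the diffs above inline
the check in each operation:

    import org.apache.flink.contrib.streaming.state.restore.PriorityQueueFlag;
    import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
    import org.apache.flink.util.StateMigrationException;

    final class TimerRestoreGuard {
        // Priority queue (timer) state written by RocksDB cannot be read back
        // when the backend is configured to use heap timers.
        static void checkRestoredStateType(
                StateMetaInfoSnapshot snapshot, PriorityQueueFlag flag)
                throws StateMigrationException {
            if (snapshot.getBackendStateType()
                            == StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE
                    && flag == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
                throw new StateMigrationException(
                        "Cannot restore a savepoint taken with RocksDB timers"
                                + " when Heap timers are enabled!");
            }
        }
    }
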
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
new file mode 100644
index 0000000..afa9ad4
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StateMigrationException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+
+/**
+ * RocksDB-based timers cannot be restored with a RocksDB backend that uses heap timers. These
+ * tests verify that an exception is thrown when users attempt that incompatible migration.
+ */
+public class HeapTimersSnapshottingTest {
+
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testThrowExceptionWhenRestoringRocksTimersWithHeapTimers() throws Exception {
+        OperatorSubtaskState state;
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+            backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
+            testHarness.setStateBackend(backend);
+            testHarness.open();
+            testHarness.processElement(0, 0L);
+
+            state =
+                    testHarness
+                            .snapshotWithLocalState(0L, 1L, CheckpointType.SAVEPOINT)
+                            .getJobManagerOwnedState();
+        }
+
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+            backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
+            testHarness.setStateBackend(backend);
+
+            thrown.expect(
+                    containsCause(
+                            new StateMigrationException(
+                                    "Cannot restore a savepoint taken with RocksDB timers"
+                                            + " when Heap timers are enabled!")));
+            testHarness.initializeState(state);
+        }
+    }
+
+    @Test
+    public void testThrowExceptionWhenRestoringRocksTimersWithHeapTimersIncrementalCheckpoints()
+            throws Exception {
+        OperatorSubtaskState state;
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI(), true);
+            backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
+            testHarness.setStateBackend(backend);
+            testHarness.open();
+            testHarness.processElement(0, 0L);
+
+            state =
+                    testHarness
+                            .snapshotWithLocalState(0L, 1L, CheckpointType.SAVEPOINT)
+                            .getJobManagerOwnedState();
+        }
+
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            RocksDBStateBackend backend =
+                    new RocksDBStateBackend(temporaryFolder.newFolder().toURI(), true);
+            backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
+            testHarness.setStateBackend(backend);
+
+            thrown.expect(
+                    containsCause(
+                            new StateMigrationException(
+                                    "Cannot restore a savepoint taken with RocksDB timers"
+                                            + " when Heap timers are enabled!")));
+            testHarness.initializeState(state);
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> getTestHarness()
+            throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(
+                        new KeyedProcessFunction<Integer, Integer, Integer>() {
+                            @Override
+                            public void processElement(
+                                    Integer value, Context ctx, Collector<Integer> out) {
+                                ctx.timerService().registerEventTimeTimer(0L);
+                            }
+                        }),
+                (KeySelector<Integer, Integer>) value -> value,
+                BasicTypeInfo.INT_TYPE_INFO,
+                1,
+                1,
+                0);
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 9dfb300..474e762 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -642,14 +643,25 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
      */
     public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp)
             throws Exception {
+        return snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT);
+    }
+
+    /**
+     * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions,
+     * org.apache.flink.runtime.state.CheckpointStreamFactory)}.
+     */
+    public OperatorSnapshotFinalizer snapshotWithLocalState(
+            long checkpointId, long timestamp, CheckpointType checkpointType) throws Exception {
 
+        CheckpointStorageLocationReference locationReference =
+                CheckpointStorageLocationReference.getDefault();
         OperatorSnapshotFutures operatorStateResult =
                 operator.snapshotState(
                         checkpointId,
                         timestamp,
-                        CheckpointOptions.forCheckpointWithDefaultLocation(),
+                        new CheckpointOptions(checkpointType, locationReference),
                         checkpointStorageAccess.resolveCheckpointStorageLocation(
-                                checkpointId, CheckpointStorageLocationReference.getDefault()));
+                                checkpointId, locationReference));
 
         return new OperatorSnapshotFinalizer(operatorStateResult);
     }
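
A usage sketch of the new overload, assuming a harness wired up as in the tests
above:

    import org.apache.flink.runtime.checkpoint.CheckpointType;
    import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;

    // Take a savepoint-typed snapshot through the harness and keep the
    // job-manager-owned part for a later initializeState(...) call.
    OperatorSnapshotFinalizer finalizer =
            harness.snapshotWithLocalState(1L, 100L, CheckpointType.SAVEPOINT);
    OperatorSubtaskState state = finalizer.getJobManagerOwnedState();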