You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this copy.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/26 08:15:39 UTC
[flink] branch release-1.12 updated: [FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new f00bfaf [FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled
f00bfaf is described below
commit f00bfaf0b346c3cc0b1895c1707ee43d5d3c00ba
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Feb 22 16:49:43 2021 +0100
[FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled
This closes #15021
---
.../state/RocksDBKeyedStateBackendBuilder.java | 11 +-
.../streaming/state/restore/PriorityQueueFlag.java | 29 +++++
.../state/restore/RocksDBFullRestoreOperation.java | 13 +-
.../RocksDBIncrementalRestoreOperation.java | 16 ++-
.../state/HeapTimersSnapshottingTest.java | 135 +++++++++++++++++++++
.../util/AbstractStreamOperatorTestHarness.java | 16 ++-
6 files changed, 213 insertions(+), 7 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index d2226a8..0b7e526 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
+import org.apache.flink.contrib.streaming.state.restore.PriorityQueueFlag;
import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
@@ -417,6 +418,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
optionsContainer.getWriteBufferManagerCapacity());
}
KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
+ PriorityQueueFlag queueRestoreEnabled =
+ priorityQueueStateType == RocksDBStateBackend.PriorityQueueStateType.HEAP
+ ? PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE
+ : PriorityQueueFlag.RESTORE_PRIORITY_QUEUE;
if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
return new RocksDBIncrementalRestoreOperation<>(
operatorIdentifier,
@@ -436,7 +441,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
restoreStateHandles,
ttlCompactFiltersManager,
writeBatchSize,
- optionsContainer.getWriteBufferManagerCapacity());
+ optionsContainer.getWriteBufferManagerCapacity(),
+ queueRestoreEnabled);
} else {
return new RocksDBFullRestoreOperation<>(
keyGroupRange,
@@ -455,7 +461,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
restoreStateHandles,
ttlCompactFiltersManager,
writeBatchSize,
- optionsContainer.getWriteBufferManagerCapacity());
+ optionsContainer.getWriteBufferManagerCapacity(),
+ queueRestoreEnabled);
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java
new file mode 100644
index 0000000..3fb2fa1
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.restore;
+
+/**
+ * Flag deciding what a RocksDB restore operation does when it encounters priority-queue (timer)
+ * state in the snapshot. Before Flink 1.13, switching from RocksDB timers to Heap timers is not
+ * supported, so such a restore must fail with an exception instead of silently dropping timers.
+ */
+public enum PriorityQueueFlag {
+ THROW_ON_PRIORITY_QUEUE, // fail the restore with a StateMigrationException
+ RESTORE_PRIORITY_QUEUE // restore the priority-queue state from the snapshot
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 7945b84..5e4a897 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
@@ -87,6 +88,8 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
/** Write batch size used in {@link RocksDBWriteBatchWrapper}. */
private final long writeBatchSize;
+ private final PriorityQueueFlag queueRestoreEnabled;
+
public RocksDBFullRestoreOperation(
KeyGroupRange keyGroupRange,
int keyGroupPrefixBytes,
@@ -104,7 +107,8 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nonnegative long writeBatchSize,
- Long writeBufferManagerCapacity) {
+ Long writeBufferManagerCapacity,
+ PriorityQueueFlag queueRestoreEnabled) {
super(
keyGroupRange,
keyGroupPrefixBytes,
@@ -124,6 +128,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
writeBufferManagerCapacity);
checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
this.writeBatchSize = writeBatchSize;
+ this.queueRestoreEnabled = queueRestoreEnabled;
}
/** Restores all key-groups data that is referenced by the passed state handles. */
@@ -182,6 +187,12 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
+ if (restoredMetaInfo.getBackendStateType() == BackendStateType.PRIORITY_QUEUE
+ && queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
+ throw new StateMigrationException(
+ "Can not restore savepoint taken with RocksDB timers enabled with Heap timers!");
+ }
+
RocksDbKvStateInfo registeredStateCFHandle =
getOrRegisterStateColumnFamilyHandle(null, restoredMetaInfo);
currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index a17552d..814874d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -85,6 +86,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
private final String operatorIdentifier;
private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
+ private final PriorityQueueFlag queueRestoreEnabled;
private long lastCompletedCheckpointId;
private UUID backendUID;
private final long writeBatchSize;
@@ -107,7 +109,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nonnegative long writeBatchSize,
- Long writeBufferManagerCapacity) {
+ Long writeBufferManagerCapacity,
+ PriorityQueueFlag queueRestoreEnabled) {
super(
keyGroupRange,
keyGroupPrefixBytes,
@@ -129,6 +132,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
this.restoredSstFiles = new TreeMap<>();
this.lastCompletedCheckpointId = -1L;
this.backendUID = UUID.randomUUID();
+ this.queueRestoreEnabled = queueRestoreEnabled;
checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
this.writeBatchSize = writeBatchSize;
}
@@ -489,12 +493,20 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
boolean registerTtlCompactFilter,
- Long writeBufferManagerCapacity) {
+ Long writeBufferManagerCapacity)
+ throws StateMigrationException {
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>(stateMetaInfoSnapshots.size());
for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
+ if (stateMetaInfoSnapshot.getBackendStateType()
+ == StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE
+ && queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
+ throw new StateMigrationException(
+ "Can not restore savepoint taken with RocksDB timers enabled with Heap timers!");
+ }
+
RegisteredStateMetaInfoBase metaInfoBase =
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
ColumnFamilyDescriptor columnFamilyDescriptor =
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
new file mode 100644
index 0000000..afa9ad4
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StateMigrationException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+
+/**
+ * RocksDB-based timers would be silently ignored if restored into a RocksDB backend that has Heap
+ * timers enabled. These tests verify that such an incompatible restore fails with an exception.
+ */
+public class HeapTimersSnapshottingTest {
+
+ @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testThrowExceptionWhenRestoringRocksTimersWithHeapTimers() throws Exception {
+ OperatorSubtaskState state;
+ try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ getTestHarness()) {
+ RocksDBStateBackend backend =
+ new RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+ backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
+ testHarness.setStateBackend(backend);
+ testHarness.open();
+ testHarness.processElement(0, 0L); // registers an event-time timer for key 0
+
+ state =
+ testHarness
+ .snapshotWithLocalState(0L, 1L, CheckpointType.SAVEPOINT)
+ .getJobManagerOwnedState();
+ }
+
+ try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ getTestHarness()) {
+ RocksDBStateBackend backend =
+ new RocksDBStateBackend(temporaryFolder.newFolder().toURI());
+ backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP); // restore must fail
+ testHarness.setStateBackend(backend);
+
+ thrown.expect(
+ containsCause(
+ new StateMigrationException(
+ "Can not restore savepoint taken with RocksDB timers enabled"
+ + " with Heap timers!")));
+ testHarness.initializeState(state); // expected to throw
+ }
+ }
+
+ @Test
+ public void testThrowExceptionWhenRestoringRocksTimersWithHeapTimersIncrementalCheckpoints()
+ throws Exception {
+ OperatorSubtaskState state;
+ try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ getTestHarness()) {
+ RocksDBStateBackend backend =
+ new RocksDBStateBackend(temporaryFolder.newFolder().toURI(), true);
+ backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
+ testHarness.setStateBackend(backend);
+ testHarness.open();
+ testHarness.processElement(0, 0L); // registers an event-time timer for key 0
+
+ state =
+ testHarness
+ .snapshotWithLocalState(0L, 1L, CheckpointType.SAVEPOINT)
+ .getJobManagerOwnedState();
+ }
+
+ try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+ getTestHarness()) {
+ RocksDBStateBackend backend =
+ new RocksDBStateBackend(temporaryFolder.newFolder().toURI(), true);
+ backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP); // restore must fail
+ testHarness.setStateBackend(backend);
+
+ thrown.expect(
+ containsCause(
+ new StateMigrationException(
+ "Can not restore savepoint taken with RocksDB timers enabled"
+ + " with Heap timers!")));
+ testHarness.initializeState(state); // expected to throw
+ }
+ }
+
+ private KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> getTestHarness()
+ throws Exception {
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedProcessOperator<>(
+ new KeyedProcessFunction<Integer, Integer, Integer>() {
+ @Override
+ public void processElement(
+ Integer value, Context ctx, Collector<Integer> out) {
+ ctx.timerService().registerEventTimeTimer(0L);
+ }
+ }),
+ (KeySelector<Integer, Integer>) value -> value, // identity key selector
+ BasicTypeInfo.INT_TYPE_INFO,
+ 1,
+ 1,
+ 0);
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 9dfb300..474e762 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -642,14 +643,25 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
*/
public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp)
throws Exception {
+ return snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT);
+ }
+
+ /**
+ * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions,
+ * org.apache.flink.runtime.state.CheckpointStreamFactory)} with the given checkpoint type.
+ */
+ public OperatorSnapshotFinalizer snapshotWithLocalState(
+ long checkpointId, long timestamp, CheckpointType checkpointType) throws Exception {
+ CheckpointStorageLocationReference locationReference =
+ CheckpointStorageLocationReference.getDefault();
OperatorSnapshotFutures operatorStateResult =
operator.snapshotState(
checkpointId,
timestamp,
- CheckpointOptions.forCheckpointWithDefaultLocation(),
+ new CheckpointOptions(checkpointType, locationReference),
checkpointStorageAccess.resolveCheckpointStorageLocation(
- checkpointId, CheckpointStorageLocationReference.getDefault()));
+ checkpointId, locationReference));
return new OperatorSnapshotFinalizer(operatorStateResult);
}