You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/26 08:18:25 UTC
[flink] branch release-1.11 updated: [FLINK-21486] Throw exception
when restoring Rocks timers with Heap timers enabled
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new c6b10f6 [FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled
c6b10f6 is described below
commit c6b10f60db09852ca5126b2ce9f71aa17ce207d8
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Feb 22 16:49:43 2021 +0100
[FLINK-21486] Throw exception when restoring Rocks timers with Heap timers enabled
This closes #15022
---
.../flink-statebackend-rocksdb/pom.xml | 19 ++-
.../state/RocksDBKeyedStateBackendBuilder.java | 11 +-
.../streaming/state/restore/PriorityQueueFlag.java | 29 +++++
.../state/restore/RocksDBFullRestoreOperation.java | 13 +-
.../RocksDBIncrementalRestoreOperation.java | 16 ++-
.../state/HeapTimersSnapshottingTest.java | 135 +++++++++++++++++++++
.../util/AbstractStreamOperatorTestHarness.java | 16 ++-
7 files changed, 228 insertions(+), 11 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
index 1ae62e6..1bdd5cf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -19,8 +19,8 @@ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -81,6 +81,13 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<!-- build a test jar -->
@@ -96,8 +103,12 @@ under the License.
</goals>
<configuration>
<includes>
- <include>**/org/apache/flink/contrib/streaming/state/RocksDBTestUtils*</include>
- <include>**/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils*</include>
+ <include>
+ **/org/apache/flink/contrib/streaming/state/RocksDBTestUtils*
+ </include>
+ <include>
+ **/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils*
+ </include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
</includes>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 83faeb3..1e101e8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
+import org.apache.flink.contrib.streaming.state.restore.PriorityQueueFlag;
import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
@@ -419,6 +420,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
ttlCompactFiltersManager);
}
KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
+ PriorityQueueFlag queueRestoreEnabled =
+ priorityQueueStateType == RocksDBStateBackend.PriorityQueueStateType.HEAP
+ ? PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE
+ : PriorityQueueFlag.RESTORE_PRIORITY_QUEUE;
if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
return new RocksDBIncrementalRestoreOperation<>(
operatorIdentifier,
@@ -437,7 +442,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager,
- writeBatchSize);
+ writeBatchSize,
+ queueRestoreEnabled);
} else {
return new RocksDBFullRestoreOperation<>(
keyGroupRange,
@@ -455,7 +461,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager,
- writeBatchSize);
+ writeBatchSize,
+ queueRestoreEnabled);
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java
new file mode 100644
index 0000000..3fb2fa1
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/PriorityQueueFlag.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.restore;
+
+/**
+ * Flag indicating what should be done when a priority queue state (such as RocksDB-based timers)
+ * is spotted while restoring a RocksDB snapshot. Prior to Flink 1.13 it is not possible to switch
+ * from RocksDB timers to Heap timers, so in that case an exception should be thrown instead of
+ * silently dropping the timers.
+ *
+ * <p>{@code RocksDBKeyedStateBackendBuilder} selects {@link #THROW_ON_PRIORITY_QUEUE} when the
+ * backend is configured with heap timers and {@link #RESTORE_PRIORITY_QUEUE} otherwise.
+ */
+public enum PriorityQueueFlag {
+ /**
+ * Fail the restore with a {@code StateMigrationException} as soon as the restored meta data
+ * contains a state of type {@code PRIORITY_QUEUE}.
+ */
+ THROW_ON_PRIORITY_QUEUE,
+ /** Restore priority queue states from the snapshot without any special handling. */
+ RESTORE_PRIORITY_QUEUE
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 00163bc..a1d13cf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
@@ -86,6 +87,8 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
/** Write batch size used in {@link RocksDBWriteBatchWrapper}. */
private final long writeBatchSize;
+ private final PriorityQueueFlag queueRestoreEnabled;
+
public RocksDBFullRestoreOperation(
KeyGroupRange keyGroupRange,
int keyGroupPrefixBytes,
@@ -102,7 +105,8 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
- @Nonnegative long writeBatchSize) {
+ @Nonnegative long writeBatchSize,
+ PriorityQueueFlag queueRestoreEnabled) {
super(
keyGroupRange,
keyGroupPrefixBytes,
@@ -121,6 +125,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
ttlCompactFiltersManager);
checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
this.writeBatchSize = writeBatchSize;
+ this.queueRestoreEnabled = queueRestoreEnabled;
}
/** Restores all key-groups data that is referenced by the passed state handles. */
@@ -181,6 +186,12 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat
currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
+ if (restoredMetaInfo.getBackendStateType() == BackendStateType.PRIORITY_QUEUE
+ && queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
+ throw new StateMigrationException(
+ "Can not restore savepoint taken with RocksDB timers enabled with Heap timers!");
+ }
+
RocksDbKvStateInfo registeredStateCFHandle =
getOrRegisterStateColumnFamilyHandle(null, restoredMetaInfo);
currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index f755940..9380bc1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -88,6 +89,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
private final String operatorIdentifier;
private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
+ private final PriorityQueueFlag queueRestoreEnabled;
private long lastCompletedCheckpointId;
private UUID backendUID;
private final long writeBatchSize;
@@ -109,7 +111,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
- @Nonnegative long writeBatchSize) {
+ @Nonnegative long writeBatchSize,
+ PriorityQueueFlag queueRestoreEnabled) {
super(
keyGroupRange,
keyGroupPrefixBytes,
@@ -130,6 +133,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
this.restoredSstFiles = new TreeMap<>();
this.lastCompletedCheckpointId = -1L;
this.backendUID = UUID.randomUUID();
+ this.queueRestoreEnabled = queueRestoreEnabled;
checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
this.writeBatchSize = writeBatchSize;
}
@@ -483,12 +487,20 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
* meta data snapshot.
*/
private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
- List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter) {
+ List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter)
+ throws StateMigrationException {
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>(stateMetaInfoSnapshots.size());
for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
+ if (stateMetaInfoSnapshot.getBackendStateType()
+ == StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE
+ && queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
+ throw new StateMigrationException(
+ "Can not restore savepoint taken with RocksDB timers enabled with Heap timers!");
+ }
+
RegisteredStateMetaInfoBase metaInfoBase =
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
ColumnFamilyDescriptor columnFamilyDescriptor =
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
new file mode 100644
index 0000000..5f81a57
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StateMigrationException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+
+/**
+ * Verifies that restoring a RocksDB snapshot that contains RocksDB-based timers into a RocksDB
+ * backend configured with heap timers fails with a {@link StateMigrationException}. Without that
+ * check the RocksDB timers would be silently ignored on restore.
+ *
+ * <p>Both restore paths are covered: the full restore (from a savepoint of a non-incremental
+ * backend) and the incremental restore (from a checkpoint of an incremental backend).
+ */
+public class HeapTimersSnapshottingTest {
+
+    private static final String EXPECTED_MESSAGE =
+            "Can not restore savepoint taken with RocksDB timers enabled with Heap timers!";
+
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testThrowExceptionWhenRestoringRocksTimersWithHeapTimers() throws Exception {
+        // Non-incremental backend: the snapshot is a full one, so the restore goes through
+        // RocksDBFullRestoreOperation.
+        OperatorSubtaskState state = snapshotWithRocksDBTimers(false, CheckpointType.SAVEPOINT);
+        expectRestoreWithHeapTimersToFail(state, false);
+    }
+
+    @Test
+    public void testThrowExceptionWhenRestoringRocksTimersWithHeapTimersIncrementalCheckpoints()
+            throws Exception {
+        // Take a regular checkpoint rather than a savepoint: RocksDB savepoints are written in the
+        // full (non-incremental) format, so restoring one would go through
+        // RocksDBFullRestoreOperation and never reach the check in
+        // RocksDBIncrementalRestoreOperation that this test is meant to cover.
+        OperatorSubtaskState state = snapshotWithRocksDBTimers(true, CheckpointType.CHECKPOINT);
+        expectRestoreWithHeapTimersToFail(state, true);
+    }
+
+    /**
+     * Runs the test operator on a RocksDB backend that keeps timers in RocksDB, registers a timer
+     * and returns the job-manager-owned part of the resulting snapshot.
+     */
+    private OperatorSubtaskState snapshotWithRocksDBTimers(
+            boolean incrementalCheckpoints, CheckpointType checkpointType) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            testHarness.setStateBackend(
+                    createBackend(incrementalCheckpoints, PriorityQueueStateType.ROCKSDB));
+            testHarness.open();
+            // registers an event-time timer for key 0 (see getTestHarness())
+            testHarness.processElement(0, 0L);
+
+            return testHarness
+                    .snapshotWithLocalState(0L, 1L, checkpointType)
+                    .getJobManagerOwnedState();
+        }
+    }
+
+    /**
+     * Restores {@code state} into a RocksDB backend configured with heap timers and expects the
+     * restore to fail with the {@link StateMigrationException} somewhere in the cause chain.
+     */
+    private void expectRestoreWithHeapTimersToFail(
+            OperatorSubtaskState state, boolean incrementalCheckpoints) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
+                getTestHarness()) {
+            testHarness.setStateBackend(
+                    createBackend(incrementalCheckpoints, PriorityQueueStateType.HEAP));
+
+            thrown.expect(containsCause(new StateMigrationException(EXPECTED_MESSAGE)));
+            testHarness.initializeState(state);
+        }
+    }
+
+    /** Creates a RocksDB backend in a fresh folder with the given checkpointing and timer modes. */
+    private RocksDBStateBackend createBackend(
+            boolean incrementalCheckpoints, PriorityQueueStateType timerStateType)
+            throws Exception {
+        RocksDBStateBackend backend =
+                new RocksDBStateBackend(
+                        temporaryFolder.newFolder().toURI(), incrementalCheckpoints);
+        backend.setPriorityQueueStateType(timerStateType);
+        return backend;
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> getTestHarness()
+            throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(
+                        new KeyedProcessFunction<Integer, Integer, Integer>() {
+                            @Override
+                            public void processElement(
+                                    Integer value, Context ctx, Collector<Integer> out) {
+                                // every element registers a timer, so the keyed backend has
+                                // timer state to snapshot
+                                ctx.timerService().registerEventTimeTimer(0L);
+                            }
+                        }),
+                (KeySelector<Integer, Integer>) value -> value,
+                BasicTypeInfo.INT_TYPE_INFO,
+                1,
+                1,
+                0);
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 641a7d4..7d10af8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -594,14 +595,25 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
*/
public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp)
throws Exception {
+ // Without an explicit type, a regular checkpoint (not a savepoint) is taken.
+ return snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT);
+ }
+
+ /**
+ * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions,
+ * org.apache.flink.runtime.state.CheckpointStreamFactory)} with checkpoint options of the given
+ * type, e.g. to take a savepoint instead of a regular checkpoint.
+ *
+ * @param checkpointId id of the snapshot to take
+ * @param timestamp timestamp of the snapshot
+ * @param checkpointType type of the snapshot, passed to the operator via {@link CheckpointOptions}
+ * @return a finalizer over the futures returned by the operator's snapshot
+ */
+ public OperatorSnapshotFinalizer snapshotWithLocalState(
+ long checkpointId, long timestamp, CheckpointType checkpointType) throws Exception {
+ // The same default location reference is used both for the checkpoint options and for
+ // resolving the storage location, so the two stay consistent.
+ CheckpointStorageLocationReference locationReference =
+ CheckpointStorageLocationReference.getDefault();
OperatorSnapshotFutures operatorStateResult =
operator.snapshotState(
checkpointId,
timestamp,
- CheckpointOptions.forCheckpointWithDefaultLocation(),
+ new CheckpointOptions(checkpointType, locationReference),
checkpointStorage.resolveCheckpointStorageLocation(
- checkpointId, CheckpointStorageLocationReference.getDefault()));
+ checkpointId, locationReference));
return new OperatorSnapshotFinalizer(operatorStateResult);
}