You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2023/12/20 10:04:51 UTC
(flink) branch master updated: [FLINK-33863] Fix restoring compressed operator state (#23938)
This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d415d93bbf9 [FLINK-33863] Fix restoring compressed operator state (#23938)
d415d93bbf9 is described below
commit d415d93bbf9620ba985136469107edd8c6e31cc6
Author: Ruibin Xing <xi...@gmail.com>
AuthorDate: Wed Dec 20 18:04:44 2023 +0800
[FLINK-33863] Fix restoring compressed operator state (#23938)
---
.../state/OperatorStateRestoreOperation.java | 30 +++++-
.../state/OperatorStateRestoreOperationTest.java | 114 +++++++++++++++++++++
2 files changed, 142 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
index f818eb81978..fd983fd5d28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
@@ -32,9 +32,12 @@ import org.apache.commons.io.IOUtils;
import javax.annotation.Nonnull;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/** Implementation of operator state restore operation. */
public class OperatorStateRestoreOperation implements RestoreOperation<Void> {
@@ -168,9 +171,32 @@ public class OperatorStateRestoreOperation implements RestoreOperation<Void> {
}
}
+ List<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> entries =
+ new ArrayList<>(stateHandle.getStateNameToPartitionOffsets().entrySet());
+
+ if (backendSerializationProxy.isUsingStateCompression()) {
+ // Sort the state entries by their first offset (entries without offsets come first) to
+ // avoid building a SnappyFramedInputStream on a stream that is already at EOF.
+ entries =
+ entries.stream()
+ .sorted(
+ Comparator.comparingLong(
+ entry -> {
+ OperatorStateHandle.StateMetaInfo
+ stateMetaInfo = entry.getValue();
+ long[] offsets = stateMetaInfo.getOffsets();
+ if (offsets == null
+ || offsets.length == 0) {
+ return Long.MIN_VALUE;
+ } else {
+ return offsets[0];
+ }
+ }))
+ .collect(Collectors.toList());
+ }
+
// Restore all the states
- for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
- stateHandle.getStateNameToPartitionOffsets().entrySet()) {
+ for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : entries) {
final String stateName = nameToOffsets.getKey();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
new file mode 100644
index 00000000000..e0aecd5d723
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/** Tests for the {@link org.apache.flink.runtime.state.OperatorStateRestoreOperation}. */
+public class OperatorStateRestoreOperationTest {
+
+ @Nullable
+ private static OperatorStateHandle createOperatorStateHandle(
+ ExecutionConfig cfg,
+ CloseableRegistry cancelStreamRegistry,
+ ClassLoader classLoader,
+ List<String> stateNames,
+ List<String> broadcastStateNames)
+ throws Exception {
+
+ try (OperatorStateBackend operatorStateBackend = // closed when the try block exits
+ new DefaultOperatorStateBackendBuilder(
+ classLoader,
+ cfg,
+ false, // NOTE(review): presumably asynchronousSnapshots - confirm
+ Collections.emptyList(),
+ cancelStreamRegistry)
+ .build()) {
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
+
+ for (String stateName : stateNames) { // one list state per name
+ ListStateDescriptor<String> descriptor =
+ new ListStateDescriptor<>(stateName, String.class);
+ PartitionableListState<String> state =
+ (PartitionableListState<String>)
+ operatorStateBackend.getListState(descriptor);
+ state.add("value1"); // seed with a single element
+ }
+
+ for (String broadcastStateName : broadcastStateNames) { // one broadcast state per name
+ MapStateDescriptor<String, String> descriptor =
+ new MapStateDescriptor<>(broadcastStateName, String.class, String.class);
+ BroadcastState<String, String> state =
+ operatorStateBackend.getBroadcastState(descriptor);
+ state.put("key1", "value1"); // seed with a single entry
+ }
+
+ SnapshotResult<OperatorStateHandle> result =
+ operatorStateBackend
+ .snapshot(
+ 1, // NOTE(review): presumably the checkpoint id - confirm
+ 1, // NOTE(review): presumably the timestamp - confirm
+ streamFactory,
+ CheckpointOptions.forCheckpointWithDefaultLocation())
+ .get();
+ return result.getJobManagerOwnedSnapshot(); // may be null, as declared by @Nullable
+ }
+ }
+
+ @Test
+ public void testRestoringMixedOperatorStateWhenSnapshotCompressionIsEnabled() throws Exception {
+ ExecutionConfig cfg = new ExecutionConfig();
+ cfg.setUseSnapshotCompression(true); // scenario under test: compressed snapshots
+ CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
+ ClassLoader classLoader = this.getClass().getClassLoader();
+
+ OperatorStateHandle handle =
+ createOperatorStateHandle(
+ cfg,
+ cancelStreamRegistry,
+ classLoader,
+ Arrays.asList("s1", "s2"), // list state names
+ Collections.singletonList("b2")); // broadcast state name
+
+ OperatorStateRestoreOperation operatorStateRestoreOperation =
+ new OperatorStateRestoreOperation(
+ cancelStreamRegistry,
+ classLoader,
+ new HashMap<>(),
+ new HashMap<>(),
+ Collections.singletonList(handle)); // snapshot taken above
+
+ operatorStateRestoreOperation.restore(); // passes if restore() does not throw
+ }
+}