You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2023/12/20 10:04:51 UTC

(flink) branch master updated: [FLINK-33863] Fix restoring compressed operator state (#23938)

This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d415d93bbf9 [FLINK-33863] Fix restoring compressed operator state (#23938)
d415d93bbf9 is described below

commit d415d93bbf9620ba985136469107edd8c6e31cc6
Author: Ruibin Xing <xi...@gmail.com>
AuthorDate: Wed Dec 20 18:04:44 2023 +0800

    [FLINK-33863] Fix restoring compressed operator state (#23938)
---
 .../state/OperatorStateRestoreOperation.java       |  30 +++++-
 .../state/OperatorStateRestoreOperationTest.java   | 114 +++++++++++++++++++++
 2 files changed, 142 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
index f818eb81978..fd983fd5d28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
@@ -32,9 +32,12 @@ import org.apache.commons.io.IOUtils;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /** Implementation of operator state restore operation. */
 public class OperatorStateRestoreOperation implements RestoreOperation<Void> {
@@ -168,9 +171,32 @@ public class OperatorStateRestoreOperation implements RestoreOperation<Void> {
                     }
                 }
 
+                List<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> entries =
+                        new ArrayList<>(stateHandle.getStateNameToPartitionOffsets().entrySet());
+
+                if (backendSerializationProxy.isUsingStateCompression()) {
+                    // Sort the state entries by their first offset to avoid building a
+                    // SnappyFramedInputStream on a stream that is already at EOF.
+                    entries =
+                            entries.stream()
+                                    .sorted(
+                                            Comparator.comparingLong(
+                                                    entry -> {
+                                                        OperatorStateHandle.StateMetaInfo
+                                                                stateMetaInfo = entry.getValue();
+                                                        long[] offsets = stateMetaInfo.getOffsets();
+                                                        if (offsets == null
+                                                                || offsets.length == 0) {
+                                                            return Long.MIN_VALUE;
+                                                        } else {
+                                                            return offsets[0];
+                                                        }
+                                                    }))
+                                    .collect(Collectors.toList());
+                }
+
                 // Restore all the states
-                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
-                        stateHandle.getStateNameToPartitionOffsets().entrySet()) {
+                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : entries) {
 
                     final String stateName = nameToOffsets.getKey();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
new file mode 100644
index 00000000000..e0aecd5d723
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/** Tests for the {@link org.apache.flink.runtime.state.OperatorStateRestoreOperation}. */
+public class OperatorStateRestoreOperationTest {
+
+    @Nullable
+    private static OperatorStateHandle createOperatorStateHandle(
+            ExecutionConfig cfg,
+            CloseableRegistry cancelStreamRegistry,
+            ClassLoader classLoader,
+            List<String> stateNames,
+            List<String> broadcastStateNames)
+            throws Exception {
+
+        try (OperatorStateBackend operatorStateBackend =
+                new DefaultOperatorStateBackendBuilder(
+                                classLoader,
+                                cfg,
+                                false,
+                                Collections.emptyList(),
+                                cancelStreamRegistry)
+                        .build()) {
+            CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
+
+            for (String stateName : stateNames) {
+                ListStateDescriptor<String> descriptor =
+                        new ListStateDescriptor<>(stateName, String.class);
+                PartitionableListState<String> state =
+                        (PartitionableListState<String>)
+                                operatorStateBackend.getListState(descriptor);
+                state.add("value1");
+            }
+
+            for (String broadcastStateName : broadcastStateNames) {
+                MapStateDescriptor<String, String> descriptor =
+                        new MapStateDescriptor<>(broadcastStateName, String.class, String.class);
+                BroadcastState<String, String> state =
+                        operatorStateBackend.getBroadcastState(descriptor);
+                state.put("key1", "value1");
+            }
+
+            SnapshotResult<OperatorStateHandle> result =
+                    operatorStateBackend
+                            .snapshot(
+                                    1,
+                                    1,
+                                    streamFactory,
+                                    CheckpointOptions.forCheckpointWithDefaultLocation())
+                            .get();
+            return result.getJobManagerOwnedSnapshot();
+        }
+    }
+
+    @Test
+    public void testRestoringMixedOperatorStateWhenSnapshotCompressionIsEnabled() throws Exception {
+        ExecutionConfig cfg = new ExecutionConfig();
+        cfg.setUseSnapshotCompression(true);
+        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
+        ClassLoader classLoader = this.getClass().getClassLoader();
+
+        OperatorStateHandle handle =
+                createOperatorStateHandle(
+                        cfg,
+                        cancelStreamRegistry,
+                        classLoader,
+                        Arrays.asList("s1", "s2"),
+                        Collections.singletonList("b2"));
+
+        OperatorStateRestoreOperation operatorStateRestoreOperation =
+                new OperatorStateRestoreOperation(
+                        cancelStreamRegistry,
+                        classLoader,
+                        new HashMap<>(),
+                        new HashMap<>(),
+                        Collections.singletonList(handle));
+
+        operatorStateRestoreOperation.restore();
+    }
+}