You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/09 17:36:08 UTC

[flink] branch master updated (ad4bdf9 -> fc995d3)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ad4bdf9  [hotfix][tests] Fix init of SavepointITCase.testStopSavepointWithBoundedInput
     new b6c8039  [hotfix] Enforce StateTable return IterableStateSnapshot
     new d67ace2  [FLINK-21206] Implement HeapKeyValueStateIterator
     new fc995d3  [FLINK-21206] Write savepoints in unified format from HeapStateBackend

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/state/KeyValueStateIterator.java |   4 +-
 .../state/heap/HeapKeyValueStateIterator.java      | 407 +++++++++++++++++++++
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  19 +-
 .../state/heap/HeapKeyedStateBackendBuilder.java   |  16 +-
 .../runtime/state/heap/HeapSavepointStrategy.java  |  95 +++++
 .../runtime/state/heap/HeapSnapshotResources.java  | 190 ++++++++++
 .../runtime/state/heap/HeapSnapshotStrategy.java   | 116 +-----
 .../flink/runtime/state/heap/StateTable.java       |   5 +
 flink-tests/pom.xml                                |   9 +
 .../state/HeapSavepointStateBackendSwitchTest.java |   4 +-
 10 files changed, 753 insertions(+), 112 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java


[flink] 03/03: [FLINK-21206] Write savepoints in unified format from HeapStateBackend

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc995d3af957941e0e16e56e6830ef97acbadfd1
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 27 12:34:27 2021 +0100

    [FLINK-21206] Write savepoints in unified format from HeapStateBackend
    
    This closes #14809
---
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  19 ++-
 .../state/heap/HeapKeyedStateBackendBuilder.java   |  16 +-
 .../runtime/state/heap/HeapSavepointStrategy.java  |  95 +++++++++++
 .../runtime/state/heap/HeapSnapshotResources.java  | 190 +++++++++++++++++++++
 .../runtime/state/heap/HeapSnapshotStrategy.java   | 116 ++-----------
 flink-tests/pom.xml                                |   9 +
 .../state/HeapSavepointStateBackendSwitchTest.java |   4 +-
 7 files changed, 338 insertions(+), 111 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index c81bb68..0b42a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -106,7 +106,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
      * The snapshot strategy for this backend. This determines, e.g., if snapshots are synchronous
      * or asynchronous.
      */
-    private final SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner;
+    private final SnapshotStrategyRunner<KeyedStateHandle, ?> checkpointStrategyRunner;
+
+    private final SnapshotStrategyRunner<KeyedStateHandle, ?> savepointStrategyRunner;
 
     private final StateTableFactory<K> stateTableFactory;
 
@@ -125,7 +127,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             LocalRecoveryConfig localRecoveryConfig,
             HeapPriorityQueueSetFactory priorityQueueSetFactory,
-            SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner,
+            SnapshotStrategyRunner<KeyedStateHandle, ?> checkpointStrategyRunner,
+            SnapshotStrategyRunner<KeyedStateHandle, ?> savepointStrategyRunner,
             StateTableFactory<K> stateTableFactory,
             InternalKeyContext<K> keyContext) {
         super(
@@ -141,7 +144,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         this.registeredPQStates = registeredPQStates;
         this.localRecoveryConfig = localRecoveryConfig;
         this.priorityQueueSetFactory = priorityQueueSetFactory;
-        this.snapshotStrategyRunner = snapshotStrategyRunner;
+        this.checkpointStrategyRunner = checkpointStrategyRunner;
+        this.savepointStrategyRunner = savepointStrategyRunner;
         this.stateTableFactory = stateTableFactory;
         LOG.info("Initializing heap keyed state backend with stream factory.");
     }
@@ -356,8 +360,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             @Nonnull CheckpointOptions checkpointOptions)
             throws Exception {
 
-        return snapshotStrategyRunner.snapshot(
-                checkpointId, timestamp, streamFactory, checkpointOptions);
+        if (checkpointOptions.getCheckpointType().isSavepoint()) {
+            return savepointStrategyRunner.snapshot(
+                    checkpointId, timestamp, streamFactory, checkpointOptions);
+        } else {
+            return checkpointStrategyRunner.snapshot(
+                    checkpointId, timestamp, streamFactory, checkpointOptions);
+        }
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 6519461..3991ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -96,6 +96,14 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
         CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
         HeapSnapshotStrategy<K> snapshotStrategy =
                 initSnapshotStrategy(registeredKVStates, registeredPQStates);
+        HeapSavepointStrategy<K> savepointStrategy =
+                new HeapSavepointStrategy<>(
+                        registeredKVStates,
+                        registeredPQStates,
+                        keyGroupCompressionDecorator,
+                        keyGroupRange,
+                        keySerializerProvider,
+                        numberOfKeyGroups);
         InternalKeyContext<K> keyContext =
                 new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
 
@@ -124,6 +132,11 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
                         snapshotStrategy,
                         cancelStreamRegistryForBackend,
                         asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
+                new SnapshotStrategyRunner<>(
+                        "Heap backend savepoint",
+                        savepointStrategy,
+                        cancelStreamRegistryForBackend,
+                        asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
                 stateTableFactory,
                 keyContext);
     }
@@ -187,6 +200,7 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
                 keyGroupCompressionDecorator,
                 localRecoveryConfig,
                 keyGroupRange,
-                keySerializerProvider);
+                keySerializerProvider,
+                numberOfKeyGroups);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
new file mode 100644
index 0000000..ed70ed7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
+import org.apache.flink.runtime.state.FullSnapshotResources;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.StateSerializerProvider;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+
+/** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
+class HeapSavepointStrategy<K>
+        implements SnapshotStrategy<KeyedStateHandle, FullSnapshotResources<K>> {
+
+    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
+    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
+    private final StreamCompressionDecorator keyGroupCompressionDecorator;
+    private final KeyGroupRange keyGroupRange;
+    private final StateSerializerProvider<K> keySerializerProvider;
+    private final int totalKeyGroups;
+
+    HeapSavepointStrategy(
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
+            StreamCompressionDecorator keyGroupCompressionDecorator,
+            KeyGroupRange keyGroupRange,
+            StateSerializerProvider<K> keySerializerProvider,
+            int totalKeyGroups) {
+        this.registeredKVStates = registeredKVStates;
+        this.registeredPQStates = registeredPQStates;
+        this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
+        this.keyGroupRange = keyGroupRange;
+        this.keySerializerProvider = keySerializerProvider;
+        this.totalKeyGroups = totalKeyGroups;
+    }
+
+    @Override
+    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) {
+        return HeapSnapshotResources.create(
+                registeredKVStates,
+                registeredPQStates,
+                keyGroupCompressionDecorator,
+                keyGroupRange,
+                getKeySerializer(),
+                totalKeyGroups);
+    }
+
+    @Override
+    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
+            FullSnapshotResources<K> syncPartResource,
+            long checkpointId,
+            long timestamp,
+            @Nonnull CheckpointStreamFactory streamFactory,
+            @Nonnull CheckpointOptions checkpointOptions) {
+
+        assert checkpointOptions.getCheckpointType().isSavepoint();
+        return new FullSnapshotAsyncWriter<>(
+                checkpointOptions.getCheckpointType(),
+                () ->
+                        CheckpointStreamWithResultProvider.createSimpleStream(
+                                CheckpointedStateScope.EXCLUSIVE, streamFactory),
+                syncPartResource);
+    }
+
+    public TypeSerializer<K> getKeySerializer() {
+        return keySerializerProvider.currentSchemaSerializer();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
new file mode 100644
index 0000000..aa2c0af
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotResources.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FullSnapshotResources;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyValueStateIterator;
+import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A set of resources required to take a checkpoint or savepoint from a {@link
+ * HeapKeyedStateBackend}.
+ */
+@Internal
+final class HeapSnapshotResources<K> implements FullSnapshotResources<K> {
+    private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
+    private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
+    private final StreamCompressionDecorator streamCompressionDecorator;
+    private final Map<StateUID, Integer> stateNamesToId;
+    private final KeyGroupRange keyGroupRange;
+    private final TypeSerializer<K> keySerializer;
+    private final int totalKeyGroups;
+
+    private HeapSnapshotResources(
+            List<StateMetaInfoSnapshot> metaInfoSnapshots,
+            Map<StateUID, StateSnapshot> cowStateStableSnapshots,
+            StreamCompressionDecorator streamCompressionDecorator,
+            Map<StateUID, Integer> stateNamesToId,
+            KeyGroupRange keyGroupRange,
+            TypeSerializer<K> keySerializer,
+            int totalKeyGroups) {
+        this.metaInfoSnapshots = metaInfoSnapshots;
+        this.cowStateStableSnapshots = cowStateStableSnapshots;
+        this.streamCompressionDecorator = streamCompressionDecorator;
+        this.stateNamesToId = stateNamesToId;
+        this.keyGroupRange = keyGroupRange;
+        this.keySerializer = keySerializer;
+        this.totalKeyGroups = totalKeyGroups;
+    }
+
+    public static <K> HeapSnapshotResources<K> create(
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
+            StreamCompressionDecorator streamCompressionDecorator,
+            KeyGroupRange keyGroupRange,
+            TypeSerializer<K> keySerializer,
+            int totalKeyGroups) {
+
+        if (registeredKVStates.isEmpty() && registeredPQStates.isEmpty()) {
+            return new HeapSnapshotResources<>(
+                    Collections.emptyList(),
+                    Collections.emptyMap(),
+                    streamCompressionDecorator,
+                    Collections.emptyMap(),
+                    keyGroupRange,
+                    keySerializer,
+                    totalKeyGroups);
+        }
+
+        int numStates = registeredKVStates.size() + registeredPQStates.size();
+
+        Preconditions.checkState(
+                numStates <= Short.MAX_VALUE,
+                "Too many states: "
+                        + numStates
+                        + ". Currently at most "
+                        + Short.MAX_VALUE
+                        + " states are supported");
+
+        final List<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<>(numStates);
+        final Map<StateUID, Integer> stateNamesToId = new HashMap<>(numStates);
+        final Map<StateUID, StateSnapshot> cowStateStableSnapshots = new HashMap<>(numStates);
+
+        processSnapshotMetaInfoForAllStates(
+                metaInfoSnapshots,
+                cowStateStableSnapshots,
+                stateNamesToId,
+                registeredKVStates,
+                StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
+
+        processSnapshotMetaInfoForAllStates(
+                metaInfoSnapshots,
+                cowStateStableSnapshots,
+                stateNamesToId,
+                registeredPQStates,
+                StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+
+        return new HeapSnapshotResources<>(
+                metaInfoSnapshots,
+                cowStateStableSnapshots,
+                streamCompressionDecorator,
+                stateNamesToId,
+                keyGroupRange,
+                keySerializer,
+                totalKeyGroups);
+    }
+
+    private static void processSnapshotMetaInfoForAllStates(
+            List<StateMetaInfoSnapshot> metaInfoSnapshots,
+            Map<StateUID, StateSnapshot> cowStateStableSnapshots,
+            Map<StateUID, Integer> stateNamesToId,
+            Map<String, ? extends StateSnapshotRestore> registeredStates,
+            StateMetaInfoSnapshot.BackendStateType stateType) {
+
+        for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
+                registeredStates.entrySet()) {
+            final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
+            stateNamesToId.put(stateUid, stateNamesToId.size());
+            StateSnapshotRestore state = kvState.getValue();
+            if (null != state) {
+                final StateSnapshot stateSnapshot = state.stateSnapshot();
+                metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
+                cowStateStableSnapshots.put(stateUid, stateSnapshot);
+            }
+        }
+    }
+
+    @Override
+    public void release() {
+        for (StateSnapshot stateSnapshot : cowStateStableSnapshots.values()) {
+            stateSnapshot.release();
+        }
+    }
+
+    public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
+        return metaInfoSnapshots;
+    }
+
+    @Override
+    public KeyValueStateIterator createKVStateIterator() throws IOException {
+        return new HeapKeyValueStateIterator(
+                keyGroupRange,
+                keySerializer,
+                totalKeyGroups,
+                stateNamesToId,
+                cowStateStableSnapshots);
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyGroupRange;
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keySerializer;
+    }
+
+    @Override
+    public StreamCompressionDecorator getStreamCompressionDecorator() {
+        return streamCompressionDecorator;
+    }
+
+    public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
+        return cowStateStableSnapshots;
+    }
+
+    public Map<StateUID, Integer> getStateNamesToId() {
+        return stateNamesToId;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index fe48c90..78b96e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -30,26 +30,20 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.SnapshotResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StateSnapshot;
-import org.apache.flink.runtime.state.StateSnapshotRestore;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -60,7 +54,7 @@ import static org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.
 
 /** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
 class HeapSnapshotStrategy<K>
-        implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotStrategy.HeapSnapshotResources> {
+        implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
 
     private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
     private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
@@ -68,6 +62,7 @@ class HeapSnapshotStrategy<K>
     private final LocalRecoveryConfig localRecoveryConfig;
     private final KeyGroupRange keyGroupRange;
     private final StateSerializerProvider<K> keySerializerProvider;
+    private final int totalKeyGroups;
 
     HeapSnapshotStrategy(
             Map<String, StateTable<K, ?, ?>> registeredKVStates,
@@ -75,58 +70,31 @@ class HeapSnapshotStrategy<K>
             StreamCompressionDecorator keyGroupCompressionDecorator,
             LocalRecoveryConfig localRecoveryConfig,
             KeyGroupRange keyGroupRange,
-            StateSerializerProvider<K> keySerializerProvider) {
+            StateSerializerProvider<K> keySerializerProvider,
+            int totalKeyGroups) {
         this.registeredKVStates = registeredKVStates;
         this.registeredPQStates = registeredPQStates;
         this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
         this.localRecoveryConfig = localRecoveryConfig;
         this.keyGroupRange = keyGroupRange;
         this.keySerializerProvider = keySerializerProvider;
+        this.totalKeyGroups = totalKeyGroups;
     }
 
     @Override
-    public HeapSnapshotResources syncPrepareResources(long checkpointId) {
-
-        if (!hasRegisteredState()) {
-            return new HeapSnapshotResources(
-                    Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap());
-        }
-
-        int numStates = registeredKVStates.size() + registeredPQStates.size();
-
-        Preconditions.checkState(
-                numStates <= Short.MAX_VALUE,
-                "Too many states: "
-                        + numStates
-                        + ". Currently at most "
-                        + Short.MAX_VALUE
-                        + " states are supported");
-
-        final List<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<>(numStates);
-        final Map<StateUID, Integer> stateNamesToId = new HashMap<>(numStates);
-        final Map<StateUID, StateSnapshot> cowStateStableSnapshots = new HashMap<>(numStates);
-
-        processSnapshotMetaInfoForAllStates(
-                metaInfoSnapshots,
-                cowStateStableSnapshots,
-                stateNamesToId,
+    public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
+        return HeapSnapshotResources.create(
                 registeredKVStates,
-                StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
-
-        processSnapshotMetaInfoForAllStates(
-                metaInfoSnapshots,
-                cowStateStableSnapshots,
-                stateNamesToId,
                 registeredPQStates,
-                StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
-
-        return new HeapSnapshotResources(
-                metaInfoSnapshots, cowStateStableSnapshots, stateNamesToId);
+                keyGroupCompressionDecorator,
+                keyGroupRange,
+                getKeySerializer(),
+                totalKeyGroups);
     }
 
     @Override
     public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
-            HeapSnapshotResources syncPartResource,
+            HeapSnapshotResources<K> syncPartResource,
             long checkpointId,
             long timestamp,
             @Nonnull CheckpointStreamFactory streamFactory,
@@ -142,7 +110,7 @@ class HeapSnapshotStrategy<K>
                         // TODO: this code assumes that writing a serializer is threadsafe, we
                         // should support to
                         // get a serialized form already at state registration time in the future
-                        getKeySerializer(),
+                        syncPartResource.getKeySerializer(),
                         metaInfoSnapshots,
                         !Objects.equals(
                                 UncompressedStreamCompressionDecorator.INSTANCE,
@@ -214,65 +182,7 @@ class HeapSnapshotStrategy<K>
         };
     }
 
-    private void processSnapshotMetaInfoForAllStates(
-            List<StateMetaInfoSnapshot> metaInfoSnapshots,
-            Map<StateUID, StateSnapshot> cowStateStableSnapshots,
-            Map<StateUID, Integer> stateNamesToId,
-            Map<String, ? extends StateSnapshotRestore> registeredStates,
-            StateMetaInfoSnapshot.BackendStateType stateType) {
-
-        for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
-                registeredStates.entrySet()) {
-            final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
-            stateNamesToId.put(stateUid, stateNamesToId.size());
-            StateSnapshotRestore state = kvState.getValue();
-            if (null != state) {
-                final StateSnapshot stateSnapshot = state.stateSnapshot();
-                metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
-                cowStateStableSnapshots.put(stateUid, stateSnapshot);
-            }
-        }
-    }
-
-    private boolean hasRegisteredState() {
-        return !(registeredKVStates.isEmpty() && registeredPQStates.isEmpty());
-    }
-
     public TypeSerializer<K> getKeySerializer() {
         return keySerializerProvider.currentSchemaSerializer();
     }
-
-    static class HeapSnapshotResources implements SnapshotResources {
-        private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
-        private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
-        private final Map<StateUID, Integer> stateNamesToId;
-
-        HeapSnapshotResources(
-                @Nonnull List<StateMetaInfoSnapshot> metaInfoSnapshots,
-                @Nonnull Map<StateUID, StateSnapshot> cowStateStableSnapshots,
-                @Nonnull Map<StateUID, Integer> stateNamesToId) {
-            this.metaInfoSnapshots = metaInfoSnapshots;
-            this.cowStateStableSnapshots = cowStateStableSnapshots;
-            this.stateNamesToId = stateNamesToId;
-        }
-
-        @Override
-        public void release() {
-            for (StateSnapshot stateSnapshot : cowStateStableSnapshots.values()) {
-                stateSnapshot.release();
-            }
-        }
-
-        public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
-            return metaInfoSnapshots;
-        }
-
-        public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
-            return cowStateStableSnapshots;
-        }
-
-        public Map<StateUID, Integer> getStateNamesToId() {
-            return stateNamesToId;
-        }
-    }
 }
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 7031344..fb30a1d 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -231,6 +231,15 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>com.github.oshi</groupId>
 			<artifactId>oshi-core</artifactId>
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
index d5708a6..5138e03 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/HeapSavepointStateBackendSwitchTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.test.state.BackendSwitchSpecs.BackendSwitchSpec;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 
 /** Tests for switching a HEAP state backend to a different one. */
 @RunWith(Parameterized.class)
@@ -35,6 +35,6 @@ public class HeapSavepointStateBackendSwitchTest extends SavepointStateBackendSw
 
     @Parameterized.Parameters
     public static Collection<BackendSwitchSpec> targetBackends() {
-        return Collections.singletonList(BackendSwitchSpecs.HEAP);
+        return Arrays.asList(BackendSwitchSpecs.HEAP, BackendSwitchSpecs.ROCKS);
     }
 }


[flink] 02/03: [FLINK-21206] Implement HeapKeyValueStateIterator

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d67ace2395e21188ad9663e36a1caaf357867897
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Jan 25 09:05:49 2021 +0100

    [FLINK-21206] Implement HeapKeyValueStateIterator
---
 .../flink/runtime/state/KeyValueStateIterator.java |   4 +-
 .../state/heap/HeapKeyValueStateIterator.java      | 407 +++++++++++++++++++++
 2 files changed, 410 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
index 28a3266..cf9cf16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import java.io.IOException;
+
 /**
  * Iterator that over all key-value state entries in a {@link KeyedStateBackend}. For use during
  * snapshotting.
@@ -31,7 +33,7 @@ public interface KeyValueStateIterator extends AutoCloseable {
      * Advances the iterator. Should only be called if {@link #isValid()} returned true. Valid flag
      * can only change after calling {@link #next()}.
      */
-    void next();
+    void next() throws IOException;
 
     /** Returns the key-group for the current key. */
     int keyGroup();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
new file mode 100644
index 0000000..f5f11e8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IterableStateSnapshot;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyValueStateIterator;
+import org.apache.flink.runtime.state.ListDelimitedSerializer;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link org.apache.flink.runtime.state.KeyValueStateIterator} over Heap backend snapshot
+ * resources.
+ */
+@Internal
+@NotThreadSafe
+public final class HeapKeyValueStateIterator implements KeyValueStateIterator {
+
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+    private final Map<StateUID, Integer> stateNamesToId;
+    private final Map<StateUID, StateSnapshot> stateStableSnapshots;
+    private final int keyGroupPrefixBytes;
+
+    private boolean isValid;
+    private boolean newKeyGroup;
+    private boolean newKVState;
+    private byte[] currentKey;
+    private byte[] currentValue;
+
+    /** Iterator over the key groups of the corresponding key group range. */
+    private final Iterator<Integer> keyGroupIterator;
+    /** The current value of the keyGroupIterator. */
+    private int currentKeyGroup;
+
+    /** Iterator over all states present in the snapshots. */
+    private Iterator<StateUID> statesIterator;
+    /** The current value of the statesIterator. */
+    private StateUID currentState;
+    /**
+     * An iterator over the values of the current state. It can be one of three:
+     *
+     * <ul>
+     *   <li>{@link QueueIterator} for iterating over entries in a priority queue
+     *   <li>{@link StateTableIterator} for iterating over entries in a StateTable
+     *   <li>{@link MapStateIterator} for iterating over entries in a user map, this one falls back
+     *       to the upper one automatically if exhausted
+     * </ul>
+     */
+    private SingleStateIterator currentStateIterator;
+    /** Helpers for serializing state into the unified format. */
+    private final DataOutputSerializer valueOut = new DataOutputSerializer(64);
+
+    private final ListDelimitedSerializer listDelimitedSerializer = new ListDelimitedSerializer();
+    private final SerializedCompositeKeyBuilder<Object> compositeKeyBuilder;
+
+    public HeapKeyValueStateIterator(
+            @Nonnull final KeyGroupRange keyGroupRange,
+            @Nonnull final TypeSerializer<?> keySerializer,
+            @Nonnegative final int totalKeyGroups,
+            @Nonnull final Map<StateUID, Integer> stateNamesToId,
+            @Nonnull final Map<StateUID, StateSnapshot> stateSnapshots)
+            throws IOException {
+        checkNotNull(keyGroupRange);
+        checkNotNull(keySerializer);
+        this.stateNamesToId = checkNotNull(stateNamesToId);
+        this.stateStableSnapshots = checkNotNull(stateSnapshots);
+
+        this.statesIterator = stateSnapshots.keySet().iterator();
+        this.keyGroupIterator = keyGroupRange.iterator();
+
+        this.keyGroupPrefixBytes =
+                CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(totalKeyGroups);
+        this.compositeKeyBuilder =
+                new SerializedCompositeKeyBuilder<>(
+                        castToType(keySerializer), keyGroupPrefixBytes, 32);
+
+        if (!keyGroupIterator.hasNext() || !statesIterator.hasNext()) {
+            // stop early, no key groups or states
+            isValid = false;
+        } else {
+            currentKeyGroup = keyGroupIterator.next();
+            next();
+            this.newKeyGroup = true;
+        }
+    }
+
+    @Override
+    public boolean isValid() {
+        return isValid;
+    }
+
+    @Override
+    public boolean isNewKeyValueState() {
+        return this.newKVState;
+    }
+
+    @Override
+    public boolean isNewKeyGroup() {
+        return this.newKeyGroup;
+    }
+
+    @Override
+    public int keyGroup() {
+        return currentKeyGroup;
+    }
+
+    @Override
+    public int kvStateId() {
+        return stateNamesToId.get(currentState);
+    }
+
+    @Override
+    public void next() throws IOException {
+        boolean hasStateEntry = false;
+
+        this.newKVState = false;
+        this.newKeyGroup = false;
+
+        do {
+            if (currentState == null) {
+                boolean hasNextState = moveToNextState();
+                if (!hasNextState) {
+                    break;
+                }
+            }
+
+            hasStateEntry = currentStateIterator != null && currentStateIterator.hasNext();
+            if (!hasStateEntry) {
+                this.currentState = null;
+            }
+        } while (!hasStateEntry);
+
+        if (hasStateEntry) {
+            isValid = true;
+            currentStateIterator.writeOutNext();
+        } else {
+            isValid = false;
+        }
+    }
+
+    private boolean moveToNextState() throws IOException {
+        if (statesIterator.hasNext()) {
+            this.currentState = statesIterator.next();
+            this.newKVState = true;
+        } else if (keyGroupIterator.hasNext()) {
+            this.currentKeyGroup = keyGroupIterator.next();
+            resetStates();
+            this.newKeyGroup = true;
+            this.newKVState = true;
+        } else {
+            return false;
+        }
+
+        StateSnapshot stateSnapshot = this.stateStableSnapshots.get(currentState);
+        setCurrentStateIterator(stateSnapshot);
+
+        // set to a valid entry
+        return true;
+    }
+
+    private void resetStates() {
+        this.statesIterator = stateStableSnapshots.keySet().iterator();
+        this.currentState = statesIterator.next();
+    }
+
+    @SuppressWarnings("unchecked")
+    private void setCurrentStateIterator(StateSnapshot stateSnapshot) throws IOException {
+        if (stateSnapshot instanceof IterableStateSnapshot) {
+            RegisteredKeyValueStateBackendMetaInfo<Object, Object> metaInfo =
+                    new RegisteredKeyValueStateBackendMetaInfo<>(
+                            stateSnapshot.getMetaInfoSnapshot());
+            Iterator<? extends StateEntry<?, ?, ?>> snapshotIterator =
+                    ((IterableStateSnapshot<?, ?, ?>) stateSnapshot).getIterator(currentKeyGroup);
+            this.currentStateIterator = new StateTableIterator(snapshotIterator, metaInfo);
+        } else if (stateSnapshot instanceof HeapPriorityQueueStateSnapshot) {
+            Iterator<Object> snapshotIterator =
+                    ((HeapPriorityQueueStateSnapshot<Object>) stateSnapshot)
+                            .getIteratorForKeyGroup(currentKeyGroup);
+            RegisteredPriorityQueueStateBackendMetaInfo<Object> metaInfo =
+                    new RegisteredPriorityQueueStateBackendMetaInfo<>(
+                            stateSnapshot.getMetaInfoSnapshot());
+            this.currentStateIterator = new QueueIterator<>(snapshotIterator, metaInfo);
+        } else {
+            throw new IllegalStateException("Unknown snapshot type: " + stateSnapshot);
+        }
+    }
+
+    /** A common interface for writing out a single entry in a state. */
+    private interface SingleStateIterator {
+
+        boolean hasNext();
+
+        void writeOutNext() throws IOException;
+    }
+
+    private final class StateTableIterator implements SingleStateIterator {
+
+        private final Iterator<? extends StateEntry<?, ?, ?>> entriesIterator;
+        private final RegisteredKeyValueStateBackendMetaInfo<?, ?> stateSnapshot;
+
+        private StateTableIterator(
+                Iterator<? extends StateEntry<?, ?, ?>> entriesIterator,
+                RegisteredKeyValueStateBackendMetaInfo<?, ?> stateSnapshot) {
+            this.entriesIterator = entriesIterator;
+            this.stateSnapshot = stateSnapshot;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return entriesIterator.hasNext();
+        }
+
+        @Override
+        public void writeOutNext() throws IOException {
+            StateEntry<?, ?, ?> currentEntry = entriesIterator.next();
+            valueOut.clear();
+            compositeKeyBuilder.setKeyAndKeyGroup(currentEntry.getKey(), keyGroup());
+            compositeKeyBuilder.setNamespace(
+                    currentEntry.getNamespace(),
+                    castToType(stateSnapshot.getNamespaceSerializer()));
+            TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();
+            switch (stateSnapshot.getStateType()) {
+                case AGGREGATING:
+                case REDUCING:
+                case FOLDING:
+                case VALUE:
+                    writeOutValue(currentEntry, stateSerializer);
+                    break;
+                case LIST:
+                    writeOutList(currentEntry, stateSerializer);
+                    break;
+                case MAP:
+                    writeOutMap(currentEntry, stateSerializer);
+                    break;
+                default:
+                    throw new IllegalStateException("");
+            }
+        }
+
+        private void writeOutValue(
+                StateEntry<?, ?, ?> currentEntry, TypeSerializer<?> stateSerializer)
+                throws IOException {
+            currentKey = compositeKeyBuilder.build();
+            castToType(stateSerializer).serialize(currentEntry.getState(), valueOut);
+            currentValue = valueOut.getCopyOfBuffer();
+        }
+
+        @SuppressWarnings("unchecked")
+        private void writeOutList(
+                StateEntry<?, ?, ?> currentEntry, TypeSerializer<?> stateSerializer)
+                throws IOException {
+            ListSerializer<Object> listSerializer = (ListSerializer<Object>) stateSerializer;
+            currentKey = compositeKeyBuilder.build();
+            currentValue =
+                    listDelimitedSerializer.serializeList(
+                            (List<Object>) currentEntry.getState(),
+                            listSerializer.getElementSerializer());
+        }
+
+        @SuppressWarnings("unchecked")
+        private void writeOutMap(
+                StateEntry<?, ?, ?> currentEntry, TypeSerializer<?> stateSerializer)
+                throws IOException {
+            MapSerializer<Object, Object> mapSerializer =
+                    (MapSerializer<Object, Object>) stateSerializer;
+            currentStateIterator =
+                    new MapStateIterator(
+                            (Map<Object, Object>) currentEntry.getState(),
+                            mapSerializer.getKeySerializer(),
+                            mapSerializer.getValueSerializer(),
+                            this);
+            currentStateIterator.writeOutNext();
+        }
+    }
+
+    private final class MapStateIterator implements SingleStateIterator {
+
+        private final Iterator<Map.Entry<Object, Object>> mapEntries;
+        private final TypeSerializer<Object> userKeySerializer;
+        private final TypeSerializer<Object> userValueSerializer;
+        private final StateTableIterator parentIterator;
+
+        private MapStateIterator(
+                Map<Object, Object> mapEntries,
+                TypeSerializer<Object> userKeySerializer,
+                TypeSerializer<Object> userValueSerializer,
+                StateTableIterator parentIterator) {
+            this.mapEntries = mapEntries.entrySet().iterator();
+            this.userKeySerializer = userKeySerializer;
+            this.userValueSerializer = userValueSerializer;
+            this.parentIterator = parentIterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            // we should never end up here with an exhausted map iterator
+            // if an iterator is exhausted in the writeOutNext we switch back to
+            // the originating StateTableIterator
+            assert mapEntries.hasNext();
+            return true;
+        }
+
+        @Override
+        public void writeOutNext() throws IOException {
+            Map.Entry<Object, Object> entry = mapEntries.next();
+            valueOut.clear();
+            currentKey =
+                    compositeKeyBuilder.buildCompositeKeyUserKey(entry.getKey(), userKeySerializer);
+            Object userValue = entry.getValue();
+            valueOut.writeBoolean(userValue == null);
+            userValueSerializer.serialize(userValue, valueOut);
+            currentValue = valueOut.getCopyOfBuffer();
+
+            if (!mapEntries.hasNext()) {
+                currentStateIterator = parentIterator;
+            }
+        }
+    }
+
+    private final class QueueIterator<T> implements SingleStateIterator {
+        private final Iterator<T> elementsForKeyGroup;
+        private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;
+        private final DataOutputSerializer keyOut = new DataOutputSerializer(128);
+        private final int afterKeyMark;
+
+        public QueueIterator(
+                Iterator<T> elementsForKeyGroup,
+                RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo)
+                throws IOException {
+            this.elementsForKeyGroup = elementsForKeyGroup;
+            this.metaInfo = metaInfo;
+            CompositeKeySerializationUtils.writeKeyGroup(keyGroup(), keyGroupPrefixBytes, keyOut);
+            afterKeyMark = keyOut.length();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return elementsForKeyGroup.hasNext();
+        }
+
+        @Override
+        public void writeOutNext() throws IOException {
+            currentValue = EMPTY_BYTE_ARRAY;
+            keyOut.setPosition(afterKeyMark);
+            T next = elementsForKeyGroup.next();
+            metaInfo.getElementSerializer().serialize(next, keyOut);
+            currentKey = keyOut.getCopyOfBuffer();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Nonnull
+    private static <T> TypeSerializer<T> castToType(@Nonnull TypeSerializer<?> serializer) {
+        return (TypeSerializer<T>) serializer;
+    }
+
+    @Override
+    public byte[] key() {
+        return currentKey;
+    }
+
+    @Override
+    public byte[] value() {
+        return currentValue;
+    }
+
+    @Override
+    public void close() {}
+}


[flink] 01/03: [hotfix] Enforce StateTable return IterableStateSnapshot

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6c80390d7af57c5d6fd0a85ae6ab425cc098f90
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jan 29 10:23:26 2021 +0100

    [hotfix] Enforce StateTable return IterableStateSnapshot
---
 .../main/java/org/apache/flink/runtime/state/heap/StateTable.java    | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 57d5d01..c64184d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.IterableStateSnapshot;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateEntry;
@@ -99,6 +100,10 @@ public abstract class StateTable<K, N, S>
 
     protected abstract StateMap<K, N, S> createStateMap();
 
+    @Override
+    @Nonnull
+    public abstract IterableStateSnapshot<K, N, S> stateSnapshot();
+
     // Main interface methods of StateTable -------------------------------------------------------
 
     /**