You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/10 15:46:07 UTC
[flink] 01/02: Revert "[FLINK-21206] Implement
HeapKeyValueStateIterator"
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2445a3faf1493785b481d9b9b1fe8c8ea81da0ec
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Feb 10 16:30:48 2021 +0100
Revert "[FLINK-21206] Implement HeapKeyValueStateIterator"
This reverts commit d67ace2395e21188ad9663e36a1caaf357867897.
---
.../flink/runtime/state/KeyValueStateIterator.java | 4 +-
.../state/heap/HeapKeyValueStateIterator.java | 407 ---------------------
2 files changed, 1 insertion(+), 410 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
index cf9cf16..28a3266 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.state;
-import java.io.IOException;
-
/**
* Iterator that over all key-value state entries in a {@link KeyedStateBackend}. For use during
* snapshotting.
@@ -33,7 +31,7 @@ public interface KeyValueStateIterator extends AutoCloseable {
* Advances the iterator. Should only be called if {@link #isValid()} returned true. Valid flag
* can only change after calling {@link #next()}.
*/
- void next() throws IOException;
+ void next();
/** Returns the key-group for the current key. */
int keyGroup();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
deleted file mode 100644
index f5f11e8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.MapSerializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
-import org.apache.flink.runtime.state.IterableStateSnapshot;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyValueStateIterator;
-import org.apache.flink.runtime.state.ListDelimitedSerializer;
-import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
-import org.apache.flink.runtime.state.StateEntry;
-import org.apache.flink.runtime.state.StateSnapshot;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link org.apache.flink.runtime.state.KeyValueStateIterator} over Heap backend snapshot
- * resources.
- */
-@Internal
-@NotThreadSafe
-public final class HeapKeyValueStateIterator implements KeyValueStateIterator {
-
- private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
-
- private final Map<StateUID, Integer> stateNamesToId;
- private final Map<StateUID, StateSnapshot> stateStableSnapshots;
- private final int keyGroupPrefixBytes;
-
- private boolean isValid;
- private boolean newKeyGroup;
- private boolean newKVState;
- private byte[] currentKey;
- private byte[] currentValue;
-
- /** Iterator over the key groups of the corresponding key group range. */
- private final Iterator<Integer> keyGroupIterator;
- /** The current value of the keyGroupIterator. */
- private int currentKeyGroup;
-
- /** Iterator over all states present in the snapshots. */
- private Iterator<StateUID> statesIterator;
- /** The current value of the statesIterator. */
- private StateUID currentState;
- /**
- * An iterator over the values of the current state. It can be one of three:
- *
- * <ul>
- * <li>{@link QueueIterator} for iterating over entries in a priority queue
- * <li>{@link StateTableIterator} for iterating over entries in a StateTable
- * <li>{@link MapStateIterator} for iterating over entries in a user map, this one falls back
- * to the upper one automatically if exhausted
- * </ul>
- */
- private SingleStateIterator currentStateIterator;
- /** Helpers for serializing state into the unified format. */
- private final DataOutputSerializer valueOut = new DataOutputSerializer(64);
-
- private final ListDelimitedSerializer listDelimitedSerializer = new ListDelimitedSerializer();
- private final SerializedCompositeKeyBuilder<Object> compositeKeyBuilder;
-
- public HeapKeyValueStateIterator(
- @Nonnull final KeyGroupRange keyGroupRange,
- @Nonnull final TypeSerializer<?> keySerializer,
- @Nonnegative final int totalKeyGroups,
- @Nonnull final Map<StateUID, Integer> stateNamesToId,
- @Nonnull final Map<StateUID, StateSnapshot> stateSnapshots)
- throws IOException {
- checkNotNull(keyGroupRange);
- checkNotNull(keySerializer);
- this.stateNamesToId = checkNotNull(stateNamesToId);
- this.stateStableSnapshots = checkNotNull(stateSnapshots);
-
- this.statesIterator = stateSnapshots.keySet().iterator();
- this.keyGroupIterator = keyGroupRange.iterator();
-
- this.keyGroupPrefixBytes =
- CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(totalKeyGroups);
- this.compositeKeyBuilder =
- new SerializedCompositeKeyBuilder<>(
- castToType(keySerializer), keyGroupPrefixBytes, 32);
-
- if (!keyGroupIterator.hasNext() || !statesIterator.hasNext()) {
- // stop early, no key groups or states
- isValid = false;
- } else {
- currentKeyGroup = keyGroupIterator.next();
- next();
- this.newKeyGroup = true;
- }
- }
-
- @Override
- public boolean isValid() {
- return isValid;
- }
-
- @Override
- public boolean isNewKeyValueState() {
- return this.newKVState;
- }
-
- @Override
- public boolean isNewKeyGroup() {
- return this.newKeyGroup;
- }
-
- @Override
- public int keyGroup() {
- return currentKeyGroup;
- }
-
- @Override
- public int kvStateId() {
- return stateNamesToId.get(currentState);
- }
-
- @Override
- public void next() throws IOException {
- boolean hasStateEntry = false;
-
- this.newKVState = false;
- this.newKeyGroup = false;
-
- do {
- if (currentState == null) {
- boolean hasNextState = moveToNextState();
- if (!hasNextState) {
- break;
- }
- }
-
- hasStateEntry = currentStateIterator != null && currentStateIterator.hasNext();
- if (!hasStateEntry) {
- this.currentState = null;
- }
- } while (!hasStateEntry);
-
- if (hasStateEntry) {
- isValid = true;
- currentStateIterator.writeOutNext();
- } else {
- isValid = false;
- }
- }
-
- private boolean moveToNextState() throws IOException {
- if (statesIterator.hasNext()) {
- this.currentState = statesIterator.next();
- this.newKVState = true;
- } else if (keyGroupIterator.hasNext()) {
- this.currentKeyGroup = keyGroupIterator.next();
- resetStates();
- this.newKeyGroup = true;
- this.newKVState = true;
- } else {
- return false;
- }
-
- StateSnapshot stateSnapshot = this.stateStableSnapshots.get(currentState);
- setCurrentStateIterator(stateSnapshot);
-
- // set to a valid entry
- return true;
- }
-
- private void resetStates() {
- this.statesIterator = stateStableSnapshots.keySet().iterator();
- this.currentState = statesIterator.next();
- }
-
- @SuppressWarnings("unchecked")
- private void setCurrentStateIterator(StateSnapshot stateSnapshot) throws IOException {
- if (stateSnapshot instanceof IterableStateSnapshot) {
- RegisteredKeyValueStateBackendMetaInfo<Object, Object> metaInfo =
- new RegisteredKeyValueStateBackendMetaInfo<>(
- stateSnapshot.getMetaInfoSnapshot());
- Iterator<? extends StateEntry<?, ?, ?>> snapshotIterator =
- ((IterableStateSnapshot<?, ?, ?>) stateSnapshot).getIterator(currentKeyGroup);
- this.currentStateIterator = new StateTableIterator(snapshotIterator, metaInfo);
- } else if (stateSnapshot instanceof HeapPriorityQueueStateSnapshot) {
- Iterator<Object> snapshotIterator =
- ((HeapPriorityQueueStateSnapshot<Object>) stateSnapshot)
- .getIteratorForKeyGroup(currentKeyGroup);
- RegisteredPriorityQueueStateBackendMetaInfo<Object> metaInfo =
- new RegisteredPriorityQueueStateBackendMetaInfo<>(
- stateSnapshot.getMetaInfoSnapshot());
- this.currentStateIterator = new QueueIterator<>(snapshotIterator, metaInfo);
- } else {
- throw new IllegalStateException("Unknown snapshot type: " + stateSnapshot);
- }
- }
-
- /** A common interface for writing out a single entry in a state. */
- private interface SingleStateIterator {
-
- boolean hasNext();
-
- void writeOutNext() throws IOException;
- }
-
- private final class StateTableIterator implements SingleStateIterator {
-
- private final Iterator<? extends StateEntry<?, ?, ?>> entriesIterator;
- private final RegisteredKeyValueStateBackendMetaInfo<?, ?> stateSnapshot;
-
- private StateTableIterator(
- Iterator<? extends StateEntry<?, ?, ?>> entriesIterator,
- RegisteredKeyValueStateBackendMetaInfo<?, ?> stateSnapshot) {
- this.entriesIterator = entriesIterator;
- this.stateSnapshot = stateSnapshot;
- }
-
- @Override
- public boolean hasNext() {
- return entriesIterator.hasNext();
- }
-
- @Override
- public void writeOutNext() throws IOException {
- StateEntry<?, ?, ?> currentEntry = entriesIterator.next();
- valueOut.clear();
- compositeKeyBuilder.setKeyAndKeyGroup(currentEntry.getKey(), keyGroup());
- compositeKeyBuilder.setNamespace(
- currentEntry.getNamespace(),
- castToType(stateSnapshot.getNamespaceSerializer()));
- TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();
- switch (stateSnapshot.getStateType()) {
- case AGGREGATING:
- case REDUCING:
- case FOLDING:
- case VALUE:
- writeOutValue(currentEntry, stateSerializer);
- break;
- case LIST:
- writeOutList(currentEntry, stateSerializer);
- break;
- case MAP:
- writeOutMap(currentEntry, stateSerializer);
- break;
- default:
- throw new IllegalStateException("");
- }
- }
-
- private void writeOutValue(
- StateEntry<?, ?, ?> currentEntry, TypeSerializer<?> stateSerializer)
- throws IOException {
- currentKey = compositeKeyBuilder.build();
- castToType(stateSerializer).serialize(currentEntry.getState(), valueOut);
- currentValue = valueOut.getCopyOfBuffer();
- }
-
- @SuppressWarnings("unchecked")
- private void writeOutList(
- StateEntry<?, ?, ?> currentEntry, TypeSerializer<?> stateSerializer)
- throws IOException {
- ListSerializer<Object> listSerializer = (ListSerializer<Object>) stateSerializer;
- currentKey = compositeKeyBuilder.build();
- currentValue =
- listDelimitedSerializer.serializeList(
- (List<Object>) currentEntry.getState(),
- listSerializer.getElementSerializer());
- }
-
- @SuppressWarnings("unchecked")
- private void writeOutMap(
- StateEntry<?, ?, ?> currentEntry, TypeSerializer<?> stateSerializer)
- throws IOException {
- MapSerializer<Object, Object> mapSerializer =
- (MapSerializer<Object, Object>) stateSerializer;
- currentStateIterator =
- new MapStateIterator(
- (Map<Object, Object>) currentEntry.getState(),
- mapSerializer.getKeySerializer(),
- mapSerializer.getValueSerializer(),
- this);
- currentStateIterator.writeOutNext();
- }
- }
-
- private final class MapStateIterator implements SingleStateIterator {
-
- private final Iterator<Map.Entry<Object, Object>> mapEntries;
- private final TypeSerializer<Object> userKeySerializer;
- private final TypeSerializer<Object> userValueSerializer;
- private final StateTableIterator parentIterator;
-
- private MapStateIterator(
- Map<Object, Object> mapEntries,
- TypeSerializer<Object> userKeySerializer,
- TypeSerializer<Object> userValueSerializer,
- StateTableIterator parentIterator) {
- this.mapEntries = mapEntries.entrySet().iterator();
- this.userKeySerializer = userKeySerializer;
- this.userValueSerializer = userValueSerializer;
- this.parentIterator = parentIterator;
- }
-
- @Override
- public boolean hasNext() {
- // we should never end up here with an exhausted map iterator
- // if an iterator is exhausted in the writeOutNext we switch back to
- // the originating StateTableIterator
- assert mapEntries.hasNext();
- return true;
- }
-
- @Override
- public void writeOutNext() throws IOException {
- Map.Entry<Object, Object> entry = mapEntries.next();
- valueOut.clear();
- currentKey =
- compositeKeyBuilder.buildCompositeKeyUserKey(entry.getKey(), userKeySerializer);
- Object userValue = entry.getValue();
- valueOut.writeBoolean(userValue == null);
- userValueSerializer.serialize(userValue, valueOut);
- currentValue = valueOut.getCopyOfBuffer();
-
- if (!mapEntries.hasNext()) {
- currentStateIterator = parentIterator;
- }
- }
- }
-
- private final class QueueIterator<T> implements SingleStateIterator {
- private final Iterator<T> elementsForKeyGroup;
- private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;
- private final DataOutputSerializer keyOut = new DataOutputSerializer(128);
- private final int afterKeyMark;
-
- public QueueIterator(
- Iterator<T> elementsForKeyGroup,
- RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo)
- throws IOException {
- this.elementsForKeyGroup = elementsForKeyGroup;
- this.metaInfo = metaInfo;
- CompositeKeySerializationUtils.writeKeyGroup(keyGroup(), keyGroupPrefixBytes, keyOut);
- afterKeyMark = keyOut.length();
- }
-
- @Override
- public boolean hasNext() {
- return elementsForKeyGroup.hasNext();
- }
-
- @Override
- public void writeOutNext() throws IOException {
- currentValue = EMPTY_BYTE_ARRAY;
- keyOut.setPosition(afterKeyMark);
- T next = elementsForKeyGroup.next();
- metaInfo.getElementSerializer().serialize(next, keyOut);
- currentKey = keyOut.getCopyOfBuffer();
- }
- }
-
- @SuppressWarnings("unchecked")
- @Nonnull
- private static <T> TypeSerializer<T> castToType(@Nonnull TypeSerializer<?> serializer) {
- return (TypeSerializer<T>) serializer;
- }
-
- @Override
- public byte[] key() {
- return currentKey;
- }
-
- @Override
- public byte[] value() {
- return currentValue;
- }
-
- @Override
- public void close() {}
-}