You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/10 15:46:07 UTC

[flink] 01/02: Revert "[FLINK-21206] Implement HeapKeyValueStateIterator"

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2445a3faf1493785b481d9b9b1fe8c8ea81da0ec
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Feb 10 16:30:48 2021 +0100

    Revert "[FLINK-21206] Implement HeapKeyValueStateIterator"
    
    This reverts commit d67ace2395e21188ad9663e36a1caaf357867897.
---
 .../flink/runtime/state/KeyValueStateIterator.java |   4 +-
 .../state/heap/HeapKeyValueStateIterator.java      | 407 ---------------------
 2 files changed, 1 insertion(+), 410 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
index cf9cf16..28a3266 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyValueStateIterator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import java.io.IOException;
-
 /**
  * Iterator that over all key-value state entries in a {@link KeyedStateBackend}. For use during
  * snapshotting.
@@ -33,7 +31,7 @@ public interface KeyValueStateIterator extends AutoCloseable {
      * Advances the iterator. Should only be called if {@link #isValid()} returned true. Valid flag
      * can only change after calling {@link #next()}.
      */
-    void next() throws IOException;
+    void next();
 
     /** Returns the key-group for the current key. */
     int keyGroup();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
deleted file mode 100644
index f5f11e8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.MapSerializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
-import org.apache.flink.runtime.state.IterableStateSnapshot;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyValueStateIterator;
-import org.apache.flink.runtime.state.ListDelimitedSerializer;
-import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
-import org.apache.flink.runtime.state.StateEntry;
-import org.apache.flink.runtime.state.StateSnapshot;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link org.apache.flink.runtime.state.KeyValueStateIterator} over Heap backend snapshot
- * resources.
- */
-@Internal
-@NotThreadSafe
-public final class HeapKeyValueStateIterator implements KeyValueStateIterator {
-
-    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
-
-    private final Map<StateUID, Integer> stateNamesToId;
-    private final Map<StateUID, StateSnapshot> stateStableSnapshots;
-    private final int keyGroupPrefixBytes;
-
-    private boolean isValid;
-    private boolean newKeyGroup;
-    private boolean newKVState;
-    private byte[] currentKey;
-    private byte[] currentValue;
-
-    /** Iterator over the key groups of the corresponding key group range. */
-    private final Iterator<Integer> keyGroupIterator;
-    /** The current value of the keyGroupIterator. */
-    private int currentKeyGroup;
-
-    /** Iterator over all states present in the snapshots. */
-    private Iterator<StateUID> statesIterator;
-    /** The current value of the statesIterator. */
-    private StateUID currentState;
-    /**
-     * An iterator over the values of the current state. It can be one of three:
-     *
-     * <ul>
-     *   <li>{@link QueueIterator} for iterating over entries in a priority queue
-     *   <li>{@link StateTableIterator} for iterating over entries in a StateTable
-     *   <li>{@link MapStateIterator} for iterating over entries in a user map, this one falls back
-     *       to the upper one automatically if exhausted
-     * </ul>
-     */
-    private SingleStateIterator currentStateIterator;
-    /** Helpers for serializing state into the unified format. */
-    private final DataOutputSerializer valueOut = new DataOutputSerializer(64);
-
-    private final ListDelimitedSerializer listDelimitedSerializer = new ListDelimitedSerializer();
-    private final SerializedCompositeKeyBuilder<Object> compositeKeyBuilder;
-
-    public HeapKeyValueStateIterator(
-            @Nonnull final KeyGroupRange keyGroupRange,
-            @Nonnull final TypeSerializer<?> keySerializer,
-            @Nonnegative final int totalKeyGroups,
-            @Nonnull final Map<StateUID, Integer> stateNamesToId,
-            @Nonnull final Map<StateUID, StateSnapshot> stateSnapshots)
-            throws IOException {
-        checkNotNull(keyGroupRange);
-        checkNotNull(keySerializer);
-        this.stateNamesToId = checkNotNull(stateNamesToId);
-        this.stateStableSnapshots = checkNotNull(stateSnapshots);
-
-        this.statesIterator = stateSnapshots.keySet().iterator();
-        this.keyGroupIterator = keyGroupRange.iterator();
-
-        this.keyGroupPrefixBytes =
-                CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(totalKeyGroups);
-        this.compositeKeyBuilder =
-                new SerializedCompositeKeyBuilder<>(
-                        castToType(keySerializer), keyGroupPrefixBytes, 32);
-
-        if (!keyGroupIterator.hasNext() || !statesIterator.hasNext()) {
-            // stop early, no key groups or states
-            isValid = false;
-        } else {
-            currentKeyGroup = keyGroupIterator.next();
-            next();
-            this.newKeyGroup = true;
-        }
-    }
-
-    @Override
-    public boolean isValid() {
-        return isValid;
-    }
-
-    @Override
-    public boolean isNewKeyValueState() {
-        return this.newKVState;
-    }
-
-    @Override
-    public boolean isNewKeyGroup() {
-        return this.newKeyGroup;
-    }
-
-    @Override
-    public int keyGroup() {
-        return currentKeyGroup;
-    }
-
-    @Override
-    public int kvStateId() {
-        return stateNamesToId.get(currentState);
-    }
-
-    @Override
-    public void next() throws IOException {
-        boolean hasStateEntry = false;
-
-        this.newKVState = false;
-        this.newKeyGroup = false;
-
-        do {
-            if (currentState == null) {
-                boolean hasNextState = moveToNextState();
-                if (!hasNextState) {
-                    break;
-                }
-            }
-
-            hasStateEntry = currentStateIterator != null && currentStateIterator.hasNext();
-            if (!hasStateEntry) {
-                this.currentState = null;
-            }
-        } while (!hasStateEntry);
-
-        if (hasStateEntry) {
-            isValid = true;
-            currentStateIterator.writeOutNext();
-        } else {
-            isValid = false;
-        }
-    }
-
-    private boolean moveToNextState() throws IOException {
-        if (statesIterator.hasNext()) {
-            this.currentState = statesIterator.next();
-            this.newKVState = true;
-        } else if (keyGroupIterator.hasNext()) {
-            this.currentKeyGroup = keyGroupIterator.next();
-            resetStates();
-            this.newKeyGroup = true;
-            this.newKVState = true;
-        } else {
-            return false;
-        }
-
-        StateSnapshot stateSnapshot = this.stateStableSnapshots.get(currentState);
-        setCurrentStateIterator(stateSnapshot);
-
-        // set to a valid entry
-        return true;
-    }
-
-    private void resetStates() {
-        this.statesIterator = stateStableSnapshots.keySet().iterator();
-        this.currentState = statesIterator.next();
-    }
-
-    @SuppressWarnings("unchecked")
-    private void setCurrentStateIterator(StateSnapshot stateSnapshot) throws IOException {
-        if (stateSnapshot instanceof IterableStateSnapshot) {
-            RegisteredKeyValueStateBackendMetaInfo<Object, Object> metaInfo =
-                    new RegisteredKeyValueStateBackendMetaInfo<>(
-                            stateSnapshot.getMetaInfoSnapshot());
-            Iterator<? extends StateEntry<?, ?, ?>> snapshotIterator =
-                    ((IterableStateSnapshot<?, ?, ?>) stateSnapshot).getIterator(currentKeyGroup);
-            this.currentStateIterator = new StateTableIterator(snapshotIterator, metaInfo);
-        } else if (stateSnapshot instanceof HeapPriorityQueueStateSnapshot) {
-            Iterator<Object> snapshotIterator =
-                    ((HeapPriorityQueueStateSnapshot<Object>) stateSnapshot)
-                            .getIteratorForKeyGroup(currentKeyGroup);
-            RegisteredPriorityQueueStateBackendMetaInfo<Object> metaInfo =
-                    new RegisteredPriorityQueueStateBackendMetaInfo<>(
-                            stateSnapshot.getMetaInfoSnapshot());
-            this.currentStateIterator = new QueueIterator<>(snapshotIterator, metaInfo);
-        } else {
-            throw new IllegalStateException("Unknown snapshot type: " + stateSnapshot);
-        }
-    }
-
-    /** A common interface for writing out a single entry in a state. */
-    private interface SingleStateIterator {
-
-        boolean hasNext();
-
-        void writeOutNext() throws IOException;
-    }
-
-    private final class StateTableIterator implements SingleStateIterator {
-
-        private final Iterator<? extends StateEntry<?, ?, ?>> entriesIterator;
-        private final RegisteredKeyValueStateBackendMetaInfo<?, ?> stateSnapshot;
-
-        private StateTableIterator(
-                Iterator<? extends StateEntry<?, ?, ?>> entriesIterator,
-                RegisteredKeyValueStateBackendMetaInfo<?, ?> stateSnapshot) {
-            this.entriesIterator = entriesIterator;
-            this.stateSnapshot = stateSnapshot;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return entriesIterator.hasNext();
-        }
-
-        @Override
-        public void writeOutNext() throws IOException {
-            StateEntry<?, ?, ?> currentEntry = entriesIterator.next();
-            valueOut.clear();
-            compositeKeyBuilder.setKeyAndKeyGroup(currentEntry.getKey(), keyGroup());
-            compositeKeyBuilder.setNamespace(
-                    currentEntry.getNamespace(),
-                    castToType(stateSnapshot.getNamespaceSerializer()));
-            TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();
-            switch (stateSnapshot.getStateType()) {
-                case AGGREGATING:
-                case REDUCING:
-                case FOLDING:
-                case VALUE:
-                    writeOutValue(currentEntry, stateSerializer);
-                    break;
-                case LIST:
-                    writeOutList(currentEntry, stateSerializer);
-                    break;
-                case MAP:
-                    writeOutMap(currentEntry, stateSerializer);
-                    break;
-                default:
-                    throw new IllegalStateException("");
-            }
-        }
-
-        private void writeOutValue(
-                StateEntry<?, ?, ?> currentEntry, TypeSerializer<?> stateSerializer)
-                throws IOException {
-            currentKey = compositeKeyBuilder.build();
-            castToType(stateSerializer).serialize(currentEntry.getState(), valueOut);
-            currentValue = valueOut.getCopyOfBuffer();
-        }
-
-        @SuppressWarnings("unchecked")
-        private void writeOutList(
-                StateEntry<?, ?, ?> currentEntry, TypeSerializer<?> stateSerializer)
-                throws IOException {
-            ListSerializer<Object> listSerializer = (ListSerializer<Object>) stateSerializer;
-            currentKey = compositeKeyBuilder.build();
-            currentValue =
-                    listDelimitedSerializer.serializeList(
-                            (List<Object>) currentEntry.getState(),
-                            listSerializer.getElementSerializer());
-        }
-
-        @SuppressWarnings("unchecked")
-        private void writeOutMap(
-                StateEntry<?, ?, ?> currentEntry, TypeSerializer<?> stateSerializer)
-                throws IOException {
-            MapSerializer<Object, Object> mapSerializer =
-                    (MapSerializer<Object, Object>) stateSerializer;
-            currentStateIterator =
-                    new MapStateIterator(
-                            (Map<Object, Object>) currentEntry.getState(),
-                            mapSerializer.getKeySerializer(),
-                            mapSerializer.getValueSerializer(),
-                            this);
-            currentStateIterator.writeOutNext();
-        }
-    }
-
-    private final class MapStateIterator implements SingleStateIterator {
-
-        private final Iterator<Map.Entry<Object, Object>> mapEntries;
-        private final TypeSerializer<Object> userKeySerializer;
-        private final TypeSerializer<Object> userValueSerializer;
-        private final StateTableIterator parentIterator;
-
-        private MapStateIterator(
-                Map<Object, Object> mapEntries,
-                TypeSerializer<Object> userKeySerializer,
-                TypeSerializer<Object> userValueSerializer,
-                StateTableIterator parentIterator) {
-            this.mapEntries = mapEntries.entrySet().iterator();
-            this.userKeySerializer = userKeySerializer;
-            this.userValueSerializer = userValueSerializer;
-            this.parentIterator = parentIterator;
-        }
-
-        @Override
-        public boolean hasNext() {
-            // we should never end up here with an exhausted map iterator
-            // if an iterator is exhausted in the writeOutNext we switch back to
-            // the originating StateTableIterator
-            assert mapEntries.hasNext();
-            return true;
-        }
-
-        @Override
-        public void writeOutNext() throws IOException {
-            Map.Entry<Object, Object> entry = mapEntries.next();
-            valueOut.clear();
-            currentKey =
-                    compositeKeyBuilder.buildCompositeKeyUserKey(entry.getKey(), userKeySerializer);
-            Object userValue = entry.getValue();
-            valueOut.writeBoolean(userValue == null);
-            userValueSerializer.serialize(userValue, valueOut);
-            currentValue = valueOut.getCopyOfBuffer();
-
-            if (!mapEntries.hasNext()) {
-                currentStateIterator = parentIterator;
-            }
-        }
-    }
-
-    private final class QueueIterator<T> implements SingleStateIterator {
-        private final Iterator<T> elementsForKeyGroup;
-        private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;
-        private final DataOutputSerializer keyOut = new DataOutputSerializer(128);
-        private final int afterKeyMark;
-
-        public QueueIterator(
-                Iterator<T> elementsForKeyGroup,
-                RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo)
-                throws IOException {
-            this.elementsForKeyGroup = elementsForKeyGroup;
-            this.metaInfo = metaInfo;
-            CompositeKeySerializationUtils.writeKeyGroup(keyGroup(), keyGroupPrefixBytes, keyOut);
-            afterKeyMark = keyOut.length();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return elementsForKeyGroup.hasNext();
-        }
-
-        @Override
-        public void writeOutNext() throws IOException {
-            currentValue = EMPTY_BYTE_ARRAY;
-            keyOut.setPosition(afterKeyMark);
-            T next = elementsForKeyGroup.next();
-            metaInfo.getElementSerializer().serialize(next, keyOut);
-            currentKey = keyOut.getCopyOfBuffer();
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Nonnull
-    private static <T> TypeSerializer<T> castToType(@Nonnull TypeSerializer<?> serializer) {
-        return (TypeSerializer<T>) serializer;
-    }
-
-    @Override
-    public byte[] key() {
-        return currentKey;
-    }
-
-    @Override
-    public byte[] value() {
-        return currentValue;
-    }
-
-    @Override
-    public void close() {}
-}