You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/08/10 18:10:24 UTC
[flink] 01/02: [FLINK-10041][state] Extract iterators from
RocksDBKeyedStateBackend (inner or static inner classes) into full classes
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9d67afbb84736ee8c28ec1bd69eef8ddff4754f0
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Mon Aug 6 11:21:58 2018 +0200
[FLINK-10041][state] Extract iterators from RocksDBKeyedStateBackend (inner or static inner classes) into full classes
This closes #6501.
---
.../state/RocksDBKeySerializationUtils.java | 2 +-
.../streaming/state/RocksDBKeyedStateBackend.java | 397 +--------------------
.../state/iterator/RocksSingleStateIterator.java | 68 ++++
.../state/iterator/RocksStateKeysIterator.java | 136 +++++++
.../RocksStatesPerKeyGroupMergeIterator.java | 222 ++++++++++++
.../iterator/RocksTransformingIteratorWrapper.java | 82 +++++
...java => RocksDBRocksStateKeysIteratorTest.java} | 13 +-
...ocksKeyGroupsRocksSingleStateIteratorTest.java} | 11 +-
8 files changed, 529 insertions(+), 402 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index 5f1c650..1bc49e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -28,7 +28,7 @@ import java.io.IOException;
/**
* Utils for RocksDB state serialization and deserialization.
*/
-class RocksDBKeySerializationUtils {
+public class RocksDBKeySerializationUtils {
static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws IOException {
int keyGroup = 0;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0fd1125..55cc4f9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -34,6 +34,9 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
@@ -42,7 +45,6 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -131,16 +133,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
-import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.Spliterator;
@@ -383,7 +381,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0);
iterator.seekToFirst();
- final RocksIteratorForKeysWrapper<K> iteratorWrapper = new RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+ final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, keySerializer, keyGroupPrefixBytes,
ambiguousKeyPossible, nameSpaceBytes);
Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
@@ -391,7 +389,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@VisibleForTesting
- ColumnFamilyHandle getColumnFamilyHandle(String state) {
+ public ColumnFamilyHandle getColumnFamilyHandle(String state) {
Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state);
return columnInfo != null ? columnInfo.f0 : null;
}
@@ -1451,385 +1449,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return count;
}
- /**
- * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
- * The resulting iteration sequence is ordered by (key-group, kv-state).
- */
- @VisibleForTesting
- static class RocksDBMergeIterator implements AutoCloseable {
-
- private final PriorityQueue<RocksDBKeyedStateBackend.MergeIterator> heap;
- private final int keyGroupPrefixByteCount;
- private boolean newKeyGroup;
- private boolean newKVState;
- private boolean valid;
-
- MergeIterator currentSubIterator;
-
- private static final List<Comparator<MergeIterator>> COMPARATORS;
-
- static {
- int maxBytes = 2;
- COMPARATORS = new ArrayList<>(maxBytes);
- for (int i = 0; i < maxBytes; ++i) {
- final int currentBytes = i + 1;
- COMPARATORS.add((o1, o2) -> {
- int arrayCmpRes = compareKeyGroupsForByteArrays(
- o1.currentKey, o2.currentKey, currentBytes);
- return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
- });
- }
- }
-
- RocksDBMergeIterator(
- List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
- final int keyGroupPrefixByteCount) {
- Preconditions.checkNotNull(kvStateIterators);
- Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
-
- this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
-
- Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
-
- if (kvStateIterators.size() > 0) {
- PriorityQueue<MergeIterator> iteratorPriorityQueue =
- new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
-
- for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
- final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
- rocksIterator.seekToFirst();
- if (rocksIterator.isValid()) {
- iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
- } else {
- IOUtils.closeQuietly(rocksIterator);
- }
- }
-
- kvStateIterators.clear();
-
- this.heap = iteratorPriorityQueue;
- this.valid = !heap.isEmpty();
- this.currentSubIterator = heap.poll();
- } else {
- // creating a PriorityQueue of size 0 results in an exception.
- this.heap = null;
- this.valid = false;
- }
-
- this.newKeyGroup = true;
- this.newKVState = true;
- }
-
- /**
- * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after
- * calls to {@link #next()}.
- */
- public void next() {
- newKeyGroup = false;
- newKVState = false;
-
- final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
- rocksIterator.next();
-
- byte[] oldKey = currentSubIterator.getCurrentKey();
- if (rocksIterator.isValid()) {
-
- currentSubIterator.currentKey = rocksIterator.key();
-
- if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
- heap.offer(currentSubIterator);
- currentSubIterator = heap.poll();
- newKVState = currentSubIterator.getIterator() != rocksIterator;
- detectNewKeyGroup(oldKey);
- }
- } else {
- IOUtils.closeQuietly(rocksIterator);
-
- if (heap.isEmpty()) {
- currentSubIterator = null;
- valid = false;
- } else {
- currentSubIterator = heap.poll();
- newKVState = true;
- detectNewKeyGroup(oldKey);
- }
- }
- }
-
- private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
- return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
- }
-
- private void detectNewKeyGroup(byte[] oldKey) {
- if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) {
- newKeyGroup = true;
- }
- }
-
- /**
- * @return key-group for the current key
- */
- public int keyGroup() {
- int result = 0;
- //big endian decode
- for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
- result <<= 8;
- result |= (currentSubIterator.currentKey[i] & 0xFF);
- }
- return result;
- }
-
- public byte[] key() {
- return currentSubIterator.getCurrentKey();
- }
-
- public byte[] value() {
- return currentSubIterator.getIterator().value();
- }
-
- /**
- * @return Id of K/V state to which the current key belongs.
- */
- public int kvStateId() {
- return currentSubIterator.getKvStateId();
- }
-
- /**
- * Indicates if current key starts a new k/v-state, i.e. belong to a different k/v-state than it's predecessor.
- * @return true iff the current key belong to a different k/v-state than it's predecessor.
- */
- public boolean isNewKeyValueState() {
- return newKVState;
- }
-
- /**
- * Indicates if current key starts a new key-group, i.e. belong to a different key-group than it's predecessor.
- * @return true iff the current key belong to a different key-group than it's predecessor.
- */
- public boolean isNewKeyGroup() {
- return newKeyGroup;
- }
-
- /**
- * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
- * {@link #next()} should only be called if valid returned true. Should be checked after each call to
- * {@link #next()} before accessing iterator state.
- * @return True iff this iterator is valid.
- */
- public boolean isValid() {
- return valid;
- }
-
- private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
- for (int i = 0; i < len; ++i) {
- int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
- if (diff != 0) {
- return diff;
- }
- }
- return 0;
- }
-
- @Override
- public void close() {
- IOUtils.closeQuietly(currentSubIterator);
- currentSubIterator = null;
-
- IOUtils.closeAllQuietly(heap);
- heap.clear();
- }
- }
-
- /**
- * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator.
- * Used by #MergeIterator.
- */
- @VisibleForTesting
- protected static final class MergeIterator implements AutoCloseable {
-
- /**
- * @param iterator The #RocksIterator to wrap .
- * @param kvStateId Id of the K/V state to which this iterator belongs.
- */
- MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
- this.iterator = Preconditions.checkNotNull(iterator);
- this.currentKey = iterator.key();
- this.kvStateId = kvStateId;
- }
-
- private final RocksIteratorWrapper iterator;
- private byte[] currentKey;
- private final int kvStateId;
-
- public byte[] getCurrentKey() {
- return currentKey;
- }
-
- public void setCurrentKey(byte[] currentKey) {
- this.currentKey = currentKey;
- }
-
- public RocksIteratorWrapper getIterator() {
- return iterator;
- }
-
- public int getKvStateId() {
- return kvStateId;
- }
-
- @Override
- public void close() {
- IOUtils.closeQuietly(iterator);
- }
- }
-
- private static final class TransformingRocksIteratorWrapper extends RocksIteratorWrapper {
- @Nonnull
- private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
- private byte[] current;
-
- public TransformingRocksIteratorWrapper(
- @Nonnull RocksIterator iterator,
- @Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
- super(iterator);
- this.stateSnapshotTransformer = stateSnapshotTransformer;
- }
-
- @Override
- public void seekToFirst() {
- super.seekToFirst();
- filterOrTransform(super::next);
- }
-
- @Override
- public void seekToLast() {
- super.seekToLast();
- filterOrTransform(super::prev);
- }
-
- @Override
- public void next() {
- super.next();
- filterOrTransform(super::next);
- }
-
- @Override
- public void prev() {
- super.prev();
- filterOrTransform(super::prev);
- }
-
- private void filterOrTransform(Runnable advance) {
- while (isValid() && (current = stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
- advance.run();
- }
- }
-
- @Override
- public byte[] value() {
- if (!isValid()) {
- throw new IllegalStateException("value() method cannot be called if isValid() is false");
- }
- return current;
- }
- }
-
- /**
- * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
- * is not thread safe.
- *
- * @param <K> the type of the iterated objects, which are keys in RocksDB.
- */
- static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseable {
- private final RocksIteratorWrapper iterator;
- private final String state;
- private final TypeSerializer<K> keySerializer;
- private final int keyGroupPrefixBytes;
- private final byte[] namespaceBytes;
- private final boolean ambiguousKeyPossible;
- private K nextKey;
- private K previousKey;
-
- RocksIteratorForKeysWrapper(
- RocksIteratorWrapper iterator,
- String state,
- TypeSerializer<K> keySerializer,
- int keyGroupPrefixBytes,
- boolean ambiguousKeyPossible,
- byte[] namespaceBytes) {
- this.iterator = Preconditions.checkNotNull(iterator);
- this.state = Preconditions.checkNotNull(state);
- this.keySerializer = Preconditions.checkNotNull(keySerializer);
- this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
- this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
- this.nextKey = null;
- this.previousKey = null;
- this.ambiguousKeyPossible = ambiguousKeyPossible;
- }
-
- @Override
- public boolean hasNext() {
- try {
- while (nextKey == null && iterator.isValid()) {
-
- byte[] key = iterator.key();
-
- ByteArrayInputStreamWithPos inputStream =
- new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
-
- DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
-
- K value = RocksDBKeySerializationUtils.readKey(
- keySerializer,
- inputStream,
- dataInput,
- ambiguousKeyPossible);
-
- int namespaceByteStartPos = inputStream.getPosition();
-
- if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
- previousKey = value;
- nextKey = value;
- }
- iterator.next();
- }
- } catch (Exception e) {
- throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
- }
- return nextKey != null;
- }
-
- @Override
- public K next() {
- if (!hasNext()) {
- throw new NoSuchElementException("Failed to access state [" + state + "]");
- }
-
- K tmpKey = nextKey;
- nextKey = null;
- return tmpKey;
- }
-
- private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
- final int namespaceBytesLength = namespaceBytes.length;
- final int basicLength = namespaceBytesLength + beginPos;
- if (key.length >= basicLength) {
- for (int i = 0; i < namespaceBytesLength; ++i) {
- if (key[beginPos + i] != namespaceBytes[i]) {
- return false;
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public void close() {
- iterator.close();
- }
- }
-
private class FullSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
@Override
@@ -2214,8 +1833,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
checkpointStreamWithResultProvider.getCheckpointOutputStream();
try {
- // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
- try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
+ // Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator
+ try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
// handover complete, null out to prevent double close
@@ -2729,7 +2348,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
return stateSnapshotTransformer == null ?
new RocksIteratorWrapper(rocksIterator) :
- new TransformingRocksIteratorWrapper(rocksIterator, stateSnapshotTransformer);
+ new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
}
/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
new file mode 100644
index 0000000..993b35a
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a RocksDB iterator to cache its current key and assigns an id for the key/value state to the iterator.
+ * Used by {@link RocksStatesPerKeyGroupMergeIterator}.
+ */
+class RocksSingleStateIterator implements AutoCloseable {
+
+ /**
+ * @param iterator underlying {@link RocksIteratorWrapper}
+ * @param kvStateId Id of the K/V state to which this iterator belongs.
+ */
+ RocksSingleStateIterator(@Nonnull RocksIteratorWrapper iterator, int kvStateId) {
+ this.iterator = iterator;
+ this.currentKey = iterator.key();
+ this.kvStateId = kvStateId;
+ }
+
+ @Nonnull
+ private final RocksIteratorWrapper iterator;
+ private byte[] currentKey;
+ private final int kvStateId;
+
+ public byte[] getCurrentKey() {
+ return currentKey;
+ }
+
+ public void setCurrentKey(byte[] currentKey) {
+ this.currentKey = currentKey;
+ }
+
+ @Nonnull
+ public RocksIteratorWrapper getIterator() {
+ return iterator;
+ }
+
+ public int getKvStateId() {
+ return kvStateId;
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(iterator);
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
new file mode 100644
index 0000000..0fa93dc
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
+ * is not thread safe.
+ *
+ * @param <K> the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator<K> implements Iterator<K>, AutoCloseable {
+
+ @Nonnull
+ private final RocksIteratorWrapper iterator;
+
+ @Nonnull
+ private final String state;
+
+ @Nonnull
+ private final TypeSerializer<K> keySerializer;
+
+ @Nonnull
+ private final byte[] namespaceBytes;
+
+ private final boolean ambiguousKeyPossible;
+ private final int keyGroupPrefixBytes;
+ private final ByteArrayDataInputView byteArrayDataInputView;
+ private K nextKey;
+ private K previousKey;
+
+ public RocksStateKeysIterator(
+ @Nonnull RocksIteratorWrapper iterator,
+ @Nonnull String state,
+ @Nonnull TypeSerializer<K> keySerializer,
+ int keyGroupPrefixBytes,
+ boolean ambiguousKeyPossible,
+ @Nonnull byte[] namespaceBytes) {
+ this.iterator = iterator;
+ this.state = state;
+ this.keySerializer = keySerializer;
+ this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+ this.namespaceBytes = namespaceBytes;
+ this.nextKey = null;
+ this.previousKey = null;
+ this.ambiguousKeyPossible = ambiguousKeyPossible;
+ this.byteArrayDataInputView = new ByteArrayDataInputView();
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ while (nextKey == null && iterator.isValid()) {
+
+ final byte[] keyBytes = iterator.key();
+ final K currentKey = deserializeKey(keyBytes, byteArrayDataInputView);
+ final int namespaceByteStartPos = byteArrayDataInputView.getPosition();
+
+ if (isMatchingNameSpace(keyBytes, namespaceByteStartPos) && !Objects.equals(previousKey, currentKey)) {
+ previousKey = currentKey;
+ nextKey = currentKey;
+ }
+ iterator.next();
+ }
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
+ }
+ return nextKey != null;
+ }
+
+ @Override
+ public K next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("Failed to access state [" + state + "]");
+ }
+
+ K tmpKey = nextKey;
+ nextKey = null;
+ return tmpKey;
+ }
+
+ private K deserializeKey(byte[] keyBytes, ByteArrayDataInputView readView) throws IOException {
+ readView.setData(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
+ return RocksDBKeySerializationUtils.readKey(
+ keySerializer,
+ byteArrayDataInputView,
+ ambiguousKeyPossible);
+ }
+
+ private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
+ final int namespaceBytesLength = namespaceBytes.length;
+ final int basicLength = namespaceBytesLength + beginPos;
+ if (key.length >= basicLength) {
+ for (int i = 0; i < namespaceBytesLength; ++i) {
+ if (key[beginPos + i] != namespaceBytes[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
new file mode 100644
index 0000000..20e5dd0
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+ private final PriorityQueue<RocksSingleStateIterator> heap;
+ private final int keyGroupPrefixByteCount;
+ private boolean newKeyGroup;
+ private boolean newKVState;
+ private boolean valid;
+ private RocksSingleStateIterator currentSubIterator;
+
+ private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+
+ static {
+ int maxBytes = 2;
+ COMPARATORS = new ArrayList<>(maxBytes);
+ for (int i = 0; i < maxBytes; ++i) {
+ final int currentBytes = i + 1;
+ COMPARATORS.add((o1, o2) -> {
+ int arrayCmpRes = compareKeyGroupsForByteArrays(
+ o1.getCurrentKey(), o2.getCurrentKey(), currentBytes);
+ return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+ });
+ }
+ }
+
+ public RocksStatesPerKeyGroupMergeIterator(
+ List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+ final int keyGroupPrefixByteCount) {
+ Preconditions.checkNotNull(kvStateIterators);
+ Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+ this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+ if (kvStateIterators.size() > 0) {
+ this.heap = buildIteratorHeap(kvStateIterators);
+ this.valid = !heap.isEmpty();
+ this.currentSubIterator = heap.poll();
+ kvStateIterators.clear();
+ } else {
+ // creating a PriorityQueue of size 0 results in an exception.
+ this.heap = null;
+ this.valid = false;
+ }
+
+ this.newKeyGroup = true;
+ this.newKVState = true;
+ }
+
+ /**
+ * Advances the iterator. Should only be called if {@link #isValid()} returned true.
+ * Valid flag can only change after calling {@link #next()}.
+ */
+ public void next() {
+ newKeyGroup = false;
+ newKVState = false;
+
+ final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
+ rocksIterator.next();
+
+ byte[] oldKey = currentSubIterator.getCurrentKey();
+ if (rocksIterator.isValid()) {
+
+ currentSubIterator.setCurrentKey(rocksIterator.key());
+
+ if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+ heap.offer(currentSubIterator);
+ currentSubIterator = heap.remove();
+ newKVState = currentSubIterator.getIterator() != rocksIterator;
+ detectNewKeyGroup(oldKey);
+ }
+ } else {
+ IOUtils.closeQuietly(rocksIterator);
+
+ if (heap.isEmpty()) {
+ currentSubIterator = null;
+ valid = false;
+ } else {
+ currentSubIterator = heap.remove();
+ newKVState = true;
+ detectNewKeyGroup(oldKey);
+ }
+ }
+ }
+
+ private PriorityQueue<RocksSingleStateIterator> buildIteratorHeap(
+ List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators) {
+
+ Comparator<RocksSingleStateIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+ PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue =
+ new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+ for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+ final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
+ rocksIterator.seekToFirst();
+ if (rocksIterator.isValid()) {
+ iteratorPriorityQueue.offer(
+ new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+ } else {
+ IOUtils.closeQuietly(rocksIterator);
+ }
+ }
+ return iteratorPriorityQueue;
+ }
+
+ private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
+ return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
+ }
+
+ private void detectNewKeyGroup(byte[] oldKey) {
+ if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+ newKeyGroup = true;
+ }
+ }
+
+ /**
+ * @return key-group for the current key
+ */
+ public int keyGroup() {
+ final byte[] currentKey = currentSubIterator.getCurrentKey();
+ int result = 0;
+ //big endian decode
+ for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
+ result <<= 8;
+ result |= (currentKey[i] & 0xFF);
+ }
+ return result;
+ }
+
+ public byte[] key() {
+ return currentSubIterator.getCurrentKey();
+ }
+
+ public byte[] value() {
+ return currentSubIterator.getIterator().value();
+ }
+
+ /**
+ * @return Id of K/V state to which the current key belongs.
+ */
+ public int kvStateId() {
+ return currentSubIterator.getKvStateId();
+ }
+
+ /**
+ * Indicates if current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor.
+ * @return true iff the current key belongs to a different k/v-state than its predecessor.
+ */
+ public boolean isNewKeyValueState() {
+ return newKVState;
+ }
+
+ /**
+ * Indicates if current key starts a new key-group, i.e. belongs to a different key-group than its predecessor.
+ * @return true iff the current key belongs to a different key-group than its predecessor.
+ */
+ public boolean isNewKeyGroup() {
+ return newKeyGroup;
+ }
+
+ /**
+ * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
+ * {@link #next()} should only be called if valid returned true. Should be checked after each call to
+ * {@link #next()} before accessing iterator state.
+ * @return True iff this iterator is valid.
+ */
+ public boolean isValid() {
+ return valid;
+ }
+
+ private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
+ for (int i = 0; i < len; ++i) {
+ int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(currentSubIterator);
+ currentSubIterator = null;
+
+ IOUtils.closeAllQuietly(heap);
+ heap.clear();
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
new file mode 100644
index 0000000..e2fec42
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+
+import org.rocksdb.RocksIterator;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wrapper around {@link RocksIterator} that applies a given {@link StateSnapshotTransformer} to the elements
+ * during the iteration.
+ */
+public class RocksTransformingIteratorWrapper extends RocksIteratorWrapper {
+
+ @Nonnull
+ private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
+ private byte[] current;
+
+ public RocksTransformingIteratorWrapper(
+ @Nonnull RocksIterator iterator,
+ @Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
+ super(iterator);
+ this.stateSnapshotTransformer = stateSnapshotTransformer;
+ }
+
+ @Override
+ public void seekToFirst() {
+ super.seekToFirst();
+ filterOrTransform(super::next);
+ }
+
+ @Override
+ public void seekToLast() {
+ super.seekToLast();
+ filterOrTransform(super::prev);
+ }
+
+ @Override
+ public void next() {
+ super.next();
+ filterOrTransform(super::next);
+ }
+
+ @Override
+ public void prev() {
+ super.prev();
+ filterOrTransform(super::prev);
+ }
+
+ private void filterOrTransform(@Nonnull Runnable advance) {
+ while (isValid() && (current = stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
+ advance.run();
+ }
+ }
+
+ @Override
+ public byte[] value() {
+ if (!isValid()) {
+ throw new IllegalStateException("value() method cannot be called if isValid() is false");
+ }
+ return current;
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
similarity index 91%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
rename to flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index f560998..e042ebd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -47,7 +47,7 @@ import static org.mockito.Mockito.mock;
/**
* Tests for the RocksIteratorWrapper.
*/
-public class RocksDBRocksIteratorForKeysWrapperTest {
+public class RocksDBRocksStateKeysIteratorTest {
@Rule
public final TemporaryFolder tmp = new TemporaryFolder();
@@ -105,13 +105,12 @@ public class RocksDBRocksIteratorForKeysWrapperTest {
testState.update(String.valueOf(i));
}
- ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
+ ByteArrayDataOutputView outputStream = new ByteArrayDataOutputView(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
RocksDBKeySerializationUtils.writeNameSpace(
namespace,
namespaceSerializer,
outputStream,
- new DataOutputViewStreamWrapper(outputStream),
ambiguousKeyPossible);
byte[] nameSpaceBytes = outputStream.toByteArray();
@@ -119,8 +118,8 @@ public class RocksDBRocksIteratorForKeysWrapperTest {
try (
ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(keyedStateBackend.db, handle);
- RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
- new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
+ RocksStateKeysIterator<K> iteratorWrapper =
+ new RocksStateKeysIterator<>(
iterator,
testStateName,
keySerializer,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
similarity index 91%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
rename to flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index cb2b202..e1240a8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.util.IOUtils;
@@ -39,9 +40,9 @@ import java.util.List;
import java.util.Random;
/**
- * Tests for the RocksDBMergeIterator.
+ * Tests for the RocksStatesPerKeyGroupMergeIterator.
*/
-public class RocksDBMergeIteratorTest {
+public class RocksKeyGroupsRocksSingleStateIteratorTest {
private static final int NUM_KEY_VAL_STATES = 50;
private static final int MAX_NUM_KEYS = 20;
@@ -51,8 +52,8 @@ public class RocksDBMergeIteratorTest {
@Test
public void testEmptyMergeIterator() throws Exception {
- RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
- new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2);
+ RocksStatesPerKeyGroupMergeIterator emptyIterator =
+ new RocksStatesPerKeyGroupMergeIterator(Collections.emptyList(), 2);
Assert.assertFalse(emptyIterator.isValid());
}
@@ -111,7 +112,7 @@ public class RocksDBMergeIteratorTest {
++id;
}
- try (RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(
+ try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
rocksIteratorsWithKVStateId,
maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) {