You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/08/10 18:10:24 UTC

[flink] 01/02: [FLINK-10041][state] Extract iterators from RocksDBKeyedStateBackend (inner or static inner classes) into full classes

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d67afbb84736ee8c28ec1bd69eef8ddff4754f0
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Mon Aug 6 11:21:58 2018 +0200

    [FLINK-10041][state] Extract iterators from RocksDBKeyedStateBackend (inner or static inner classes) into full classes
    
    This closes #6501.
---
 .../state/RocksDBKeySerializationUtils.java        |   2 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  | 397 +--------------------
 .../state/iterator/RocksSingleStateIterator.java   |  68 ++++
 .../state/iterator/RocksStateKeysIterator.java     | 136 +++++++
 .../RocksStatesPerKeyGroupMergeIterator.java       | 222 ++++++++++++
 .../iterator/RocksTransformingIteratorWrapper.java |  82 +++++
 ...java => RocksDBRocksStateKeysIteratorTest.java} |  13 +-
 ...ocksKeyGroupsRocksSingleStateIteratorTest.java} |  11 +-
 8 files changed, 529 insertions(+), 402 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index 5f1c650..1bc49e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -28,7 +28,7 @@ import java.io.IOException;
 /**
  * Utils for RocksDB state serialization and deserialization.
  */
-class RocksDBKeySerializationUtils {
+public class RocksDBKeySerializationUtils {
 
 	static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws IOException {
 		int keyGroup = 0;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0fd1125..55cc4f9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -34,6 +34,9 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -42,7 +45,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -131,16 +133,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.Spliterator;
@@ -383,7 +381,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0);
 		iterator.seekToFirst();
 
-		final RocksIteratorForKeysWrapper<K> iteratorWrapper = new RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, keySerializer, keyGroupPrefixBytes,
 			ambiguousKeyPossible, nameSpaceBytes);
 
 		Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
@@ -391,7 +389,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@VisibleForTesting
-	ColumnFamilyHandle getColumnFamilyHandle(String state) {
+	public ColumnFamilyHandle getColumnFamilyHandle(String state) {
 		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state);
 		return columnInfo != null ? columnInfo.f0 : null;
 	}
@@ -1451,385 +1449,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return count;
 	}
 
-	/**
-	 * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
-	 * The resulting iteration sequence is ordered by (key-group, kv-state).
-	 */
-	@VisibleForTesting
-	static class RocksDBMergeIterator implements AutoCloseable {
-
-		private final PriorityQueue<RocksDBKeyedStateBackend.MergeIterator> heap;
-		private final int keyGroupPrefixByteCount;
-		private boolean newKeyGroup;
-		private boolean newKVState;
-		private boolean valid;
-
-		MergeIterator currentSubIterator;
-
-		private static final List<Comparator<MergeIterator>> COMPARATORS;
-
-		static {
-			int maxBytes = 2;
-			COMPARATORS = new ArrayList<>(maxBytes);
-			for (int i = 0; i < maxBytes; ++i) {
-				final int currentBytes = i + 1;
-				COMPARATORS.add((o1, o2) -> {
-					int arrayCmpRes = compareKeyGroupsForByteArrays(
-						o1.currentKey, o2.currentKey, currentBytes);
-					return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
-				});
-			}
-		}
-
-		RocksDBMergeIterator(
-			List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
-			final int keyGroupPrefixByteCount) {
-			Preconditions.checkNotNull(kvStateIterators);
-			Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
-
-			this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
-
-			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
-
-			if (kvStateIterators.size() > 0) {
-				PriorityQueue<MergeIterator> iteratorPriorityQueue =
-					new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
-
-				for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
-					final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
-					rocksIterator.seekToFirst();
-					if (rocksIterator.isValid()) {
-						iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
-					} else {
-						IOUtils.closeQuietly(rocksIterator);
-					}
-				}
-
-				kvStateIterators.clear();
-
-				this.heap = iteratorPriorityQueue;
-				this.valid = !heap.isEmpty();
-				this.currentSubIterator = heap.poll();
-			} else {
-				// creating a PriorityQueue of size 0 results in an exception.
-				this.heap = null;
-				this.valid = false;
-			}
-
-			this.newKeyGroup = true;
-			this.newKVState = true;
-		}
-
-		/**
-		 * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after
-		 * calls to {@link #next()}.
-		 */
-		public void next() {
-			newKeyGroup = false;
-			newKVState = false;
-
-			final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
-			rocksIterator.next();
-
-			byte[] oldKey = currentSubIterator.getCurrentKey();
-			if (rocksIterator.isValid()) {
-
-				currentSubIterator.currentKey = rocksIterator.key();
-
-				if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
-					heap.offer(currentSubIterator);
-					currentSubIterator = heap.poll();
-					newKVState = currentSubIterator.getIterator() != rocksIterator;
-					detectNewKeyGroup(oldKey);
-				}
-			} else {
-				IOUtils.closeQuietly(rocksIterator);
-
-				if (heap.isEmpty()) {
-					currentSubIterator = null;
-					valid = false;
-				} else {
-					currentSubIterator = heap.poll();
-					newKVState = true;
-					detectNewKeyGroup(oldKey);
-				}
-			}
-		}
-
-		private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
-			return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
-		}
-
-		private void detectNewKeyGroup(byte[] oldKey) {
-			if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) {
-				newKeyGroup = true;
-			}
-		}
-
-		/**
-		 * @return key-group for the current key
-		 */
-		public int keyGroup() {
-			int result = 0;
-			//big endian decode
-			for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
-				result <<= 8;
-				result |= (currentSubIterator.currentKey[i] & 0xFF);
-			}
-			return result;
-		}
-
-		public byte[] key() {
-			return currentSubIterator.getCurrentKey();
-		}
-
-		public byte[] value() {
-			return currentSubIterator.getIterator().value();
-		}
-
-		/**
-		 * @return Id of K/V state to which the current key belongs.
-		 */
-		public int kvStateId() {
-			return currentSubIterator.getKvStateId();
-		}
-
-		/**
-		 * Indicates if current key starts a new k/v-state, i.e. belong to a different k/v-state than it's predecessor.
-		 * @return true iff the current key belong to a different k/v-state than it's predecessor.
-		 */
-		public boolean isNewKeyValueState() {
-			return newKVState;
-		}
-
-		/**
-		 * Indicates if current key starts a new key-group, i.e. belong to a different key-group than it's predecessor.
-		 * @return true iff the current key belong to a different key-group than it's predecessor.
-		 */
-		public boolean isNewKeyGroup() {
-			return newKeyGroup;
-		}
-
-		/**
-		 * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
-		 * {@link #next()} should only be called if valid returned true. Should be checked after each call to
-		 * {@link #next()} before accessing iterator state.
-		 * @return True iff this iterator is valid.
-		 */
-		public boolean isValid() {
-			return valid;
-		}
-
-		private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
-			for (int i = 0; i < len; ++i) {
-				int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
-				if (diff != 0) {
-					return diff;
-				}
-			}
-			return 0;
-		}
-
-		@Override
-		public void close() {
-			IOUtils.closeQuietly(currentSubIterator);
-			currentSubIterator = null;
-
-			IOUtils.closeAllQuietly(heap);
-			heap.clear();
-		}
-	}
-
-	/**
-	 * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator.
-	 * Used by #MergeIterator.
-	 */
-	@VisibleForTesting
-	protected static final class MergeIterator implements AutoCloseable {
-
-		/**
-		 * @param iterator  The #RocksIterator to wrap .
-		 * @param kvStateId Id of the K/V state to which this iterator belongs.
-		 */
-		MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
-			this.iterator = Preconditions.checkNotNull(iterator);
-			this.currentKey = iterator.key();
-			this.kvStateId = kvStateId;
-		}
-
-		private final RocksIteratorWrapper iterator;
-		private byte[] currentKey;
-		private final int kvStateId;
-
-		public byte[] getCurrentKey() {
-			return currentKey;
-		}
-
-		public void setCurrentKey(byte[] currentKey) {
-			this.currentKey = currentKey;
-		}
-
-		public RocksIteratorWrapper getIterator() {
-			return iterator;
-		}
-
-		public int getKvStateId() {
-			return kvStateId;
-		}
-
-		@Override
-		public void close() {
-			IOUtils.closeQuietly(iterator);
-		}
-	}
-
-	private static final class TransformingRocksIteratorWrapper extends RocksIteratorWrapper {
-		@Nonnull
-		private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
-		private byte[] current;
-
-		public TransformingRocksIteratorWrapper(
-			@Nonnull RocksIterator iterator,
-			@Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
-			super(iterator);
-			this.stateSnapshotTransformer = stateSnapshotTransformer;
-		}
-
-		@Override
-		public void seekToFirst() {
-			super.seekToFirst();
-			filterOrTransform(super::next);
-		}
-
-		@Override
-		public void seekToLast() {
-			super.seekToLast();
-			filterOrTransform(super::prev);
-		}
-
-		@Override
-		public void next() {
-			super.next();
-			filterOrTransform(super::next);
-		}
-
-		@Override
-		public void prev() {
-			super.prev();
-			filterOrTransform(super::prev);
-		}
-
-		private void filterOrTransform(Runnable advance) {
-			while (isValid() && (current = stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
-				advance.run();
-			}
-		}
-
-		@Override
-		public byte[] value() {
-			if (!isValid()) {
-				throw new IllegalStateException("value() method cannot be called if isValid() is false");
-			}
-			return current;
-		}
-	}
-
-	/**
-	 * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
-	 * is not thread safe.
-	 *
-	 * @param <K> the type of the iterated objects, which are keys in RocksDB.
-	 */
-	static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseable {
-		private final RocksIteratorWrapper iterator;
-		private final String state;
-		private final TypeSerializer<K> keySerializer;
-		private final int keyGroupPrefixBytes;
-		private final byte[] namespaceBytes;
-		private final boolean ambiguousKeyPossible;
-		private K nextKey;
-		private K previousKey;
-
-		RocksIteratorForKeysWrapper(
-			RocksIteratorWrapper iterator,
-			String state,
-			TypeSerializer<K> keySerializer,
-			int keyGroupPrefixBytes,
-			boolean ambiguousKeyPossible,
-			byte[] namespaceBytes) {
-			this.iterator = Preconditions.checkNotNull(iterator);
-			this.state = Preconditions.checkNotNull(state);
-			this.keySerializer = Preconditions.checkNotNull(keySerializer);
-			this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
-			this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
-			this.nextKey = null;
-			this.previousKey = null;
-			this.ambiguousKeyPossible = ambiguousKeyPossible;
-		}
-
-		@Override
-		public boolean hasNext() {
-			try {
-				while (nextKey == null && iterator.isValid()) {
-
-					byte[] key = iterator.key();
-
-					ByteArrayInputStreamWithPos inputStream =
-						new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
-
-					DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
-
-					K value = RocksDBKeySerializationUtils.readKey(
-						keySerializer,
-						inputStream,
-						dataInput,
-						ambiguousKeyPossible);
-
-					int namespaceByteStartPos = inputStream.getPosition();
-
-					if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
-						previousKey = value;
-						nextKey = value;
-					}
-					iterator.next();
-				}
-			} catch (Exception e) {
-				throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
-			}
-			return nextKey != null;
-		}
-
-		@Override
-		public K next() {
-			if (!hasNext()) {
-				throw new NoSuchElementException("Failed to access state [" + state + "]");
-			}
-
-			K tmpKey = nextKey;
-			nextKey = null;
-			return tmpKey;
-		}
-
-		private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
-			final int namespaceBytesLength = namespaceBytes.length;
-			final int basicLength = namespaceBytesLength + beginPos;
-			if (key.length >= basicLength) {
-				for (int i = 0; i < namespaceBytesLength; ++i) {
-					if (key[beginPos + i] != namespaceBytes[i]) {
-						return false;
-					}
-				}
-				return true;
-			}
-			return false;
-		}
-
-		@Override
-		public void close() {
-			iterator.close();
-		}
-	}
-
 	private class FullSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
 
 		@Override
@@ -2214,8 +1833,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				checkpointStreamWithResultProvider.getCheckpointOutputStream();
 
 			try {
-				// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
-				try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
+				// Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator
+				try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
 					kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
 
 					// handover complete, null out to prevent double close
@@ -2729,7 +2348,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
 		return stateSnapshotTransformer == null ?
 			new RocksIteratorWrapper(rocksIterator) :
-			new TransformingRocksIteratorWrapper(rocksIterator, stateSnapshotTransformer);
+			new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
 	}
 
 	/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
new file mode 100644
index 0000000..993b35a
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a RocksDB iterator to cache its current key and assigns an id for the key/value state to the iterator.
+ * Used by {@link RocksStatesPerKeyGroupMergeIterator}.
+ */
+class RocksSingleStateIterator implements AutoCloseable {
+
+	@Nonnull
+	private final RocksIteratorWrapper iterator;
+	private byte[] currentKey;
+	private final int kvStateId;
+
+	/**
+	 * @param iterator underlying {@link RocksIteratorWrapper}; its key() is read immediately, so it should be valid.
+	 * @param kvStateId Id of the K/V state to which this iterator belongs.
+	 */
+	RocksSingleStateIterator(@Nonnull RocksIteratorWrapper iterator, int kvStateId) {
+		this.iterator = iterator;
+		this.currentKey = iterator.key();
+		this.kvStateId = kvStateId;
+	}
+
+	public byte[] getCurrentKey() {
+		return currentKey;
+	}
+
+	public void setCurrentKey(byte[] currentKey) {
+		this.currentKey = currentKey;
+	}
+
+	@Nonnull
+	public RocksIteratorWrapper getIterator() {
+		return iterator;
+	}
+
+	public int getKvStateId() {
+		return kvStateId;
+	}
+
+	@Override
+	public void close() {
+		IOUtils.closeQuietly(iterator);
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
new file mode 100644
index 0000000..0fa93dc
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
+ * is not thread safe.
+ *
+ * @param <K> the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator<K> implements Iterator<K>, AutoCloseable {
+
+	@Nonnull
+	private final RocksIteratorWrapper iterator;
+
+	@Nonnull
+	private final String state;
+
+	@Nonnull
+	private final TypeSerializer<K> keySerializer;
+
+	@Nonnull
+	private final byte[] namespaceBytes;
+
+	private final boolean ambiguousKeyPossible;
+	private final int keyGroupPrefixBytes;
+	private final ByteArrayDataInputView byteArrayDataInputView;
+	private K nextKey;
+	private K previousKey;
+
+	public RocksStateKeysIterator(
+		@Nonnull RocksIteratorWrapper iterator,
+		@Nonnull String state,
+		@Nonnull TypeSerializer<K> keySerializer,
+		int keyGroupPrefixBytes,
+		boolean ambiguousKeyPossible,
+		@Nonnull byte[] namespaceBytes) {
+		this.iterator = iterator;
+		this.state = state;
+		this.keySerializer = keySerializer;
+		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+		this.namespaceBytes = namespaceBytes;
+		this.nextKey = null;
+		this.previousKey = null;
+		this.ambiguousKeyPossible = ambiguousKeyPossible;
+		this.byteArrayDataInputView = new ByteArrayDataInputView();
+	}
+
+	@Override
+	public boolean hasNext() {
+		try {
+			while (nextKey == null && iterator.isValid()) {
+
+				final byte[] keyBytes = iterator.key();
+				final K currentKey = deserializeKey(keyBytes, byteArrayDataInputView);
+				final int namespaceByteStartPos = byteArrayDataInputView.getPosition();
+
+				if (isMatchingNameSpace(keyBytes, namespaceByteStartPos) && !Objects.equals(previousKey, currentKey)) {
+					previousKey = currentKey;
+					nextKey = currentKey;
+				}
+				iterator.next();
+			}
+		} catch (Exception e) {
+			throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
+		}
+		return nextKey != null;
+	}
+
+	@Override
+	public K next() {
+		if (!hasNext()) {
+			throw new NoSuchElementException("Failed to access state [" + state + "]");
+		}
+
+		K tmpKey = nextKey;
+		nextKey = null;
+		return tmpKey;
+	}
+
+	private K deserializeKey(byte[] keyBytes, ByteArrayDataInputView readView) throws IOException {
+		readView.setData(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
+		return RocksDBKeySerializationUtils.readKey(
+			keySerializer,
+			readView,
+			ambiguousKeyPossible);
+	}
+
+	private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
+		final int namespaceBytesLength = namespaceBytes.length;
+		final int basicLength = namespaceBytesLength + beginPos;
+		if (key.length >= basicLength) {
+			for (int i = 0; i < namespaceBytesLength; ++i) {
+				if (key[beginPos + i] != namespaceBytes[i]) {
+					return false;
+				}
+			}
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void close() {
+		iterator.close();
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
new file mode 100644
index 0000000..20e5dd0
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+	private final PriorityQueue<RocksSingleStateIterator> heap;
+	private final int keyGroupPrefixByteCount;
+	private boolean newKeyGroup;
+	private boolean newKVState;
+	private boolean valid;
+	private RocksSingleStateIterator currentSubIterator;
+
+	private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+
+	static {
+		int maxBytes = 2;
+		COMPARATORS = new ArrayList<>(maxBytes);
+		for (int i = 0; i < maxBytes; ++i) {
+			final int currentBytes = i + 1;
+			COMPARATORS.add((o1, o2) -> {
+				int arrayCmpRes = compareKeyGroupsForByteArrays(o1.getCurrentKey(), o2.getCurrentKey(), currentBytes);
+				return arrayCmpRes == 0 ? Integer.compare(o1.getKvStateId(), o2.getKvStateId()) : arrayCmpRes;
+			});
+		}
+	}
+
+	public RocksStatesPerKeyGroupMergeIterator(
+		List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+		final int keyGroupPrefixByteCount) {
+		Preconditions.checkNotNull(kvStateIterators);
+		Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+		this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+		if (kvStateIterators.size() > 0) {
+			this.heap = buildIteratorHeap(kvStateIterators);
+			this.valid = !heap.isEmpty();
+			this.currentSubIterator = heap.poll();
+			kvStateIterators.clear();
+		} else {
+			// creating a PriorityQueue of size 0 results in an exception, so heap stays null (see close()).
+			this.heap = null;
+			this.valid = false;
+		}
+
+		this.newKeyGroup = true;
+		this.newKVState = true;
+	}
+
+	/**
+	 * Advances the iterator. Should only be called if {@link #isValid()} returned true.
+	 * Valid flag can only change after calling {@link #next()}.
+	 */
+	public void next() {
+		newKeyGroup = false;
+		newKVState = false;
+
+		final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
+		rocksIterator.next();
+
+		byte[] oldKey = currentSubIterator.getCurrentKey();
+		if (rocksIterator.isValid()) {
+
+			currentSubIterator.setCurrentKey(rocksIterator.key());
+
+			if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+				heap.offer(currentSubIterator);
+				currentSubIterator = heap.remove();
+				newKVState = currentSubIterator.getIterator() != rocksIterator;
+				detectNewKeyGroup(oldKey);
+			}
+		} else {
+			IOUtils.closeQuietly(rocksIterator);
+
+			if (heap.isEmpty()) {
+				currentSubIterator = null;
+				valid = false;
+			} else {
+				currentSubIterator = heap.remove();
+				newKVState = true;
+				detectNewKeyGroup(oldKey);
+			}
+		}
+	}
+
+	private PriorityQueue<RocksSingleStateIterator> buildIteratorHeap(
+		List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators) {
+
+		Comparator<RocksSingleStateIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+		PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue =
+			new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+		for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+			final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
+			rocksIterator.seekToFirst();
+			if (rocksIterator.isValid()) {
+				iteratorPriorityQueue.offer(
+					new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+			} else {
+				IOUtils.closeQuietly(rocksIterator);
+			}
+		}
+		return iteratorPriorityQueue;
+	}
+
+	private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
+		return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
+	}
+
+	private void detectNewKeyGroup(byte[] oldKey) {
+		if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+			newKeyGroup = true;
+		}
+	}
+
+	/**
+	 * @return key-group for the current key
+	 */
+	public int keyGroup() {
+		final byte[] currentKey = currentSubIterator.getCurrentKey();
+		int result = 0;
+		//big endian decode
+		for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
+			result <<= 8;
+			result |= (currentKey[i] & 0xFF);
+		}
+		return result;
+	}
+
+	public byte[] key() {
+		return currentSubIterator.getCurrentKey();
+	}
+
+	public byte[] value() {
+		return currentSubIterator.getIterator().value();
+	}
+
+	/**
+	 * @return Id of K/V state to which the current key belongs.
+	 */
+	public int kvStateId() {
+		return currentSubIterator.getKvStateId();
+	}
+
+	/**
+	 * Indicates if current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor.
+	 * @return true iff the current key belongs to a different k/v-state than its predecessor.
+	 */
+	public boolean isNewKeyValueState() {
+		return newKVState;
+	}
+
+	/**
+	 * Indicates if current key starts a new key-group, i.e. belongs to a different key-group than its predecessor.
+	 * @return true iff the current key belongs to a different key-group than its predecessor.
+	 */
+	public boolean isNewKeyGroup() {
+		return newKeyGroup;
+	}
+
+	/**
+	 * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
+	 * {@link #next()} should only be called if valid returned true. Should be checked after each call to
+	 * {@link #next()} before accessing iterator state.
+	 * @return True iff this iterator is valid.
+	 */
+	public boolean isValid() {
+		return valid;
+	}
+
+	private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
+		for (int i = 0; i < len; ++i) {
+			int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
+			if (diff != 0) {
+				return diff;
+			}
+		}
+		return 0;
+	}
+
+	@Override
+	public void close() {
+		IOUtils.closeQuietly(currentSubIterator);
+		currentSubIterator = null;
+		if (heap != null) {
+			IOUtils.closeAllQuietly(heap);
+			heap.clear();
+		}
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
new file mode 100644
index 0000000..e2fec42
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+
+import org.rocksdb.RocksIterator;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wrapper around {@link RocksIterator} that applies a given {@link StateSnapshotTransformer} to the elements
+ * during the iteration, skipping every entry for which the transformer returns {@code null}.
+ */
+public class RocksTransformingIteratorWrapper extends RocksIteratorWrapper {
+
+	@Nonnull
+	private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
+	private byte[] current; // transformer output for the entry the iterator is currently positioned on
+
+	public RocksTransformingIteratorWrapper(
+		@Nonnull RocksIterator iterator,
+		@Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
+		super(iterator);
+		this.stateSnapshotTransformer = stateSnapshotTransformer;
+	}
+
+	@Override
+	public void seekToFirst() {
+		super.seekToFirst();
+		filterOrTransform(super::next); // super::next, not this::next, so skipping does not re-enter the override
+	}
+
+	@Override
+	public void seekToLast() {
+		super.seekToLast();
+		filterOrTransform(super::prev); // skip backwards over filtered-out entries
+	}
+
+	@Override
+	public void next() {
+		super.next();
+		filterOrTransform(super::next); // skip forwards over filtered-out entries
+	}
+
+	@Override
+	public void prev() {
+		super.prev();
+		filterOrTransform(super::prev); // skip backwards over filtered-out entries
+	}
+
+	private void filterOrTransform(@Nonnull Runnable advance) {
+		while (isValid() && (current = stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
+			advance.run(); // null means "filtered out"; super.value() above reads the raw, untransformed entry
+		}
+	}
+
+	@Override
+	public byte[] value() {
+		if (!isValid()) {
+			throw new IllegalStateException("value() method cannot be called if isValid() is false");
+		}
+		return current; // NOTE(review): refreshed only by seekToFirst/seekToLast/next/prev; other base-class positioning (e.g. seek) would leave it stale — confirm
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
similarity index 91%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
rename to flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index f560998..e042ebd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -47,7 +47,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the RocksIteratorWrapper.
  */
-public class RocksDBRocksIteratorForKeysWrapperTest {
+public class RocksDBRocksStateKeysIteratorTest {
 
 	@Rule
 	public final TemporaryFolder tmp = new TemporaryFolder();
@@ -105,13 +105,12 @@ public class RocksDBRocksIteratorForKeysWrapperTest {
 				testState.update(String.valueOf(i));
 			}
 
-			ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
+			ByteArrayDataOutputView outputStream = new ByteArrayDataOutputView(8);
 			boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 			RocksDBKeySerializationUtils.writeNameSpace(
 				namespace,
 				namespaceSerializer,
 				outputStream,
-				new DataOutputViewStreamWrapper(outputStream),
 				ambiguousKeyPossible);
 
 			byte[] nameSpaceBytes = outputStream.toByteArray();
@@ -119,8 +118,8 @@ public class RocksDBRocksIteratorForKeysWrapperTest {
 			try (
 				ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
 				RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(keyedStateBackend.db, handle);
-				RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
-					new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
+				RocksStateKeysIterator<K> iteratorWrapper =
+					new RocksStateKeysIterator<>(
 						iterator,
 						testStateName,
 						keySerializer,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
similarity index 91%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
rename to flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index cb2b202..e1240a8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.util.IOUtils;
 
@@ -39,9 +40,9 @@ import java.util.List;
 import java.util.Random;
 
 /**
- * Tests for the RocksDBMergeIterator.
+ * Tests for the RocksStatesPerKeyGroupMergeIterator.
  */
-public class RocksDBMergeIteratorTest {
+public class RocksKeyGroupsRocksSingleStateIteratorTest {
 
 	private static final int NUM_KEY_VAL_STATES = 50;
 	private static final int MAX_NUM_KEYS = 20;
@@ -51,8 +52,8 @@ public class RocksDBMergeIteratorTest {
 
 	@Test
 	public void testEmptyMergeIterator() throws Exception {
-		RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
-				new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2);
+		RocksStatesPerKeyGroupMergeIterator emptyIterator =
+				new RocksStatesPerKeyGroupMergeIterator(Collections.emptyList(), 2); // no sub-iterators; 2 presumably = key-group prefix bytes — confirm
 		Assert.assertFalse(emptyIterator.isValid());
 	}
 
@@ -111,7 +112,7 @@ public class RocksDBMergeIteratorTest {
 				++id;
 			}
 
-			try (RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(
+			try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
 				rocksIteratorsWithKVStateId,
 				maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) {