Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/10 18:11:56 UTC

[GitHub] asfgit closed pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

URL: https://github.com/apache/flink/pull/6501
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
index 33836f0c781..698a9f97dc0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
@@ -53,4 +53,8 @@ public void setPosition(int pos) {
 	public void setData(@Nonnull byte[] buffer, int offset, int length) {
 		inStreamWithPos.setBuffer(buffer, offset, length);
 	}
+
+	public void setData(@Nonnull byte[] buffer) {
+		setData(buffer, 0, buffer.length);
+	}
 }
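
The new single-argument setData overload re-points a long-lived ByteArrayDataInputView at a fresh
byte array without repeating the offset/length arithmetic; the state classes below rely on this to
avoid allocating a stream wrapper per read. A minimal round-trip sketch of that reuse pattern
(LongSerializer is only a convenient stand-in serializer, not part of the patch):

    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.core.memory.ByteArrayDataInputView;
    import org.apache.flink.core.memory.ByteArrayDataOutputView;

    import java.io.IOException;

    // Sketch: one reusable view per state object instead of a new
    // DataInputViewStreamWrapper per read (API as introduced above).
    public class ReusableViewSketch {
        public static void main(String[] args) throws IOException {
            ByteArrayDataOutputView out = new ByteArrayDataOutputView(16);
            LongSerializer.INSTANCE.serialize(42L, out);
            byte[] bytes = out.toByteArray();

            ByteArrayDataInputView in = new ByteArrayDataInputView();
            in.setData(bytes); // new overload: whole array, offset 0
            System.out.println(LongSerializer.INSTANCE.deserialize(in)); // 42
        }
    }
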
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 32819f84e46..2a9ab7589a9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -20,8 +20,6 @@
 
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -63,7 +61,8 @@ SV getInternal(byte[] key) {
 			if (valueBytes == null) {
 				return null;
 			}
-			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+			dataInputView.setData(valueBytes);
+			return valueSerializer.deserialize(dataInputView);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
 		}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 7483089106f..65b7f1fa4a7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,9 +20,8 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -67,9 +66,9 @@
 
 	protected final WriteOptions writeOptions;
 
-	protected final ByteArrayOutputStreamWithPos keySerializationStream;
+	protected final ByteArrayDataOutputView dataOutputView;
 
-	protected final DataOutputView keySerializationDataOutputView;
+	protected final ByteArrayDataInputView dataInputView;
 
 	private final boolean ambiguousKeyPossible;
 
@@ -98,9 +97,10 @@ protected AbstractRocksDBState(
 		this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "State value serializer");
 		this.defaultValue = defaultValue;
 
-		this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
-		this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
-		this.ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
+		this.dataOutputView = new ByteArrayDataOutputView(128);
+		this.dataInputView = new ByteArrayDataInputView();
+		this.ambiguousKeyPossible =
+			RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -109,7 +109,7 @@ protected AbstractRocksDBState(
 	public void clear() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
+			byte[] key = dataOutputView.toByteArray();
 			backend.db.delete(columnFamily, writeOptions, key);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
@@ -141,8 +141,7 @@ public void setCurrentNamespace(N namespace) {
 
 		// we cannot reuse the keySerializationStream member since this method
 		// is called concurrently to the other ones and it may thus contain garbage
-		ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128);
-		DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream);
+		ByteArrayDataOutputView tmpKeySerializationView = new ByteArrayDataOutputView(128);
 
 		writeKeyWithGroupAndNamespace(
 				keyGroup,
@@ -150,16 +149,15 @@ public void setCurrentNamespace(N namespace) {
 				safeKeySerializer,
 				keyAndNamespace.f1,
 				safeNamespaceSerializer,
-				tmpKeySerializationStream,
-				tmpKeySerializationDateDataOutputView);
+				tmpKeySerializationView);
 
-		return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
+		return backend.db.get(columnFamily, tmpKeySerializationView.toByteArray());
 	}
 
 	byte[] getKeyBytes() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			return keySerializationStream.toByteArray();
+			return dataOutputView.toByteArray();
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Error while serializing key", e);
 		}
@@ -167,9 +165,9 @@ public void setCurrentNamespace(N namespace) {
 
 	byte[] getValueBytes(V value) {
 		try {
-			keySerializationStream.reset();
-			valueSerializer.serialize(value, new DataOutputViewStreamWrapper(keySerializationStream));
-			return keySerializationStream.toByteArray();
+			dataOutputView.reset();
+			valueSerializer.serialize(value, dataOutputView);
+			return dataOutputView.toByteArray();
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Error while serializing value", e);
 		}
@@ -180,14 +178,12 @@ protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
 			backend.getCurrentKeyGroupIndex(),
 			backend.getCurrentKey(),
 			currentNamespace,
-			keySerializationStream,
-			keySerializationDataOutputView);
+			dataOutputView);
 	}
 
 	protected void writeKeyWithGroupAndNamespace(
 			int keyGroup, K key, N namespace,
-			ByteArrayOutputStreamWithPos keySerializationStream,
-			DataOutputView keySerializationDataOutputView) throws IOException {
+			ByteArrayDataOutputView keySerializationDataOutputView) throws IOException {
 
 		writeKeyWithGroupAndNamespace(
 				keyGroup,
@@ -195,7 +191,6 @@ protected void writeKeyWithGroupAndNamespace(
 				backend.getKeySerializer(),
 				namespace,
 				namespaceSerializer,
-				keySerializationStream,
 				keySerializationDataOutputView);
 	}
 
@@ -205,17 +200,16 @@ protected void writeKeyWithGroupAndNamespace(
 			final TypeSerializer<K> keySerializer,
 			final N namespace,
 			final TypeSerializer<N> namespaceSerializer,
-			final ByteArrayOutputStreamWithPos keySerializationStream,
-			final DataOutputView keySerializationDataOutputView) throws IOException {
+			final ByteArrayDataOutputView keySerializationDataOutputView) throws IOException {
 
 		Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
 		Preconditions.checkNotNull(keySerializer);
 		Preconditions.checkNotNull(namespaceSerializer);
 
-		keySerializationStream.reset();
+		keySerializationDataOutputView.reset();
 		RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
-		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
-		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
+		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationDataOutputView, ambiguousKeyPossible);
+		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
 	}
 
 	protected V getDefaultValue() {
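
For orientation, the composite RocksDB key built by writeKeyWithGroupAndNamespace is the
concatenation of a big-endian key-group prefix, the serialized key, and the serialized namespace
(each followed by a length suffix when ambiguous keys are possible). A simplified sketch of that
layout, ignoring the length suffixes; the serializers and values are illustrative:

    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.core.memory.ByteArrayDataOutputView;

    import java.io.IOException;

    // Simplified layout sketch: [key-group prefix][key][namespace].
    public class KeyLayoutSketch {
        public static void main(String[] args) throws IOException {
            ByteArrayDataOutputView out = new ByteArrayDataOutputView(128);

            int keyGroup = 42;
            int keyGroupPrefixBytes = 2; // two bytes once maxParallelism > 256

            // big-endian prefix, as in RocksDBKeySerializationUtils.writeKeyGroup
            for (int i = keyGroupPrefixBytes; --i >= 0;) {
                out.writeByte(keyGroup >>> (i * 8));
            }
            StringSerializer.INSTANCE.serialize("user-key", out);  // key
            StringSerializer.INSTANCE.serialize("window-1", out);  // namespace

            System.out.println("composite key: " + out.toByteArray().length + " bytes");
        }
    }
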
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 209d18f43c8..4f9ef2f811c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -25,8 +25,6 @@
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -121,17 +119,15 @@ public void mergeNamespaces(N target, Collection<N> sources) {
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-					writeKeyWithGroupAndNamespace(
-							keyGroup, key, source,
-							keySerializationStream, keySerializationDataOutputView);
+					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
 
-					final byte[] sourceKey = keySerializationStream.toByteArray();
+					final byte[] sourceKey = dataOutputView.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
 					if (valueBytes != null) {
-						ACC value = valueSerializer.deserialize(
-								new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+						dataInputView.setData(valueBytes);
+						ACC value = valueSerializer.deserialize(dataInputView);
 
 						if (current != null) {
 							current = aggFunction.merge(current, value);
@@ -146,27 +142,25 @@ public void mergeNamespaces(N target, Collection<N> sources) {
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
 				// create the target full-binary-key
-				writeKeyWithGroupAndNamespace(
-						keyGroup, key, target,
-						keySerializationStream, keySerializationDataOutputView);
+				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
 
-				final byte[] targetKey = keySerializationStream.toByteArray();
+				final byte[] targetKey = dataOutputView.toByteArray();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
 					// target also had a value, merge
-					ACC value = valueSerializer.deserialize(
-							new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+					dataInputView.setData(targetValueBytes);
+					ACC value = valueSerializer.deserialize(dataInputView);
 
 					current = aggFunction.merge(current, value);
 				}
 
 				// serialize the resulting value
-				keySerializationStream.reset();
-				valueSerializer.serialize(current, keySerializationDataOutputView);
+				dataOutputView.reset();
+				valueSerializer.serialize(current, dataOutputView);
 
 				// write the resulting value
-				backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+				backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray());
 			}
 		}
 		catch (Exception e) {
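
The merge path above follows a read-delete-merge-write pattern: each source namespace's
accumulator is fetched and removed, folded together via AggregateFunction#merge, combined with any
value already stored under the target key, and written back. A condensed, self-contained sketch of
that flow (KvStore and Codec are illustrative stand-ins for the column-family access and the
serializer-backed views, not Flink classes):

    import org.apache.flink.api.common.functions.AggregateFunction;

    // Illustrative read-delete-merge-write flow behind mergeNamespaces.
    final class MergeNamespacesSketch {
        interface KvStore {            // stand-in for (db, columnFamily, writeOptions)
            byte[] get(byte[] key);
            void delete(byte[] key);
            void put(byte[] key, byte[] value);
        }
        interface Codec<T> {           // stand-in for serializer + reusable views
            byte[] serialize(T value);
            T deserialize(byte[] bytes);
        }

        static <IN, ACC, OUT> void mergeNamespaces(
                KvStore db,
                Codec<ACC> codec,
                AggregateFunction<IN, ACC, OUT> aggFunction,
                byte[][] sourceKeys,
                byte[] targetKey) {

            ACC current = null;
            for (byte[] sourceKey : sourceKeys) {
                byte[] bytes = db.get(sourceKey);
                db.delete(sourceKey);              // sources are consumed
                if (bytes != null) {
                    ACC value = codec.deserialize(bytes);
                    current = (current == null) ? value : aggFunction.merge(current, value);
                }
            }
            if (current != null) {
                byte[] existing = db.get(targetKey); // target may already hold a value
                if (existing != null) {
                    current = aggFunction.merge(current, codec.deserialize(existing));
                }
                db.put(targetKey, codec.serialize(current));
            }
        }
    }
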
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index 5f1c650de45..7c9e3f8c3f0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -18,8 +18,8 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -28,7 +28,7 @@
 /**
  * Utils for RocksDB state serialization and deserialization.
  */
-class RocksDBKeySerializationUtils {
+public class RocksDBKeySerializationUtils {
 
 	static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws IOException {
 		int keyGroup = 0;
@@ -41,13 +41,12 @@ static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws
 
 	public static <K> K readKey(
 		TypeSerializer<K> keySerializer,
-		ByteArrayInputStreamWithPos inputStream,
-		DataInputView inputView,
+		ByteArrayDataInputView inputView,
 		boolean ambiguousKeyPossible) throws IOException {
-		int beforeRead = inputStream.getPosition();
+		int beforeRead = inputView.getPosition();
 		K key = keySerializer.deserialize(inputView);
 		if (ambiguousKeyPossible) {
-			int length = inputStream.getPosition() - beforeRead;
+			int length = inputView.getPosition() - beforeRead;
 			readVariableIntBytes(inputView, length);
 		}
 		return key;
@@ -55,13 +54,12 @@ static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws
 
 	public static <N> N readNamespace(
 		TypeSerializer<N> namespaceSerializer,
-		ByteArrayInputStreamWithPos inputStream,
-		DataInputView inputView,
+		ByteArrayDataInputView inputView,
 		boolean ambiguousKeyPossible) throws IOException {
-		int beforeRead = inputStream.getPosition();
+		int beforeRead = inputView.getPosition();
 		N namespace = namespaceSerializer.deserialize(inputView);
 		if (ambiguousKeyPossible) {
-			int length = inputStream.getPosition() - beforeRead;
+			int length = inputView.getPosition() - beforeRead;
 			readVariableIntBytes(inputView, length);
 		}
 		return namespace;
@@ -70,17 +68,15 @@ static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws
 	public static <N> void writeNameSpace(
 		N namespace,
 		TypeSerializer<N> namespaceSerializer,
-		ByteArrayOutputStreamWithPos keySerializationStream,
-		DataOutputView keySerializationDataOutputView,
+		ByteArrayDataOutputView keySerializationDataOutputView,
 		boolean ambiguousKeyPossible) throws IOException {
 
-		int beforeWrite = keySerializationStream.getPosition();
+		int beforeWrite = keySerializationDataOutputView.getPosition();
 		namespaceSerializer.serialize(namespace, keySerializationDataOutputView);
 
 		if (ambiguousKeyPossible) {
 			//write length of namespace
-			writeLengthFrom(beforeWrite, keySerializationStream,
-				keySerializationDataOutputView);
+			writeLengthFrom(beforeWrite, keySerializationDataOutputView);
 		}
 	}
 
@@ -100,17 +96,15 @@ public static void writeKeyGroup(
 	public static <K> void writeKey(
 		K key,
 		TypeSerializer<K> keySerializer,
-		ByteArrayOutputStreamWithPos keySerializationStream,
-		DataOutputView keySerializationDataOutputView,
+		ByteArrayDataOutputView keySerializationDataOutputView,
 		boolean ambiguousKeyPossible) throws IOException {
 		//write key
-		int beforeWrite = keySerializationStream.getPosition();
+		int beforeWrite = keySerializationDataOutputView.getPosition();
 		keySerializer.serialize(key, keySerializationDataOutputView);
 
 		if (ambiguousKeyPossible) {
 			//write size of key
-			writeLengthFrom(beforeWrite, keySerializationStream,
-				keySerializationDataOutputView);
+			writeLengthFrom(beforeWrite, keySerializationDataOutputView);
 		}
 	}
 
@@ -123,9 +117,8 @@ private static void readVariableIntBytes(DataInputView inputView, int value) thr
 
 	private static void writeLengthFrom(
 		int fromPosition,
-		ByteArrayOutputStreamWithPos keySerializationStream,
-		DataOutputView keySerializationDateDataOutputView) throws IOException {
-		int length = keySerializationStream.getPosition() - fromPosition;
+		ByteArrayDataOutputView keySerializationDateDataOutputView) throws IOException {
+		int length = keySerializationDateDataOutputView.getPosition() - fromPosition;
 		writeVariableIntBytes(length, keySerializationDateDataOutputView);
 	}
 
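
The ambiguousKeyPossible handling above exists because two variable-length fields written back to
back are not self-delimiting: ("ab", "c") and ("a", "bc") would serialize to the same bytes, so a
length suffix is appended after the key and after the namespace. A hedged round-trip sketch using
the now-public utils class and the method signatures introduced in this patch:

    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
    import org.apache.flink.core.memory.ByteArrayDataInputView;
    import org.apache.flink.core.memory.ByteArrayDataOutputView;

    import java.io.IOException;

    // Round trip of a variable-length key with the length-suffix handling.
    public class AmbiguousKeySketch {
        public static void main(String[] args) throws IOException {
            boolean ambiguousKeyPossible = true; // String keys are variable-length

            ByteArrayDataOutputView out = new ByteArrayDataOutputView(32);
            RocksDBKeySerializationUtils.writeKey(
                "ab", StringSerializer.INSTANCE, out, ambiguousKeyPossible);

            ByteArrayDataInputView in = new ByteArrayDataInputView();
            in.setData(out.toByteArray());
            String key = RocksDBKeySerializationUtils.readKey(
                StringSerializer.INSTANCE, in, ambiguousKeyPossible);
            System.out.println(key); // ab
        }
    }
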
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0fd11252b2f..c159976f293 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -34,6 +34,9 @@
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -42,8 +45,6 @@
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -131,16 +132,12 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.Spliterator;
@@ -365,17 +362,16 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
 			(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
 
 		final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
-		final ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
+		final ByteArrayDataOutputView namespaceOutputView = new ByteArrayDataOutputView(8);
 		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 		final byte[] nameSpaceBytes;
 		try {
 			RocksDBKeySerializationUtils.writeNameSpace(
 				namespace,
 				namespaceSerializer,
-				namespaceOutputStream,
-				new DataOutputViewStreamWrapper(namespaceOutputStream),
+				namespaceOutputView,
 				ambiguousKeyPossible);
-			nameSpaceBytes = namespaceOutputStream.toByteArray();
+			nameSpaceBytes = namespaceOutputView.toByteArray();
 		} catch (IOException ex) {
 			throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
 		}
@@ -383,7 +379,7 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
 		RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0);
 		iterator.seekToFirst();
 
-		final RocksIteratorForKeysWrapper<K> iteratorWrapper = new RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, keySerializer, keyGroupPrefixBytes,
 			ambiguousKeyPossible, nameSpaceBytes);
 
 		Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
@@ -391,7 +387,7 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
 	}
 
 	@VisibleForTesting
-	ColumnFamilyHandle getColumnFamilyHandle(String state) {
+	public ColumnFamilyHandle getColumnFamilyHandle(String state) {
 		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state);
 		return columnInfo != null ? columnInfo.f0 : null;
 	}
@@ -1451,385 +1447,6 @@ public int numKeyValueStateEntries() {
 		return count;
 	}
 
-	/**
-	 * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
-	 * The resulting iteration sequence is ordered by (key-group, kv-state).
-	 */
-	@VisibleForTesting
-	static class RocksDBMergeIterator implements AutoCloseable {
-
-		private final PriorityQueue<RocksDBKeyedStateBackend.MergeIterator> heap;
-		private final int keyGroupPrefixByteCount;
-		private boolean newKeyGroup;
-		private boolean newKVState;
-		private boolean valid;
-
-		MergeIterator currentSubIterator;
-
-		private static final List<Comparator<MergeIterator>> COMPARATORS;
-
-		static {
-			int maxBytes = 2;
-			COMPARATORS = new ArrayList<>(maxBytes);
-			for (int i = 0; i < maxBytes; ++i) {
-				final int currentBytes = i + 1;
-				COMPARATORS.add((o1, o2) -> {
-					int arrayCmpRes = compareKeyGroupsForByteArrays(
-						o1.currentKey, o2.currentKey, currentBytes);
-					return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
-				});
-			}
-		}
-
-		RocksDBMergeIterator(
-			List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
-			final int keyGroupPrefixByteCount) {
-			Preconditions.checkNotNull(kvStateIterators);
-			Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
-
-			this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
-
-			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
-
-			if (kvStateIterators.size() > 0) {
-				PriorityQueue<MergeIterator> iteratorPriorityQueue =
-					new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
-
-				for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
-					final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
-					rocksIterator.seekToFirst();
-					if (rocksIterator.isValid()) {
-						iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
-					} else {
-						IOUtils.closeQuietly(rocksIterator);
-					}
-				}
-
-				kvStateIterators.clear();
-
-				this.heap = iteratorPriorityQueue;
-				this.valid = !heap.isEmpty();
-				this.currentSubIterator = heap.poll();
-			} else {
-				// creating a PriorityQueue of size 0 results in an exception.
-				this.heap = null;
-				this.valid = false;
-			}
-
-			this.newKeyGroup = true;
-			this.newKVState = true;
-		}
-
-		/**
-		 * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after
-		 * calls to {@link #next()}.
-		 */
-		public void next() {
-			newKeyGroup = false;
-			newKVState = false;
-
-			final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
-			rocksIterator.next();
-
-			byte[] oldKey = currentSubIterator.getCurrentKey();
-			if (rocksIterator.isValid()) {
-
-				currentSubIterator.currentKey = rocksIterator.key();
-
-				if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
-					heap.offer(currentSubIterator);
-					currentSubIterator = heap.poll();
-					newKVState = currentSubIterator.getIterator() != rocksIterator;
-					detectNewKeyGroup(oldKey);
-				}
-			} else {
-				IOUtils.closeQuietly(rocksIterator);
-
-				if (heap.isEmpty()) {
-					currentSubIterator = null;
-					valid = false;
-				} else {
-					currentSubIterator = heap.poll();
-					newKVState = true;
-					detectNewKeyGroup(oldKey);
-				}
-			}
-		}
-
-		private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
-			return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
-		}
-
-		private void detectNewKeyGroup(byte[] oldKey) {
-			if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) {
-				newKeyGroup = true;
-			}
-		}
-
-		/**
-		 * @return key-group for the current key
-		 */
-		public int keyGroup() {
-			int result = 0;
-			//big endian decode
-			for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
-				result <<= 8;
-				result |= (currentSubIterator.currentKey[i] & 0xFF);
-			}
-			return result;
-		}
-
-		public byte[] key() {
-			return currentSubIterator.getCurrentKey();
-		}
-
-		public byte[] value() {
-			return currentSubIterator.getIterator().value();
-		}
-
-		/**
-		 * @return Id of K/V state to which the current key belongs.
-		 */
-		public int kvStateId() {
-			return currentSubIterator.getKvStateId();
-		}
-
-		/**
-		 * Indicates if current key starts a new k/v-state, i.e. belong to a different k/v-state than it's predecessor.
-		 * @return true iff the current key belong to a different k/v-state than it's predecessor.
-		 */
-		public boolean isNewKeyValueState() {
-			return newKVState;
-		}
-
-		/**
-		 * Indicates if current key starts a new key-group, i.e. belong to a different key-group than it's predecessor.
-		 * @return true iff the current key belong to a different key-group than it's predecessor.
-		 */
-		public boolean isNewKeyGroup() {
-			return newKeyGroup;
-		}
-
-		/**
-		 * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
-		 * {@link #next()} should only be called if valid returned true. Should be checked after each call to
-		 * {@link #next()} before accessing iterator state.
-		 * @return True iff this iterator is valid.
-		 */
-		public boolean isValid() {
-			return valid;
-		}
-
-		private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
-			for (int i = 0; i < len; ++i) {
-				int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
-				if (diff != 0) {
-					return diff;
-				}
-			}
-			return 0;
-		}
-
-		@Override
-		public void close() {
-			IOUtils.closeQuietly(currentSubIterator);
-			currentSubIterator = null;
-
-			IOUtils.closeAllQuietly(heap);
-			heap.clear();
-		}
-	}
-
-	/**
-	 * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator.
-	 * Used by #MergeIterator.
-	 */
-	@VisibleForTesting
-	protected static final class MergeIterator implements AutoCloseable {
-
-		/**
-		 * @param iterator  The #RocksIterator to wrap .
-		 * @param kvStateId Id of the K/V state to which this iterator belongs.
-		 */
-		MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
-			this.iterator = Preconditions.checkNotNull(iterator);
-			this.currentKey = iterator.key();
-			this.kvStateId = kvStateId;
-		}
-
-		private final RocksIteratorWrapper iterator;
-		private byte[] currentKey;
-		private final int kvStateId;
-
-		public byte[] getCurrentKey() {
-			return currentKey;
-		}
-
-		public void setCurrentKey(byte[] currentKey) {
-			this.currentKey = currentKey;
-		}
-
-		public RocksIteratorWrapper getIterator() {
-			return iterator;
-		}
-
-		public int getKvStateId() {
-			return kvStateId;
-		}
-
-		@Override
-		public void close() {
-			IOUtils.closeQuietly(iterator);
-		}
-	}
-
-	private static final class TransformingRocksIteratorWrapper extends RocksIteratorWrapper {
-		@Nonnull
-		private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
-		private byte[] current;
-
-		public TransformingRocksIteratorWrapper(
-			@Nonnull RocksIterator iterator,
-			@Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
-			super(iterator);
-			this.stateSnapshotTransformer = stateSnapshotTransformer;
-		}
-
-		@Override
-		public void seekToFirst() {
-			super.seekToFirst();
-			filterOrTransform(super::next);
-		}
-
-		@Override
-		public void seekToLast() {
-			super.seekToLast();
-			filterOrTransform(super::prev);
-		}
-
-		@Override
-		public void next() {
-			super.next();
-			filterOrTransform(super::next);
-		}
-
-		@Override
-		public void prev() {
-			super.prev();
-			filterOrTransform(super::prev);
-		}
-
-		private void filterOrTransform(Runnable advance) {
-			while (isValid() && (current = stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
-				advance.run();
-			}
-		}
-
-		@Override
-		public byte[] value() {
-			if (!isValid()) {
-				throw new IllegalStateException("value() method cannot be called if isValid() is false");
-			}
-			return current;
-		}
-	}
-
-	/**
-	 * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
-	 * is not thread safe.
-	 *
-	 * @param <K> the type of the iterated objects, which are keys in RocksDB.
-	 */
-	static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseable {
-		private final RocksIteratorWrapper iterator;
-		private final String state;
-		private final TypeSerializer<K> keySerializer;
-		private final int keyGroupPrefixBytes;
-		private final byte[] namespaceBytes;
-		private final boolean ambiguousKeyPossible;
-		private K nextKey;
-		private K previousKey;
-
-		RocksIteratorForKeysWrapper(
-			RocksIteratorWrapper iterator,
-			String state,
-			TypeSerializer<K> keySerializer,
-			int keyGroupPrefixBytes,
-			boolean ambiguousKeyPossible,
-			byte[] namespaceBytes) {
-			this.iterator = Preconditions.checkNotNull(iterator);
-			this.state = Preconditions.checkNotNull(state);
-			this.keySerializer = Preconditions.checkNotNull(keySerializer);
-			this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
-			this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
-			this.nextKey = null;
-			this.previousKey = null;
-			this.ambiguousKeyPossible = ambiguousKeyPossible;
-		}
-
-		@Override
-		public boolean hasNext() {
-			try {
-				while (nextKey == null && iterator.isValid()) {
-
-					byte[] key = iterator.key();
-
-					ByteArrayInputStreamWithPos inputStream =
-						new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
-
-					DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
-
-					K value = RocksDBKeySerializationUtils.readKey(
-						keySerializer,
-						inputStream,
-						dataInput,
-						ambiguousKeyPossible);
-
-					int namespaceByteStartPos = inputStream.getPosition();
-
-					if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
-						previousKey = value;
-						nextKey = value;
-					}
-					iterator.next();
-				}
-			} catch (Exception e) {
-				throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
-			}
-			return nextKey != null;
-		}
-
-		@Override
-		public K next() {
-			if (!hasNext()) {
-				throw new NoSuchElementException("Failed to access state [" + state + "]");
-			}
-
-			K tmpKey = nextKey;
-			nextKey = null;
-			return tmpKey;
-		}
-
-		private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
-			final int namespaceBytesLength = namespaceBytes.length;
-			final int basicLength = namespaceBytesLength + beginPos;
-			if (key.length >= basicLength) {
-				for (int i = 0; i < namespaceBytesLength; ++i) {
-					if (key[beginPos + i] != namespaceBytes[i]) {
-						return false;
-					}
-				}
-				return true;
-			}
-			return false;
-		}
-
-		@Override
-		public void close() {
-			iterator.close();
-		}
-	}
-
 	private class FullSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
 
 		@Override
@@ -2214,8 +1831,8 @@ private void writeKVStateData() throws IOException, InterruptedException {
 				checkpointStreamWithResultProvider.getCheckpointOutputStream();
 
 			try {
-				// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
-				try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
+				// Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator
+				try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
 					kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
 
 					// handover complete, null out to prevent double close
@@ -2729,7 +2346,7 @@ private static RocksIteratorWrapper getRocksIterator(
 		RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
 		return stateSnapshotTransformer == null ?
 			new RocksIteratorWrapper(rocksIterator) :
-			new TransformingRocksIteratorWrapper(rocksIterator, stateSnapshotTransformer);
+			new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
 	}
 
 	/**
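
The extracted RocksStatesPerKeyGroupMergeIterator keeps the contract of the removed inner class:
it merges one RocksDB iterator per k/v state into a single pass ordered by (key-group, kv-state)
and flags the boundaries. A consumption sketch mirroring the snapshot loop in writeKVStateData,
assuming the public surface of the old class is preserved on the new one:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
    import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;

    import java.util.List;

    // Sketch: drain the merged (key-group, kv-state) ordered iteration.
    final class MergeIterationSketch {
        static void drain(
                List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
                int keyGroupPrefixBytes) throws Exception {
            // ownership of the RocksIterators passes to the merge iterator
            try (RocksStatesPerKeyGroupMergeIterator it =
                     new RocksStatesPerKeyGroupMergeIterator(kvStateIterators, keyGroupPrefixBytes)) {
                while (it.isValid()) {
                    if (it.isNewKeyGroup()) {
                        // a new key-group starts here: it.keyGroup()
                    }
                    byte[] key = it.key();
                    byte[] value = it.value();
                    // ... write (key, value) to the checkpoint stream ...
                    it.next();
                }
            }
        }
    }
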
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 176f48cda98..cdd7afb7d9a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -25,9 +25,8 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -116,32 +115,31 @@ private RocksDBListState(
 	public List<V> getInternal() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
+			byte[] key = dataOutputView.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
-			return deserializeList(valueBytes, elementSerializer);
+			return deserializeList(valueBytes);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
 		}
 	}
 
-	private static <V> List<V> deserializeList(
-		byte[] valueBytes, TypeSerializer<V> elementSerializer) {
+	private List<V> deserializeList(
+		byte[] valueBytes) {
 		if (valueBytes == null) {
 			return null;
 		}
 
-		DataInputViewStreamWrapper in = new ByteArrayDataInputView(valueBytes);
+		dataInputView.setData(valueBytes);
 
 		List<V> result = new ArrayList<>();
 		V next;
-		while ((next = deserializeNextElement(in, elementSerializer)) != null) {
+		while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) {
 			result.add(next);
 		}
 		return result;
 	}
 
-	private static <V> V deserializeNextElement(
-		DataInputViewStreamWrapper in, TypeSerializer<V> elementSerializer) {
+	private static <V> V deserializeNextElement(DataInputViewStreamWrapper in, TypeSerializer<V> elementSerializer) {
 		try {
 			if (in.available() > 0) {
 				V element = elementSerializer.deserialize(in);
@@ -162,11 +160,10 @@ public void add(V value) {
 
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
-			keySerializationStream.reset();
-			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
-			elementSerializer.serialize(value, out);
-			backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+			byte[] key = dataOutputView.toByteArray();
+			dataOutputView.reset();
+			elementSerializer.serialize(value, dataOutputView);
+			backend.db.merge(columnFamily, writeOptions, key, dataOutputView.toByteArray());
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
@@ -184,19 +181,15 @@ public void mergeNamespaces(N target, Collection<N> sources) {
 
 		try {
 			// create the target full-binary-key
-			writeKeyWithGroupAndNamespace(
-					keyGroup, key, target,
-					keySerializationStream, keySerializationDataOutputView);
-			final byte[] targetKey = keySerializationStream.toByteArray();
+			writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
+			final byte[] targetKey = dataOutputView.toByteArray();
 
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-					writeKeyWithGroupAndNamespace(
-							keyGroup, key, source,
-							keySerializationStream, keySerializationDataOutputView);
+					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
 
-					byte[] sourceKey = keySerializationStream.toByteArray();
+					byte[] sourceKey = dataOutputView.toByteArray();
 					byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
@@ -225,14 +218,9 @@ public void updateInternal(List<V> values) {
 		if (!values.isEmpty()) {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = keySerializationStream.toByteArray();
-
-				byte[] premerge = getPreMergedValue(values, elementSerializer, keySerializationStream);
-				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
-				} else {
-					throw new IOException("Failed pre-merge values in update()");
-				}
+				byte[] key = dataOutputView.toByteArray();
+				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
+				backend.db.put(columnFamily, writeOptions, key, premerge);
 			} catch (IOException | RocksDBException e) {
 				throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
 			}
@@ -246,14 +234,9 @@ public void addAll(List<V> values) {
 		if (!values.isEmpty()) {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = keySerializationStream.toByteArray();
-
-				byte[] premerge = getPreMergedValue(values, elementSerializer, keySerializationStream);
-				if (premerge != null) {
-					backend.db.merge(columnFamily, writeOptions, key, premerge);
-				} else {
-					throw new IOException("Failed pre-merge values in addAll()");
-				}
+				byte[] key = dataOutputView.toByteArray();
+				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
+				backend.db.merge(columnFamily, writeOptions, key, premerge);
 			} catch (IOException | RocksDBException e) {
 				throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
 			}
@@ -263,8 +246,7 @@ public void addAll(List<V> values) {
 	private static <V> byte[] getPreMergedValue(
 		List<V> values,
 		TypeSerializer<V> elementSerializer,
-		ByteArrayOutputStreamWithPos keySerializationStream) throws IOException {
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+		ByteArrayDataOutputView keySerializationStream) throws IOException {
 
 		keySerializationStream.reset();
 		boolean first = true;
@@ -275,7 +257,7 @@ public void addAll(List<V> values) {
 			} else {
 				keySerializationStream.write(DELIMITER);
 			}
-			elementSerializer.serialize(value, out);
+			elementSerializer.serialize(value, keySerializationStream);
 		}
 
 		return keySerializationStream.toByteArray();
@@ -298,7 +280,7 @@ public void addAll(List<V> values) {
 	static class StateSnapshotTransformerWrapper<T> implements StateSnapshotTransformer<byte[]> {
 		private final StateSnapshotTransformer<T> elementTransformer;
 		private final TypeSerializer<T> elementSerializer;
-		private final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(128);
+		private final ByteArrayDataOutputView out = new ByteArrayDataOutputView(128);
 		private final CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;
 
 		StateSnapshotTransformerWrapper(StateSnapshotTransformer<T> elementTransformer, TypeSerializer<T> elementSerializer) {
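
List state stores its value as delimiter-separated serialized elements, which is why add() can use
RocksDB's merge operator: appending one more delimited element is associative. A round-trip sketch
of that encoding (the ',' delimiter is assumed here to match RocksDBListState's DELIMITER
constant):

    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.core.memory.ByteArrayDataInputView;
    import org.apache.flink.core.memory.ByteArrayDataOutputView;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch of the list-state value encoding: elements joined by a delimiter byte.
    public class ListEncodingSketch {
        private static final byte DELIMITER = ','; // assumed to match RocksDBListState

        public static void main(String[] args) throws IOException {
            ByteArrayDataOutputView out = new ByteArrayDataOutputView(64);
            boolean first = true;
            for (String v : Arrays.asList("a", "b", "c")) {
                if (first) { first = false; } else { out.write(DELIMITER); }
                StringSerializer.INSTANCE.serialize(v, out);
            }

            ByteArrayDataInputView in = new ByteArrayDataInputView();
            in.setData(out.toByteArray());
            List<String> result = new ArrayList<>();
            while (in.available() > 0) {
                result.add(StringSerializer.INSTANCE.deserialize(in));
                if (in.available() > 0) { in.readByte(); } // skip delimiter
            }
            System.out.println(result); // [a, b, c]
        }
    }
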
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index b08eade53b9..ad6b7c22ec4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -26,10 +26,6 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
@@ -263,8 +259,7 @@ public void clear() {
 
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
-		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
-		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
+		ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(128);
 
 		writeKeyWithGroupAndNamespace(
 				keyGroup,
@@ -272,10 +267,9 @@ public void clear() {
 				safeKeySerializer,
 				keyAndNamespace.f1,
 				safeNamespaceSerializer,
-				outputStream,
 				outputView);
 
-		final byte[] keyPrefixBytes = outputStream.toByteArray();
+		final byte[] keyPrefixBytes = outputView.toByteArray();
 
 		final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
 
@@ -309,14 +303,14 @@ public void clear() {
 	private byte[] serializeCurrentKeyAndNamespace() throws IOException {
 		writeCurrentKeyWithGroupAndNamespace();
 
-		return keySerializationStream.toByteArray();
+		return dataOutputView.toByteArray();
 	}
 
 	private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
 		serializeCurrentKeyAndNamespace();
-		userKeySerializer.serialize(userKey, keySerializationDataOutputView);
+		userKeySerializer.serialize(userKey, dataOutputView);
 
-		return keySerializationStream.toByteArray();
+		return dataOutputView.toByteArray();
 	}
 
 	private byte[] serializeUserValue(UV userValue) throws IOException {
@@ -328,34 +322,29 @@ private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
 	}
 
 	private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
-		keySerializationStream.reset();
+		dataOutputView.reset();
 
 		if (userValue == null) {
-			keySerializationDataOutputView.writeBoolean(true);
+			dataOutputView.writeBoolean(true);
 		} else {
-			keySerializationDataOutputView.writeBoolean(false);
-			valueSerializer.serialize(userValue, keySerializationDataOutputView);
+			dataOutputView.writeBoolean(false);
+			valueSerializer.serialize(userValue, dataOutputView);
 		}
 
-		return keySerializationStream.toByteArray();
+		return dataOutputView.toByteArray();
 	}
 
 	private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
-		ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
-		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
-
-		in.skipBytes(userKeyOffset);
-
-		return keySerializer.deserialize(in);
+		dataInputView.setData(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset);
+		return keySerializer.deserialize(dataInputView);
 	}
 
 	private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
-		ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
-		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+		dataInputView.setData(rawValueBytes);
 
-		boolean isNull = in.readBoolean();
+		boolean isNull = dataInputView.readBoolean();
 
-		return isNull ? null : valueSerializer.deserialize(in);
+		return isNull ? null : valueSerializer.deserialize(dataInputView);
 	}
 
 	private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
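
Map-state values keep their wire format here: a leading boolean null flag, then the serialized
value only when non-null, as serializeUserValue/deserializeUserValue show above. A small
self-contained round trip of that encoding (StringSerializer is an illustrative stand-in for the
user-value serializer):

    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.core.memory.ByteArrayDataInputView;
    import org.apache.flink.core.memory.ByteArrayDataOutputView;

    import java.io.IOException;

    // Null-flag value encoding used by RocksDBMapState: [isNull][value?].
    public class MapValueEncodingSketch {
        public static void main(String[] args) throws IOException {
            ByteArrayDataOutputView out = new ByteArrayDataOutputView(32);
            String userValue = "hello"; // try null to see the flag-only encoding

            if (userValue == null) {
                out.writeBoolean(true);
            } else {
                out.writeBoolean(false);
                StringSerializer.INSTANCE.serialize(userValue, out);
            }

            ByteArrayDataInputView in = new ByteArrayDataInputView();
            in.setData(out.toByteArray());
            boolean isNull = in.readBoolean();
            String decoded = isNull ? null : StringSerializer.INSTANCE.deserialize(in);
            System.out.println(decoded); // hello
        }
    }
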
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 490960e39be..d1fe3bd3798 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -25,15 +25,12 @@
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.rocksdb.ColumnFamilyHandle;
 
-import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -87,7 +84,7 @@ private RocksDBReducingState(ColumnFamilyHandle columnFamily,
 	}
 
 	@Override
-	public V get() throws IOException {
+	public V get() {
 		return getInternal();
 	}
 
@@ -100,7 +97,7 @@ public void add(V value) throws Exception {
 	}
 
 	@Override
-	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+	public void mergeNamespaces(N target, Collection<N> sources) {
 		if (sources == null || sources.isEmpty()) {
 			return;
 		}
@@ -116,17 +113,15 @@ public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
 			for (N source : sources) {
 				if (source != null) {
 
-					writeKeyWithGroupAndNamespace(
-							keyGroup, key, source,
-							keySerializationStream, keySerializationDataOutputView);
+					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
 
-					final byte[] sourceKey = keySerializationStream.toByteArray();
+					final byte[] sourceKey = dataOutputView.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
 					if (valueBytes != null) {
-						V value = valueSerializer.deserialize(
-								new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+						dataInputView.setData(valueBytes);
+						V value = valueSerializer.deserialize(dataInputView);
 
 						if (current != null) {
 							current = reduceFunction.reduce(current, value);
@@ -141,27 +136,25 @@ public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
 				// create the target full-binary-key
-				writeKeyWithGroupAndNamespace(
-						keyGroup, key, target,
-						keySerializationStream, keySerializationDataOutputView);
+				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
 
-				final byte[] targetKey = keySerializationStream.toByteArray();
+				final byte[] targetKey = dataOutputView.toByteArray();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
+					dataInputView.setData(targetValueBytes);
 					// target also had a value, merge
-					V value = valueSerializer.deserialize(
-							new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+					V value = valueSerializer.deserialize(dataInputView);
 
 					current = reduceFunction.reduce(current, value);
 				}
 
 				// serialize the resulting value
-				keySerializationStream.reset();
-				valueSerializer.serialize(current, keySerializationDataOutputView);
+				dataOutputView.reset();
+				valueSerializer.serialize(current, dataOutputView);
 
 				// write the resulting value
-				backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+				backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray());
 			}
 		}
 		catch (Exception e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 5ae894e8177..e9399e12a32 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -23,8 +23,6 @@
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -32,7 +30,6 @@
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
 /**
@@ -84,12 +81,13 @@ private RocksDBValueState(
 	public V value() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
+			byte[] key = dataOutputView.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
 				return getDefaultValue();
 			}
-			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+			dataInputView.setData(valueBytes);
+			return valueSerializer.deserialize(dataInputView);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
 		}
@@ -101,13 +99,13 @@ public void update(V value) {
 			clear();
 			return;
 		}
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
-			keySerializationStream.reset();
-			valueSerializer.serialize(value, out);
-			backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+			byte[] key = dataOutputView.toByteArray();
+			dataOutputView.reset();
+			valueSerializer.serialize(value, dataOutputView);
+			backend.db.put(columnFamily, writeOptions, key, dataOutputView.toByteArray());
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
new file mode 100644
index 00000000000..993b35ae2b6
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a RocksDB iterator to cache its current key and assigns an id for the key/value state to the iterator.
+ * Used by {@link RocksStatesPerKeyGroupMergeIterator}.
+ */
+class RocksSingleStateIterator implements AutoCloseable {
+
+	/**
+	 * @param iterator underlying {@link RocksIteratorWrapper}
+	 * @param kvStateId Id of the K/V state to which this iterator belongs.
+	 */
+	RocksSingleStateIterator(@Nonnull RocksIteratorWrapper iterator, int kvStateId) {
+		this.iterator = iterator;
+		this.currentKey = iterator.key();
+		this.kvStateId = kvStateId;
+	}
+
+	@Nonnull
+	private final RocksIteratorWrapper iterator;
+	private byte[] currentKey;
+	private final int kvStateId;
+
+	public byte[] getCurrentKey() {
+		return currentKey;
+	}
+
+	public void setCurrentKey(byte[] currentKey) {
+		this.currentKey = currentKey;
+	}
+
+	@Nonnull
+	public RocksIteratorWrapper getIterator() {
+		return iterator;
+	}
+
+	public int getKvStateId() {
+		return kvStateId;
+	}
+
+	@Override
+	public void close() {
+		IOUtils.closeQuietly(iterator);
+	}
+}
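
RocksSingleStateIterator exists mainly so that the merge iterator's heap comparator can read each sub-iterator's current key from a cached byte[] field instead of calling key() across the JNI boundary on every comparison; the cache is refreshed once per step via setCurrentKey(...). A minimal sketch of that caching idea, with a hypothetical Cursor interface standing in for RocksIteratorWrapper (not Flink API):

    // Sketch only: Cursor is a stand-in for the real RocksIteratorWrapper.
    interface Cursor {
        boolean isValid();
        byte[] key();   // potentially expensive, e.g. a JNI call in the real backend
        void next();
    }

    final class CachingCursor {
        private final Cursor delegate;
        private byte[] currentKey; // cached so comparators avoid repeated key() calls

        CachingCursor(Cursor delegate) {
            this.delegate = delegate;
            this.currentKey = delegate.key(); // assumes the delegate is positioned on a valid entry
        }

        byte[] getCurrentKey() {
            return currentKey;
        }

        boolean advance() {
            delegate.next();
            if (delegate.isValid()) {
                currentKey = delegate.key(); // refresh the cache once per step
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            byte[][] keys = {{1}, {2}, {3}};
            Cursor arrayCursor = new Cursor() {
                int pos = 0;
                public boolean isValid() { return pos < keys.length; }
                public byte[] key() { return keys[pos]; }
                public void next() { ++pos; }
            };
            CachingCursor cached = new CachingCursor(arrayCursor);
            do {
                System.out.println(cached.getCurrentKey()[0]); // prints 1, 2, 3
            } while (cached.advance());
        }
    }
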
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
new file mode 100644
index 00000000000..0fa93dc8a1f
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
+ * is not thread safe.
+ *
+ * @param <K> the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator<K> implements Iterator<K>, AutoCloseable {
+
+	@Nonnull
+	private final RocksIteratorWrapper iterator;
+
+	@Nonnull
+	private final String state;
+
+	@Nonnull
+	private final TypeSerializer<K> keySerializer;
+
+	@Nonnull
+	private final byte[] namespaceBytes;
+
+	private final boolean ambiguousKeyPossible;
+	private final int keyGroupPrefixBytes;
+	private final ByteArrayDataInputView byteArrayDataInputView;
+	private K nextKey;
+	private K previousKey;
+
+	public RocksStateKeysIterator(
+		@Nonnull RocksIteratorWrapper iterator,
+		@Nonnull String state,
+		@Nonnull TypeSerializer<K> keySerializer,
+		int keyGroupPrefixBytes,
+		boolean ambiguousKeyPossible,
+		@Nonnull byte[] namespaceBytes) {
+		this.iterator = iterator;
+		this.state = state;
+		this.keySerializer = keySerializer;
+		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+		this.namespaceBytes = namespaceBytes;
+		this.nextKey = null;
+		this.previousKey = null;
+		this.ambiguousKeyPossible = ambiguousKeyPossible;
+		this.byteArrayDataInputView = new ByteArrayDataInputView();
+	}
+
+	@Override
+	public boolean hasNext() {
+		try {
+			while (nextKey == null && iterator.isValid()) {
+
+				final byte[] keyBytes = iterator.key();
+				final K currentKey = deserializeKey(keyBytes, byteArrayDataInputView);
+				final int namespaceByteStartPos = byteArrayDataInputView.getPosition();
+
+				if (isMatchingNameSpace(keyBytes, namespaceByteStartPos) && !Objects.equals(previousKey, currentKey)) {
+					previousKey = currentKey;
+					nextKey = currentKey;
+				}
+				iterator.next();
+			}
+		} catch (Exception e) {
+			throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
+		}
+		return nextKey != null;
+	}
+
+	@Override
+	public K next() {
+		if (!hasNext()) {
+			throw new NoSuchElementException("Failed to access state [" + state + "]");
+		}
+
+		K tmpKey = nextKey;
+		nextKey = null;
+		return tmpKey;
+	}
+
+	private K deserializeKey(byte[] keyBytes, ByteArrayDataInputView readView) throws IOException {
+		readView.setData(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
+		return RocksDBKeySerializationUtils.readKey(
+			keySerializer,
+			byteArrayDataInputView,
+			ambiguousKeyPossible);
+	}
+
+	private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
+		final int namespaceBytesLength = namespaceBytes.length;
+		final int basicLength = namespaceBytesLength + beginPos;
+		if (key.length >= basicLength) {
+			for (int i = 0; i < namespaceBytesLength; ++i) {
+				if (key[beginPos + i] != namespaceBytes[i]) {
+					return false;
+				}
+			}
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public void close() {
+		iterator.close();
+	}
+}
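
RocksStateKeysIterator above follows the standard lookahead pattern for adapting a cursor to java.util.Iterator: hasNext() advances the underlying RocksDB iterator until it finds the next namespace-matching key that differs from the previously returned one and parks it in nextKey; next() hands the parked element out and clears the slot. A self-contained sketch of the same pattern, deduplicating a sorted in-memory source (no RocksDB involved; all names are illustrative):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Lookahead adapter: turns a sorted Iterator into one without consecutive duplicates.
    final class DedupIterator<T> implements Iterator<T> {

        private final Iterator<T> source;
        private T next;     // parked element, produced by hasNext()
        private T previous; // last distinct element seen, used for deduplication

        DedupIterator(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public boolean hasNext() {
            while (next == null && source.hasNext()) {
                T candidate = source.next();
                if (!candidate.equals(previous)) {
                    previous = candidate;
                    next = candidate;
                }
            }
            return next != null;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T result = next;
            next = null; // clear the slot so hasNext() looks ahead again
            return result;
        }

        public static void main(String[] args) {
            Iterator<String> it = new DedupIterator<>(Arrays.asList("a", "a", "b", "c", "c").iterator());
            while (it.hasNext()) {
                System.out.println(it.next()); // prints a, b, c
            }
        }
    }
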
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
new file mode 100644
index 00000000000..20e5dd03ce3
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+	private final PriorityQueue<RocksSingleStateIterator> heap;
+	private final int keyGroupPrefixByteCount;
+	private boolean newKeyGroup;
+	private boolean newKVState;
+	private boolean valid;
+	private RocksSingleStateIterator currentSubIterator;
+
+	private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+
+	static {
+		int maxBytes = 2;
+		COMPARATORS = new ArrayList<>(maxBytes);
+		for (int i = 0; i < maxBytes; ++i) {
+			final int currentBytes = i + 1;
+			COMPARATORS.add((o1, o2) -> {
+				int arrayCmpRes = compareKeyGroupsForByteArrays(
+					o1.getCurrentKey(), o2.getCurrentKey(), currentBytes);
+				return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+			});
+		}
+	}
+
+	public RocksStatesPerKeyGroupMergeIterator(
+		List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+		final int keyGroupPrefixByteCount) {
+		Preconditions.checkNotNull(kvStateIterators);
+		Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+		this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+		if (kvStateIterators.size() > 0) {
+			this.heap = buildIteratorHeap(kvStateIterators);
+			this.valid = !heap.isEmpty();
+			this.currentSubIterator = heap.poll();
+			kvStateIterators.clear();
+		} else {
+			// creating a PriorityQueue of size 0 results in an exception.
+			this.heap = null;
+			this.valid = false;
+		}
+
+		this.newKeyGroup = true;
+		this.newKVState = true;
+	}
+
+	/**
+	 * Advances the iterator. Should only be called if {@link #isValid()} returned true.
+	 * The valid flag can only change after a call to {@link #next()}.
+	 */
+	public void next() {
+		newKeyGroup = false;
+		newKVState = false;
+
+		final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
+		rocksIterator.next();
+
+		byte[] oldKey = currentSubIterator.getCurrentKey();
+		if (rocksIterator.isValid()) {
+
+			currentSubIterator.setCurrentKey(rocksIterator.key());
+
+			if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+				heap.offer(currentSubIterator);
+				currentSubIterator = heap.remove();
+				newKVState = currentSubIterator.getIterator() != rocksIterator;
+				detectNewKeyGroup(oldKey);
+			}
+		} else {
+			IOUtils.closeQuietly(rocksIterator);
+
+			if (heap.isEmpty()) {
+				currentSubIterator = null;
+				valid = false;
+			} else {
+				currentSubIterator = heap.remove();
+				newKVState = true;
+				detectNewKeyGroup(oldKey);
+			}
+		}
+	}
+
+	private PriorityQueue<RocksSingleStateIterator> buildIteratorHeap(
+		List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators) {
+
+		Comparator<RocksSingleStateIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+		PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue =
+			new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+		for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+			final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
+			rocksIterator.seekToFirst();
+			if (rocksIterator.isValid()) {
+				iteratorPriorityQueue.offer(
+					new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+			} else {
+				IOUtils.closeQuietly(rocksIterator);
+			}
+		}
+		return iteratorPriorityQueue;
+	}
+
+	private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
+		return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
+	}
+
+	private void detectNewKeyGroup(byte[] oldKey) {
+		if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+			newKeyGroup = true;
+		}
+	}
+
+	/**
+	 * @return the key-group for the current key.
+	 */
+	public int keyGroup() {
+		final byte[] currentKey = currentSubIterator.getCurrentKey();
+		int result = 0;
+		//big endian decode
+		for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
+			result <<= 8;
+			result |= (currentKey[i] & 0xFF);
+		}
+		return result;
+	}
+
+	public byte[] key() {
+		return currentSubIterator.getCurrentKey();
+	}
+
+	public byte[] value() {
+		return currentSubIterator.getIterator().value();
+	}
+
+	/**
+	 * @return Id of K/V state to which the current key belongs.
+	 */
+	public int kvStateId() {
+		return currentSubIterator.getKvStateId();
+	}
+
+	/**
+	 * Indicates if the current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor.
+	 * @return true iff the current key belongs to a different k/v-state than its predecessor.
+	 */
+	public boolean isNewKeyValueState() {
+		return newKVState;
+	}
+
+	/**
+	 * Indicates if the current key starts a new key-group, i.e. belongs to a different key-group than its predecessor.
+	 * @return true iff the current key belongs to a different key-group than its predecessor.
+	 */
+	public boolean isNewKeyGroup() {
+		return newKeyGroup;
+	}
+
+	/**
+	 * Checks if the iterator is still valid. Getters like {@link #key()} and {@link #value()}, as well as
+	 * {@link #next()}, should only be called if this method returned true, and it should be re-checked after
+	 * each call to {@link #next()} before accessing iterator state.
+	 * @return True iff this iterator is valid.
+	 */
+	public boolean isValid() {
+		return valid;
+	}
+
+	private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
+		for (int i = 0; i < len; ++i) {
+			int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
+			if (diff != 0) {
+				return diff;
+			}
+		}
+		return 0;
+	}
+
+	@Override
+	public void close() {
+		IOUtils.closeQuietly(currentSubIterator);
+		currentSubIterator = null;
+
+		IOUtils.closeAllQuietly(heap);
+		heap.clear();
+	}
+}
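
The merge iterator above is a classic k-way merge: each sub-iterator sits in a PriorityQueue ordered by an unsigned comparison of the first one or two key bytes (the key-group prefix), with the kvStateId as tie-breaker, so the merged stream comes out grouped by key-group and, within each group, by k/v state. A self-contained sketch of the heap-based merge over plain sorted byte-array sequences (all names here are illustrative):

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    // K-way merge of sorted byte[] sequences via a PriorityQueue.
    public final class KWayMergeExample {

        // Unsigned lexicographic comparison of the first len bytes, as in the merge iterator.
        static int comparePrefix(byte[] a, byte[] b, int len) {
            for (int i = 0; i < len; ++i) {
                int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
                if (diff != 0) {
                    return diff;
                }
            }
            return 0;
        }

        // One source with its cached head element; assumes a non-empty source.
        static final class Source {
            final Iterator<byte[]> it;
            byte[] head;

            Source(Iterator<byte[]> it) {
                this.it = it;
                this.head = it.next();
            }
        }

        public static void main(String[] args) {
            List<byte[]> s1 = Arrays.asList(new byte[] {0, 1}, new byte[] {2, 1});
            List<byte[]> s2 = Arrays.asList(new byte[] {1, 7}, new byte[] {3, 7});

            PriorityQueue<Source> heap = new PriorityQueue<>(
                Comparator.<Source, byte[]>comparing(s -> s.head, (a, b) -> comparePrefix(a, b, 1)));
            heap.offer(new Source(s1.iterator()));
            heap.offer(new Source(s2.iterator()));

            while (!heap.isEmpty()) {
                Source min = heap.poll();
                System.out.println(Arrays.toString(min.head)); // keys come out in prefix order
                if (min.it.hasNext()) {
                    min.head = min.it.next();
                    heap.offer(min); // re-insert with the refreshed head key
                }
            }
        }
    }

The real iterator additionally closes exhausted sub-iterators and tracks whether the head of the heap crossed a key-group or k/v-state boundary, which is what isNewKeyGroup() and isNewKeyValueState() report.
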
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
new file mode 100644
index 00000000000..e2fec423e90
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+
+import org.rocksdb.RocksIterator;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wrapper around {@link RocksIterator} that applies a given {@link StateSnapshotTransformer} to the elements
+ * during the iteration.
+ */
+public class RocksTransformingIteratorWrapper extends RocksIteratorWrapper {
+
+	@Nonnull
+	private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
+	private byte[] current;
+
+	public RocksTransformingIteratorWrapper(
+		@Nonnull RocksIterator iterator,
+		@Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
+		super(iterator);
+		this.stateSnapshotTransformer = stateSnapshotTransformer;
+	}
+
+	@Override
+	public void seekToFirst() {
+		super.seekToFirst();
+		filterOrTransform(super::next);
+	}
+
+	@Override
+	public void seekToLast() {
+		super.seekToLast();
+		filterOrTransform(super::prev);
+	}
+
+	@Override
+	public void next() {
+		super.next();
+		filterOrTransform(super::next);
+	}
+
+	@Override
+	public void prev() {
+		super.prev();
+		filterOrTransform(super::prev);
+	}
+
+	private void filterOrTransform(@Nonnull Runnable advance) {
+		while (isValid() && (current = stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
+			advance.run();
+		}
+	}
+
+	@Override
+	public byte[] value() {
+		if (!isValid()) {
+			throw new IllegalStateException("value() method cannot be called if isValid() is false");
+		}
+		return current;
+	}
+}
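
The wrapper above applies the transformer lazily: after every positioning call it keeps advancing while filterOrTransform(...) maps the current value to null, so entries that the transformer filters out are never observed by callers, and value() returns the transformed bytes rather than the raw ones. A self-contained sketch of the same skip-while-null pattern over a plain array cursor (names are illustrative, not Flink API):

    import java.util.function.Function;

    // Cursor that skips elements whose transformed value is null.
    final class TransformingCursor<T> {

        private final T[] elements;
        private final Function<T, T> filterOrTransform; // null result means "drop this element"
        private int pos = -1;
        private T current;

        TransformingCursor(T[] elements, Function<T, T> filterOrTransform) {
            this.elements = elements;
            this.filterOrTransform = filterOrTransform;
        }

        boolean isValid() {
            return pos < elements.length;
        }

        void next() {
            // Advance, then keep advancing while the transformer rejects the element.
            do {
                ++pos;
            } while (isValid() && (current = filterOrTransform.apply(elements[pos])) == null);
        }

        T value() {
            if (!isValid()) {
                throw new IllegalStateException("value() cannot be called if isValid() is false");
            }
            return current; // the transformed value, not the raw one
        }

        public static void main(String[] args) {
            TransformingCursor<String> cursor = new TransformingCursor<>(
                new String[] {"keep-1", "drop-me", "keep-2"},
                v -> v.startsWith("keep") ? v.toUpperCase() : null);
            for (cursor.next(); cursor.isValid(); cursor.next()) {
                System.out.println(cursor.value()); // prints KEEP-1, KEEP-2
            }
        }
    }
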
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index 4121cf08592..483b8fdd1dc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -18,9 +18,7 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.util.TestLogger;
@@ -114,35 +112,30 @@ private void testClipDBWithKeyGroupRangeHelper(
 			int currentGroupRangeStart = currentGroupRange.getStartKeyGroup();
 			int currentGroupRangeEnd = currentGroupRange.getEndKeyGroup();
 
+			ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(32);
 			for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
-				ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-				DataOutputView outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
 				for (int j = 0; j < 100; ++j) {
-					outputStreamWithPos.reset();
+					outputView.reset();
 					RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 					RocksDBKeySerializationUtils.writeKey(
 						j,
 						IntSerializer.INSTANCE,
-						outputStreamWithPos,
-						new DataOutputViewStreamWrapper(outputStreamWithPos),
+						outputView,
 						false);
-					rocksDB.put(columnFamilyHandle, outputStreamWithPos.toByteArray(), String.valueOf(j).getBytes());
+					rocksDB.put(columnFamilyHandle, outputView.toByteArray(), String.valueOf(j).getBytes());
 				}
 			}
 
 			for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
-				ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-				DataOutputView outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
 				for (int j = 0; j < 100; ++j) {
-					outputStreamWithPos.reset();
+					outputView.reset();
 					RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 					RocksDBKeySerializationUtils.writeKey(
 						j,
 						IntSerializer.INSTANCE,
-						outputStreamWithPos,
-						new DataOutputViewStreamWrapper(outputStreamWithPos),
+						outputView,
 						false);
-					byte[] value = rocksDB.get(columnFamilyHandle, outputStreamWithPos.toByteArray());
+					byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
 					Assert.assertEquals(String.valueOf(j), new String(value));
 				}
 			}
@@ -155,19 +148,15 @@ private void testClipDBWithKeyGroupRangeHelper(
 				keyGroupPrefixBytes);
 
 			for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
-				ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-				DataOutputView outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
 				for (int j = 0; j < 100; ++j) {
-					outputStreamWithPos.reset();
+					outputView.reset();
 					RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 					RocksDBKeySerializationUtils.writeKey(
 						j,
 						IntSerializer.INSTANCE,
-						outputStreamWithPos,
-						new DataOutputViewStreamWrapper(outputStreamWithPos),
+						outputView,
 						false);
-					byte[] value = rocksDB.get(
-						columnFamilyHandle, outputStreamWithPos.toByteArray());
+					byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
 					if (targetGroupRange.contains(i)) {
 						Assert.assertEquals(String.valueOf(j), new String(value));
 					} else {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
index b1737edcca9..d92bef5e960 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
@@ -19,6 +19,8 @@
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -61,39 +63,39 @@ public void testKeyGroupSerializationAndDeserialization() throws Exception {
 
 	@Test
 	public void testKeySerializationAndDeserialization() throws Exception {
-		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
-		DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);
+		final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
+		final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
 
 		// test for key
 		for (int orgKey = 0; orgKey < 100; ++orgKey) {
-			outputStream.reset();
-			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputStream, outputView, false);
-			ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
-			int deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), false);
+			outputView.reset();
+			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, false);
+			inputView.setData(outputView.toByteArray());
+			int deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, false);
 			Assert.assertEquals(orgKey, deserializedKey);
 
-			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputStream, outputView, true);
-			inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
-			deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), true);
+			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, true);
+			inputView.setData(outputView.toByteArray());
+			deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
 			Assert.assertEquals(orgKey, deserializedKey);
 		}
 	}
 
 	@Test
 	public void testNamespaceSerializationAndDeserialization() throws Exception {
-		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
-		DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);
+		final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
+		final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
 
 		for (int orgNamespace = 0; orgNamespace < 100; ++orgNamespace) {
-			outputStream.reset();
-			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputStream, outputView, false);
-			ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
-			int deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), false);
+			outputView.reset();
+			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, false);
+			inputView.setData(outputView.toByteArray());
+			int deserializedNamespace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, false);
 			Assert.assertEquals(orgNamespace, deserializedNamespace);
 
-			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputStream, outputView, true);
-			inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
-			deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), true);
+			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, true);
+			inputView.setData(outputView.toByteArray());
+			deserializedNamespace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, true);
 			Assert.assertEquals(orgNamespace, deserializedNamespace);
 		}
 	}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
similarity index 91%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
rename to flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index f56099827ac..e042ebd0609 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -23,8 +23,8 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -47,7 +47,7 @@
 /**
  * Tests for the RocksIteratorWrapper.
  */
-public class RocksDBRocksIteratorForKeysWrapperTest {
+public class RocksDBRocksStateKeysIteratorTest {
 
 	@Rule
 	public final TemporaryFolder tmp = new TemporaryFolder();
@@ -105,13 +105,12 @@ public void testIterator() throws Exception{
 				testState.update(String.valueOf(i));
 			}
 
-			ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
+			ByteArrayDataOutputView outputStream = new ByteArrayDataOutputView(8);
 			boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 			RocksDBKeySerializationUtils.writeNameSpace(
 				namespace,
 				namespaceSerializer,
 				outputStream,
-				new DataOutputViewStreamWrapper(outputStream),
 				ambiguousKeyPossible);
 
 			byte[] nameSpaceBytes = outputStream.toByteArray();
@@ -119,8 +118,8 @@ public void testIterator() throws Exception{
 			try (
 				ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
 				RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(keyedStateBackend.db, handle);
-				RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
-					new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
+				RocksStateKeysIterator<K> iteratorWrapper =
+					new RocksStateKeysIterator<>(
 						iterator,
 						testStateName,
 						keySerializer,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
similarity index 91%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
rename to flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index cb2b202bc63..e1240a8bfad 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.util.IOUtils;
 
@@ -39,9 +40,9 @@
 import java.util.Random;
 
 /**
- * Tests for the RocksDBMergeIterator.
+ * Tests for the RocksStatesPerKeyGroupMergeIterator.
  */
-public class RocksDBMergeIteratorTest {
+public class RocksKeyGroupsRocksSingleStateIteratorTest {
 
 	private static final int NUM_KEY_VAL_STATES = 50;
 	private static final int MAX_NUM_KEYS = 20;
@@ -51,8 +52,8 @@
 
 	@Test
 	public void testEmptyMergeIterator() throws Exception {
-		RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
-				new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2);
+		RocksStatesPerKeyGroupMergeIterator emptyIterator =
+				new RocksStatesPerKeyGroupMergeIterator(Collections.emptyList(), 2);
 		Assert.assertFalse(emptyIterator.isValid());
 	}
 
@@ -111,7 +112,7 @@ public void testMergeIterator(int maxParallelism) throws Exception {
 				++id;
 			}
 
-			try (RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(
+			try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
 				rocksIteratorsWithKVStateId,
 				maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) {
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services