You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/01/07 12:01:05 UTC

[flink] 03/03: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54ef382439b1687a645a377a5f6a095746423109
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Tue Nov 27 13:51:29 2018 +0800

    [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
    
    This closes #7288.
    
    Co-authored-by: Stefan Richter <s....@data-artisans.com>
    Co-authored-by: klion26 <qc...@gmail.com>
---
 .../core/memory/ByteArrayOutputStreamWithPos.java  |   3 +-
 .../flink/core/memory/DataOutputSerializer.java    |   7 +
 .../streaming/state/AbstractRocksDBState.java      | 140 ++++++------
 .../streaming/state/RocksDBAggregatingState.java   |  14 +-
 .../state/RocksDBKeySerializationUtils.java        |  12 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  14 ++
 .../contrib/streaming/state/RocksDBListState.java  |  90 ++++----
 .../contrib/streaming/state/RocksDBMapState.java   |  83 +++----
 .../streaming/state/RocksDBReducingState.java      |  15 +-
 .../RocksDBSerializedCompositeKeyBuilder.java      | 206 +++++++++++++++++
 .../contrib/streaming/state/RocksDBValueState.java |  14 +-
 .../RocksDBSerializedCompositeKeyBuilderTest.java  | 250 +++++++++++++++++++++
 12 files changed, 643 insertions(+), 205 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index 22330c5..2f4fdfe 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 
@@ -112,7 +111,7 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
 	}
 
 	@Override
-	public void close() throws IOException {
+	public void close() {
 	}
 
 	public byte[] getBuf() {
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index 01feae0..b749c84 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.core.memory;
 
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -350,6 +352,11 @@ public class DataOutputSerializer implements DataOutputView {
 		this.position += numBytes;
 	}
 
+	public void setPosition(int position) {
+		Preconditions.checkArgument(position >= 0 && position <= this.position, "Position out of bounds.");
+		this.position = position;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index b5ab996..fb19a8a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -34,6 +34,7 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Base class for {@link State} implementations that store state in a RocksDB database.
@@ -70,7 +71,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 
 	protected final DataInputDeserializer dataInputView;
 
-	private final boolean ambiguousKeyPossible;
+	private final RocksDBSerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer;
 
 	/**
 	 * Creates a new RocksDB backed state.
@@ -99,8 +100,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 
 		this.dataOutputView = new DataOutputSerializer(128);
 		this.dataInputView = new DataInputDeserializer();
-		this.ambiguousKeyPossible =
-			RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
+		this.sharedKeyNamespaceSerializer = backend.getSharedRocksKeyBuilder();
 	}
 
 	// ------------------------------------------------------------------------
@@ -108,17 +108,15 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 	@Override
 	public void clear() {
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
-			backend.db.delete(columnFamily, writeOptions, key);
-		} catch (IOException | RocksDBException e) {
+			backend.db.delete(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace());
+		} catch (RocksDBException e) {
 			throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
 		}
 	}
 
 	@Override
 	public void setCurrentNamespace(N namespace) {
-		this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
+		this.currentNamespace = namespace;
 	}
 
 	@Override
@@ -128,30 +126,78 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 			final TypeSerializer<N> safeNamespaceSerializer,
 			final TypeSerializer<V> safeValueSerializer) throws Exception {
 
-		Preconditions.checkNotNull(serializedKeyAndNamespace);
-		Preconditions.checkNotNull(safeKeySerializer);
-		Preconditions.checkNotNull(safeNamespaceSerializer);
-		Preconditions.checkNotNull(safeValueSerializer);
-
 		//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
 		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
 				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
 
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
-		// we cannot reuse the keySerializationStream member since this method
-		// is called concurrently to the other ones and it may thus contain garbage
-		DataOutputSerializer tmpKeySerializationView = new DataOutputSerializer(128);
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+						new RocksDBSerializedCompositeKeyBuilder<>(
+							safeKeySerializer,
+							backend.getKeyGroupPrefixBytes(),
+							32
+						);
+		keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+		byte[] key = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
+		return backend.db.get(columnFamily, key);
+	}
+
+	<UK> byte[] serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
+		UK userKey,
+		TypeSerializer<UK> userKeySerializer) throws IOException {
+		return sharedKeyNamespaceSerializer.buildCompositeKeyNamesSpaceUserKey(
+			currentNamespace,
+			namespaceSerializer,
+			userKey,
+			userKeySerializer
+		);
+	}
+
+	private <T> byte[] serializeValueInternal(T value, TypeSerializer<T> serializer) throws IOException {
+		serializer.serialize(value, dataOutputView);
+		return dataOutputView.getCopyOfBuffer();
+	}
+
+	byte[] serializeCurrentKeyWithGroupAndNamespace() {
+		return sharedKeyNamespaceSerializer.buildCompositeKeyNamespace(currentNamespace, namespaceSerializer);
+	}
+
+	byte[] serializeValue(V value) throws IOException {
+		return serializeValue(value, valueSerializer);
+	}
+
+	<T> byte[] serializeValueNullSensitive(T value, TypeSerializer<T> serializer) throws IOException {
+		dataOutputView.clear();
+		dataOutputView.writeBoolean(value == null);
+		return serializeValueInternal(value, serializer);
+	}
+
+	<T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException {
+		dataOutputView.clear();
+		return serializeValueInternal(value, serializer);
+	}
+
+	<T> byte[] serializeValueList(
+		List<T> valueList,
+		TypeSerializer<T> elementSerializer,
+		byte delimiter) throws IOException {
+
+		dataOutputView.clear();
+		boolean first = true;
 
-		writeKeyWithGroupAndNamespace(
-				keyGroup,
-				keyAndNamespace.f0,
-				safeKeySerializer,
-				keyAndNamespace.f1,
-				safeNamespaceSerializer,
-				tmpKeySerializationView);
+		for (T value : valueList) {
+			Preconditions.checkNotNull(value, "You cannot add null to a value list.");
+
+			if (first) {
+				first = false;
+			} else {
+				dataOutputView.write(delimiter);
+			}
+			elementSerializer.serialize(value, dataOutputView);
+		}
 
-		return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer());
+		return dataOutputView.getCopyOfBuffer();
 	}
 
 	public void migrateSerializedValue(
@@ -169,12 +215,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 	}
 
 	byte[] getKeyBytes() {
-		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			return dataOutputView.getCopyOfBuffer();
-		} catch (IOException e) {
-			throw new FlinkRuntimeException("Error while serializing key", e);
-		}
+		return serializeCurrentKeyWithGroupAndNamespace();
 	}
 
 	byte[] getValueBytes(V value) {
@@ -187,45 +228,6 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 		}
 	}
 
-	protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
-		writeKeyWithGroupAndNamespace(
-			backend.getCurrentKeyGroupIndex(),
-			backend.getCurrentKey(),
-			currentNamespace,
-			dataOutputView);
-	}
-
-	protected void writeKeyWithGroupAndNamespace(
-			int keyGroup, K key, N namespace,
-			DataOutputSerializer keySerializationDataOutputView) throws IOException {
-
-		writeKeyWithGroupAndNamespace(
-				keyGroup,
-				key,
-				backend.getKeySerializer(),
-				namespace,
-				namespaceSerializer,
-				keySerializationDataOutputView);
-	}
-
-	protected void writeKeyWithGroupAndNamespace(
-			final int keyGroup,
-			final K key,
-			final TypeSerializer<K> keySerializer,
-			final N namespace,
-			final TypeSerializer<N> namespaceSerializer,
-			final DataOutputSerializer keySerializationDataOutputView) throws IOException {
-
-		Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
-		Preconditions.checkNotNull(keySerializer);
-		Preconditions.checkNotNull(namespaceSerializer);
-
-		keySerializationDataOutputView.clear();
-		RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
-		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationDataOutputView, ambiguousKeyPossible);
-		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
-	}
-
 	protected V getDefaultValue() {
 		if (defaultValue != null) {
 			return valueSerializer.copy(defaultValue);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 770c558..fbd2979 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -109,19 +109,14 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
 			return;
 		}
 
-		// cache key and namespace
-		final K key = backend.getCurrentKey();
-		final int keyGroup = backend.getCurrentKeyGroupIndex();
-
 		try {
 			ACC current = null;
 
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
-
-					final byte[] sourceKey = dataOutputView.getCopyOfBuffer();
+					setCurrentNamespace(source);
+					final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
@@ -141,10 +136,9 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
 
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
+				setCurrentNamespace(target);
 				// create the target full-binary-key
-				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
-
-				final byte[] targetKey = dataOutputView.getCopyOfBuffer();
+				final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index d8844bf..02b4ffc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 
 /**
@@ -80,8 +82,12 @@ public class RocksDBKeySerializationUtils {
 		}
 	}
 
+	public static boolean isSerializerTypeVariableSized(@Nonnull TypeSerializer<?> serializer) {
+		return serializer.getLength() < 0;
+	}
+
 	public static boolean isAmbiguousKeyPossible(TypeSerializer keySerializer, TypeSerializer namespaceSerializer) {
-		return (keySerializer.getLength() < 0) && (namespaceSerializer.getLength() < 0);
+		return (isSerializerTypeVariableSized(keySerializer) && isSerializerTypeVariableSized(namespaceSerializer));
 	}
 
 	public static void writeKeyGroup(
@@ -108,7 +114,7 @@ public class RocksDBKeySerializationUtils {
 		}
 	}
 
-	private static void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
+	public static void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
 		do {
 			inputView.readByte();
 			value >>>= 8;
@@ -122,7 +128,7 @@ public class RocksDBKeySerializationUtils {
 		writeVariableIntBytes(length, keySerializationDateDataOutputView);
 	}
 
-	private static void writeVariableIntBytes(
+	public static void writeVariableIntBytes(
 		int value,
 		DataOutputView keySerializationDateDataOutputView)
 		throws IOException {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 72445ca..6b00393 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -242,6 +242,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** The native metrics monitor. */
 	private RocksDBNativeMetricMonitor nativeMetricMonitor;
 
+	/** Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states. */
+	private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+
 	public RocksDBKeyedStateBackend(
 		String operatorIdentifier,
 		ClassLoader userCodeClassLoader,
@@ -294,6 +297,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.kvStateInformation = new LinkedHashMap<>();
 
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
+		this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
 
 		this.metricOptions = metricOptions;
 		this.metricGroup = metricGroup;
@@ -373,6 +377,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
+	@Override
+	public void setCurrentKey(K newKey) {
+		super.setCurrentKey(newKey);
+		sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
+	}
+
 	/**
 	 * Should only be called by one thread, and only after all accesses to the DB happened.
 	 */
@@ -456,6 +466,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return writeOptions;
 	}
 
+	RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
+		return sharedRocksKeyBuilder;
+	}
+
 	/**
 	 * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
 	 * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index cedeed8..13f5559 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -115,11 +115,10 @@ class RocksDBListState<K, N, V>
 	@Override
 	public List<V> getInternal() {
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
+			byte[] key = serializeCurrentKeyWithGroupAndNamespace();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			return deserializeList(valueBytes);
-		} catch (IOException | RocksDBException e) {
+		} catch (RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
 		}
 	}
@@ -160,11 +159,12 @@ class RocksDBListState<K, N, V>
 		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
 
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
-			dataOutputView.clear();
-			elementSerializer.serialize(value, dataOutputView);
-			backend.db.merge(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer());
+			backend.db.merge(
+				columnFamily,
+				writeOptions,
+				serializeCurrentKeyWithGroupAndNamespace(),
+				serializeValue(value, elementSerializer)
+			);
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
@@ -176,21 +176,17 @@ class RocksDBListState<K, N, V>
 			return;
 		}
 
-		// cache key and namespace
-		final K key = backend.getCurrentKey();
-		final int keyGroup = backend.getCurrentKeyGroupIndex();
-
 		try {
 			// create the target full-binary-key
-			writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
-			final byte[] targetKey = dataOutputView.getCopyOfBuffer();
+			setCurrentNamespace(target);
+			final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
 
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
+					setCurrentNamespace(source);
+					final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
 
-					byte[] sourceKey = dataOutputView.getCopyOfBuffer();
 					byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
@@ -218,10 +214,11 @@ class RocksDBListState<K, N, V>
 
 		if (!values.isEmpty()) {
 			try {
-				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = dataOutputView.getCopyOfBuffer();
-				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
-				backend.db.put(columnFamily, writeOptions, key, premerge);
+				backend.db.put(
+					columnFamily,
+					writeOptions,
+					serializeCurrentKeyWithGroupAndNamespace(),
+					serializeValueList(values, elementSerializer, DELIMITER));
 			} catch (IOException | RocksDBException e) {
 				throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
 			}
@@ -234,10 +231,11 @@ class RocksDBListState<K, N, V>
 
 		if (!values.isEmpty()) {
 			try {
-				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = dataOutputView.getCopyOfBuffer();
-				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
-				backend.db.merge(columnFamily, writeOptions, key, premerge);
+				backend.db.merge(
+					columnFamily,
+					writeOptions,
+					serializeCurrentKeyWithGroupAndNamespace(),
+					serializeValueList(values, elementSerializer, DELIMITER));
 			} catch (IOException | RocksDBException e) {
 				throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
 			}
@@ -273,26 +271,6 @@ class RocksDBListState<K, N, V>
 		}
 	}
 
-	private static <V> byte[] getPreMergedValue(
-		List<V> values,
-		TypeSerializer<V> elementSerializer,
-		DataOutputSerializer keySerializationStream) throws IOException {
-
-		keySerializationStream.clear();
-		boolean first = true;
-		for (V value : values) {
-			Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
-			if (first) {
-				first = false;
-			} else {
-				keySerializationStream.write(DELIMITER);
-			}
-			elementSerializer.serialize(value, keySerializationStream);
-		}
-
-		return keySerializationStream.getCopyOfBuffer();
-	}
-
 	@SuppressWarnings("unchecked")
 	static <E, K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
@@ -343,10 +321,32 @@ class RocksDBListState<K, N, V>
 				prevPosition = in.getPosition();
 			}
 			try {
-				return result.isEmpty() ? null : getPreMergedValue(result, elementSerializer, out);
+				return result.isEmpty() ? null : serializeValueList(result, elementSerializer, DELIMITER);
 			} catch (IOException e) {
 				throw new FlinkRuntimeException("Failed to serialize transformed list", e);
 			}
 		}
+
+		byte[] serializeValueList(
+			List<T> valueList,
+			TypeSerializer<T> elementSerializer,
+			byte delimiter) throws IOException {
+
+			out.clear();
+			boolean first = true;
+
+			for (T value : valueList) {
+				Preconditions.checkNotNull(value, "You cannot add null to a value list.");
+
+				if (first) {
+					first = false;
+				} else {
+					out.write(delimiter);
+				}
+				elementSerializer.serialize(value, out);
+			}
+
+			return out.getCopyOfBuffer();
+		}
 	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 7abbe3d..13cbded 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -119,7 +119,7 @@ class RocksDBMapState<K, N, UK, UV>
 
 	@Override
 	public UV get(UK userKey) throws IOException, RocksDBException {
-		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+		byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
 		byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
 
 		return (rawValueBytes == null ? null : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
@@ -128,8 +128,8 @@ class RocksDBMapState<K, N, UK, UV>
 	@Override
 	public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
 
-		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
-		byte[] rawValueBytes = serializeUserValue(userValue, userValueSerializer, dataOutputView);
+		byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
+		byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);
 
 		backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
 	}
@@ -142,8 +142,8 @@ class RocksDBMapState<K, N, UK, UV>
 
 		try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
 			for (Map.Entry<UK, UV> entry : map.entrySet()) {
-				byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
-				byte[] rawValueBytes = serializeUserValue(entry.getValue(), userValueSerializer, dataOutputView);
+				byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(entry.getKey(), userKeySerializer);
+				byte[] rawValueBytes = serializeValueNullSensitive(entry.getValue(), userValueSerializer);
 				writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
 			}
 		}
@@ -151,21 +151,21 @@ class RocksDBMapState<K, N, UK, UV>
 
 	@Override
 	public void remove(UK userKey) throws IOException, RocksDBException {
-		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+		byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
 
 		backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
 	}
 
 	@Override
 	public boolean contains(UK userKey) throws IOException, RocksDBException {
-		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+		byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
 		byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
 
 		return (rawValueBytes != null);
 	}
 
 	@Override
-	public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
+	public Iterable<Map.Entry<UK, UV>> entries() {
 		final Iterator<Map.Entry<UK, UV>> iterator = iterator();
 
 		// Return null to make the behavior consistent with other states.
@@ -177,10 +177,11 @@ class RocksDBMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public Iterable<UK> keys() throws IOException {
-		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+	public Iterable<UK> keys() {
+		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
 		return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
+			@Nullable
 			@Override
 			public UK next() {
 				RocksDBMapEntry entry = nextEntry();
@@ -190,8 +191,8 @@ class RocksDBMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public Iterable<UV> values() throws IOException {
-		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+	public Iterable<UV> values() {
+		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
 		return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
 			@Override
@@ -203,8 +204,8 @@ class RocksDBMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
-		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+	public Iterator<Map.Entry<UK, UV>> iterator() {
+		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
 		return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
 			@Override
@@ -220,7 +221,7 @@ class RocksDBMapState<K, N, UK, UV>
 			try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
 				WriteBatch writeBatch = new WriteBatch(128)) {
 
-				final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();
+				final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 				iterator.seek(keyPrefixBytes);
 
 				while (iterator.isValid()) {
@@ -259,18 +260,15 @@ class RocksDBMapState<K, N, UK, UV>
 
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
-		DataOutputSerializer outputView = new DataOutputSerializer(128);
-		DataInputDeserializer inputView = new DataInputDeserializer();
-
-		writeKeyWithGroupAndNamespace(
-				keyGroup,
-				keyAndNamespace.f0,
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+			new RocksDBSerializedCompositeKeyBuilder<>(
 				safeKeySerializer,
-				keyAndNamespace.f1,
-				safeNamespaceSerializer,
-				outputView);
+				backend.getKeyGroupPrefixBytes(),
+				32);
+
+		keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
 
-		final byte[] keyPrefixBytes = outputView.getCopyOfBuffer();
+		final byte[] keyPrefixBytes = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
 
 		final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
 
@@ -282,7 +280,8 @@ class RocksDBMapState<K, N, UK, UV>
 				keyPrefixBytes,
 				dupUserKeySerializer,
 				dupUserValueSerializer,
-				inputView) {
+				dataInputView
+			) {
 
 			@Override
 			public Map.Entry<UK, UV> next() {
@@ -302,36 +301,6 @@ class RocksDBMapState<K, N, UK, UV>
 	//  Serialization Methods
 	// ------------------------------------------------------------------------
 
-	private byte[] serializeCurrentKeyAndNamespace() throws IOException {
-		writeCurrentKeyWithGroupAndNamespace();
-
-		return dataOutputView.getCopyOfBuffer();
-	}
-
-	private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
-		serializeCurrentKeyAndNamespace();
-		userKeySerializer.serialize(userKey, dataOutputView);
-
-		return dataOutputView.getCopyOfBuffer();
-	}
-
-	private static <UV> byte[] serializeUserValue(
-		UV userValue,
-		TypeSerializer<UV> valueSerializer,
-		DataOutputSerializer dataOutputView) throws IOException {
-
-		dataOutputView.clear();
-
-		if (userValue == null) {
-			dataOutputView.writeBoolean(true);
-		} else {
-			dataOutputView.writeBoolean(false);
-			valueSerializer.serialize(userValue, dataOutputView);
-		}
-
-		return dataOutputView.getCopyOfBuffer();
-	}
-
 	private static <UK> UK deserializeUserKey(
 		DataInputDeserializer dataInputView,
 		int userKeyOffset,
@@ -474,7 +443,7 @@ class RocksDBMapState<K, N, UK, UV>
 
 			try {
 				userValue = value;
-				rawValueBytes = serializeUserValue(value, valueSerializer, dataOutputView);
+				rawValueBytes = serializeValueNullSensitive(value, valueSerializer);
 
 				db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
 			} catch (IOException | RocksDBException e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 69736ae..a5eb46e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -102,20 +102,14 @@ class RocksDBReducingState<K, N, V>
 			return;
 		}
 
-		// cache key and namespace
-		final K key = backend.getCurrentKey();
-		final int keyGroup = backend.getCurrentKeyGroupIndex();
-
 		try {
 			V current = null;
 
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-
-					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
-
-					final byte[] sourceKey = dataOutputView.getCopyOfBuffer();
+					setCurrentNamespace(source);
+					final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
@@ -136,9 +130,8 @@ class RocksDBReducingState<K, N, V>
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
 				// create the target full-binary-key
-				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
-
-				final byte[] targetKey = dataOutputView.getCopyOfBuffer();
+				setCurrentNamespace(target);
+				final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
new file mode 100644
index 0000000..8e83e29
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previously serialized current key if possible.
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder<K> {
+
+	/** The serializer for the key. */
+	@Nonnull
+	private final TypeSerializer<K> keySerializer;
+
+	/** The output to write the key into. */
+	@Nonnull
+	private final DataOutputSerializer keyOutView;
+
+	/** The number of Key-group-prefix bytes for the key. */
+	@Nonnegative
+	private final int keyGroupPrefixBytes;
+
+	/** This flag indicates whether the key type has a variable byte size in serialization. */
+	private final boolean keySerializerTypeVariableSized;
+
+	/** Mark for the position after the serialized key. */
+	@Nonnegative
+	private int afterKeyMark;
+
+	public RocksDBSerializedCompositeKeyBuilder(
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnegative int keyGroupPrefixBytes,
+		@Nonnegative int initialSize) {
+		this(
+			keySerializer,
+			new DataOutputSerializer(initialSize),
+			keyGroupPrefixBytes,
+			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+			0);
+	}
+
+	@VisibleForTesting
+	RocksDBSerializedCompositeKeyBuilder(
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnull DataOutputSerializer keyOutView,
+		@Nonnegative int keyGroupPrefixBytes,
+		boolean keySerializerTypeVariableSized,
+		@Nonnegative int afterKeyMark) {
+		this.keySerializer = keySerializer;
+		this.keyOutView = keyOutView;
+		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+		this.keySerializerTypeVariableSized = keySerializerTypeVariableSized;
+		this.afterKeyMark = afterKeyMark;
+	}
+
+	/**
+	 * Sets the key and key-group as prefix. This will serialize them into the buffer and they will be used to create
+	 * composite keys with provided namespaces.
+	 *
+	 * @param key        the key.
+	 * @param keyGroupId the key-group id for the key.
+	 */
+	public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
+		try {
+			serializeKeyGroupAndKey(key, keyGroupId);
+		} catch (IOException shouldNeverHappen) {
+			throw new FlinkRuntimeException(shouldNeverHappen);
+		}
+	}
+
+	/**
+	 * Returns a serialized composite key, from the key and key-group provided in a previous call to
+	 * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+	 *
+	 * @param namespace           the namespace to concatenate for the serialized composite key bytes.
+	 * @param namespaceSerializer the serializer to obtain the serialized form of the namespace.
+	 * @param <N>                 the type of the namespace.
+	 * @return the bytes for the serialized composite key of key-group, key, namespace.
+	 */
+	@Nonnull
+	public <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
+		try {
+			serializeNamespace(namespace, namespaceSerializer);
+			final byte[] result = keyOutView.getCopyOfBuffer();
+			resetToKey();
+			return result;
+		} catch (IOException shouldNeverHappen) {
+			throw new FlinkRuntimeException(shouldNeverHappen);
+		}
+	}
+
+	/**
+	 * Returns a serialized composite key, from the key and key-group provided in a previous call to
+	 * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, followed by the given user-key.
+	 *
+	 * @param namespace           the namespace to concatenate for the serialized composite key bytes.
+	 * @param namespaceSerializer the serializer to obtain the serialized form of the namespace.
+	 * @param userKey             the user-key to concatenate for the serialized composite key, after the namespace.
+	 * @param userKeySerializer   the serializer to obtain the serialized form of the user-key.
+	 * @param <N>                 the type of the namespace.
+	 * @param <UK>                the type of the user-key.
+	 * @return the bytes for the serialized composite key of key-group, key, namespace, user-key.
+	 */
+	@Nonnull
+	public <N, UK> byte[] buildCompositeKeyNamesSpaceUserKey(
+		@Nonnull N namespace,
+		@Nonnull TypeSerializer<N> namespaceSerializer,
+		@Nonnull UK userKey,
+		@Nonnull TypeSerializer<UK> userKeySerializer) throws IOException {
+		serializeNamespace(namespace, namespaceSerializer);
+		userKeySerializer.serialize(userKey, keyOutView);
+		byte[] result = keyOutView.getCopyOfBuffer();
+		resetToKey();
+		return result;
+	}
+
+	private void serializeKeyGroupAndKey(K key, int keyGroupId) throws IOException {
+
+		// clear buffer and mark
+		resetFully();
+
+		// write key-group
+		RocksDBKeySerializationUtils.writeKeyGroup(
+			keyGroupId,
+			keyGroupPrefixBytes,
+			keyOutView);
+		// write key
+		keySerializer.serialize(key, keyOutView);
+		afterKeyMark = keyOutView.length();
+	}
+
+	private <N> void serializeNamespace(
+		@Nonnull N namespace,
+		@Nonnull TypeSerializer<N> namespaceSerializer) throws IOException {
+
+		// this should only be called when there is already a key written so that we build the composite.
+		assert isKeyWritten();
+
+		final boolean ambiguousCompositeKeyPossible = isAmbiguousCompositeKeyPossible(namespaceSerializer);
+		if (ambiguousCompositeKeyPossible) {
+			RocksDBKeySerializationUtils.writeVariableIntBytes(
+				afterKeyMark - keyGroupPrefixBytes,
+				keyOutView);
+		}
+		RocksDBKeySerializationUtils.writeNameSpace(
+			namespace,
+			namespaceSerializer,
+			keyOutView,
+			ambiguousCompositeKeyPossible);
+	}
+
+	private void resetFully() {
+		afterKeyMark = 0;
+		keyOutView.clear();
+	}
+
+	private void resetToKey() {
+		keyOutView.setPosition(afterKeyMark);
+	}
+
+	private boolean isKeyWritten() {
+		return afterKeyMark > 0;
+	}
+
+	@VisibleForTesting
+	boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> namespaceSerializer) {
+		return keySerializerTypeVariableSized &
+			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
+	}
+
+	@VisibleForTesting
+	boolean isKeySerializerTypeVariableSized() {
+		return keySerializerTypeVariableSized;
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 97b83df..a8f5163 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -80,9 +80,9 @@ class RocksDBValueState<K, N, V>
 	@Override
 	public V value() {
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
-			byte[] valueBytes = backend.db.get(columnFamily, key);
+			byte[] valueBytes = backend.db.get(columnFamily,
+				serializeCurrentKeyWithGroupAndNamespace());
+
 			if (valueBytes == null) {
 				return getDefaultValue();
 			}
@@ -101,11 +101,9 @@ class RocksDBValueState<K, N, V>
 		}
 
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
-			dataOutputView.clear();
-			valueSerializer.serialize(value, dataOutputView);
-			backend.db.put(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer());
+			backend.db.put(columnFamily, writeOptions,
+				serializeCurrentKeyWithGroupAndNamespace(),
+				serializeValue(value));
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
new file mode 100644
index 0000000..70a4c3c
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Test for {@link RocksDBSerializedCompositeKeyBuilder}.
+ */
+public class RocksDBSerializedCompositeKeyBuilderTest {
+
+	private final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
+
+	private static final int[] TEST_PARALLELISMS = new int[]{64, 4096};
+	private static final Collection<Integer> TEST_INTS = Arrays.asList(42, 4711);
+	private static final Collection<String> TEST_STRINGS = Arrays.asList("test123", "abc");
+
+	@Before
+	public void before() {
+		dataOutputSerializer.clear();
+	}
+
+	@Test
+	public void testSetKey() throws IOException {
+		for (int parallelism : TEST_PARALLELISMS) {
+			testSetKeyInternal(IntSerializer.INSTANCE, TEST_INTS, parallelism);
+			testSetKeyInternal(StringSerializer.INSTANCE, TEST_STRINGS, parallelism);
+		}
+	}
+
+	@Test
+	public void testSetKeyNamespace() throws IOException {
+		for (int parallelism : TEST_PARALLELISMS) {
+			testSetKeyNamespaceInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, parallelism);
+			testSetKeyNamespaceInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, parallelism);
+			testSetKeyNamespaceInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, parallelism);
+			testSetKeyNamespaceInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, parallelism);
+		}
+	}
+
+	@Test
+	public void testSetKeyNamespaceUserKey() throws IOException {
+		for (int parallelism : TEST_PARALLELISMS) {
+			testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_INTS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_INTS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_INTS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, TEST_INTS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_STRINGS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_STRINGS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_STRINGS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, TEST_STRINGS, parallelism);
+		}
+	}
+
+	private <K> void testSetKeyInternal(TypeSerializer<K> serializer, Collection<K> testKeys, int maxParallelism) throws IOException {
+		final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+			createRocksDBSerializedCompositeKeyBuilder(serializer, prefixBytes);
+
+		final DataInputDeserializer deserializer = new DataInputDeserializer();
+		for (K testKey : testKeys) {
+			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+			byte[] result = dataOutputSerializer.getCopyOfBuffer();
+			deserializer.setBuffer(result);
+			assertKeyKeyGroupBytes(testKey, keyGroup, prefixBytes, serializer, deserializer, false);
+			Assert.assertEquals(0, deserializer.available());
+		}
+	}
+
+	private <K, N> void testSetKeyNamespaceInternal(
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		Collection<K> testKeys,
+		Collection<N> testNamespaces,
+		int maxParallelism) throws IOException {
+		final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+			createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+		final DataInputDeserializer deserializer = new DataInputDeserializer();
+
+		final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+		for (K testKey : testKeys) {
+			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+			for (N testNamespace : testNamespaces) {
+				byte[] compositeBytes = keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer);
+				deserializer.setBuffer(compositeBytes);
+				assertKeyGroupKeyNamespaceBytes(
+					testKey,
+					keyGroup,
+					prefixBytes,
+					keySerializer,
+					testNamespace,
+					namespaceSerializer,
+					deserializer,
+					ambiguousPossible);
+				Assert.assertEquals(0, deserializer.available());
+			}
+		}
+	}
+
+	private <K, N, U> void testSetKeyNamespaceUserKeyInternal(
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<U> userKeySerializer,
+		Collection<K> testKeys,
+		Collection<N> testNamespaces,
+		Collection<U> testUserKeys,
+		int maxParallelism) throws IOException {
+		final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+			createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+		final DataInputDeserializer deserializer = new DataInputDeserializer();
+
+		final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+		for (K testKey : testKeys) {
+			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+			for (N testNamespace : testNamespaces) {
+				for (U testUserKey : testUserKeys) {
+					byte[] compositeBytes = keyBuilder.buildCompositeKeyNamesSpaceUserKey(
+						testNamespace,
+						namespaceSerializer,
+						testUserKey,
+						userKeySerializer);
+
+					deserializer.setBuffer(compositeBytes);
+					assertKeyGroupKeyNamespaceUserKeyBytes(
+						testKey,
+						keyGroup,
+						prefixBytes,
+						keySerializer,
+						testNamespace,
+						namespaceSerializer,
+						testUserKey,
+						userKeySerializer,
+						deserializer,
+						ambiguousPossible);
+
+					Assert.assertEquals(0, deserializer.available());
+				}
+			}
+		}
+	}
+
+	private <K> RocksDBSerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder(
+		TypeSerializer<K> serializer,
+		int prefixBytes) {
+		final boolean variableSize = RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer);
+		return new RocksDBSerializedCompositeKeyBuilder<>(
+			serializer,
+			dataOutputSerializer,
+			prefixBytes,
+			variableSize,
+			0);
+	}
+
+	private <K> int setKeyAndReturnKeyGroup(
+		RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder,
+		K key,
+		int maxParallelism) {
+
+		int keyGroup = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, maxParallelism);
+		compositeKeyBuilder.setKeyAndKeyGroup(key, keyGroup);
+		return keyGroup;
+	}
+
+	private <K> void assertKeyKeyGroupBytes(
+		K key,
+		int keyGroup,
+		int prefixBytes,
+		TypeSerializer<K> typeSerializer,
+		DataInputDeserializer deserializer,
+		boolean ambiguousCompositeKeyPossible) throws IOException {
+
+		Assert.assertEquals(keyGroup, RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, deserializer));
+		Assert.assertEquals(key, RocksDBKeySerializationUtils.readKey(typeSerializer, deserializer, ambiguousCompositeKeyPossible));
+	}
+
+	private <K, N> void assertKeyGroupKeyNamespaceBytes(
+		K key,
+		int keyGroup,
+		int prefixBytes,
+		TypeSerializer<K> keySerializer,
+		N namespace,
+		TypeSerializer<N> namespaceSerializer,
+		DataInputDeserializer deserializer,
+		boolean ambiguousCompositeKeyPossible) throws IOException {
+		assertKeyKeyGroupBytes(key, keyGroup, prefixBytes, keySerializer, deserializer, ambiguousCompositeKeyPossible);
+		N readNamespace =
+			RocksDBKeySerializationUtils.readNamespace(namespaceSerializer, deserializer, ambiguousCompositeKeyPossible);
+		Assert.assertEquals(namespace, readNamespace);
+	}
+
+	private <K, N, U> void assertKeyGroupKeyNamespaceUserKeyBytes(
+		K key,
+		int keyGroup,
+		int prefixBytes,
+		TypeSerializer<K> keySerializer,
+		N namespace,
+		TypeSerializer<N> namespaceSerializer,
+		U userKey,
+		TypeSerializer<U> userKeySerializer,
+		DataInputDeserializer deserializer,
+		boolean ambiguousCompositeKeyPossible) throws IOException {
+		assertKeyGroupKeyNamespaceBytes(
+			key,
+			keyGroup,
+			prefixBytes,
+			keySerializer,
+			namespace,
+			namespaceSerializer,
+			deserializer,
+			ambiguousCompositeKeyPossible);
+		Assert.assertEquals(userKey, userKeySerializer.deserialize(deserializer));
+	}
+}