You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/01/07 12:01:02 UTC

[flink] branch master updated (c0a37f8 -> 54ef382)

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c0a37f8  [FLINK-10845] [table] Support multiple different DISTINCT aggregates for batch
     new 28e0b83  [hotfix] Improve performance of GenericArraySerializer.copy()
     new 6f9a884  [hotfix] Remove unused generic parameter from RocksDB states
     new 54ef382  [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../typeutils/base/GenericArraySerializer.java     |  20 +-
 .../core/memory/ByteArrayOutputStreamWithPos.java  |   3 +-
 .../flink/core/memory/DataOutputSerializer.java    |   7 +
 .../state/AbstractRocksDBAppendingState.java       |   5 +-
 .../streaming/state/AbstractRocksDBState.java      | 143 ++++++------
 .../streaming/state/RocksDBAggregatingState.java   |  16 +-
 .../streaming/state/RocksDBFoldingState.java       |   2 +-
 .../state/RocksDBKeySerializationUtils.java        |  12 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  16 +-
 .../contrib/streaming/state/RocksDBListState.java  |  92 ++++----
 .../contrib/streaming/state/RocksDBMapState.java   |  85 +++----
 .../streaming/state/RocksDBReducingState.java      |  17 +-
 .../RocksDBSerializedCompositeKeyBuilder.java      | 206 +++++++++++++++++
 .../contrib/streaming/state/RocksDBValueState.java |  16 +-
 .../RocksDBSerializedCompositeKeyBuilderTest.java  | 250 +++++++++++++++++++++
 15 files changed, 666 insertions(+), 224 deletions(-)
 create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
 create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java


[flink] 03/03: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54ef382439b1687a645a377a5f6a095746423109
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Tue Nov 27 13:51:29 2018 +0800

    [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
    
    This closes #7288.
    
    Co-authored-by: Stefan Richter <s....@data-artisans.com>
    Co-authored-by: klion26 <qc...@gmail.com>
---
 .../core/memory/ByteArrayOutputStreamWithPos.java  |   3 +-
 .../flink/core/memory/DataOutputSerializer.java    |   7 +
 .../streaming/state/AbstractRocksDBState.java      | 140 ++++++------
 .../streaming/state/RocksDBAggregatingState.java   |  14 +-
 .../state/RocksDBKeySerializationUtils.java        |  12 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  14 ++
 .../contrib/streaming/state/RocksDBListState.java  |  90 ++++----
 .../contrib/streaming/state/RocksDBMapState.java   |  83 +++----
 .../streaming/state/RocksDBReducingState.java      |  15 +-
 .../RocksDBSerializedCompositeKeyBuilder.java      | 206 +++++++++++++++++
 .../contrib/streaming/state/RocksDBValueState.java |  14 +-
 .../RocksDBSerializedCompositeKeyBuilderTest.java  | 250 +++++++++++++++++++++
 12 files changed, 643 insertions(+), 205 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index 22330c5..2f4fdfe 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 
@@ -112,7 +111,7 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
 	}
 
 	@Override
-	public void close() throws IOException {
+	public void close() {
 	}
 
 	public byte[] getBuf() {
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index 01feae0..b749c84 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.core.memory;
 
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -350,6 +352,11 @@ public class DataOutputSerializer implements DataOutputView {
 		this.position += numBytes;
 	}
 
+	public void setPosition(int position) {
+		Preconditions.checkArgument(position >= 0 && position <= this.position, "Position out of bounds.");
+		this.position = position;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index b5ab996..fb19a8a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -34,6 +34,7 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Base class for {@link State} implementations that store state in a RocksDB database.
@@ -70,7 +71,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 
 	protected final DataInputDeserializer dataInputView;
 
-	private final boolean ambiguousKeyPossible;
+	private final RocksDBSerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer;
 
 	/**
 	 * Creates a new RocksDB backed state.
@@ -99,8 +100,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 
 		this.dataOutputView = new DataOutputSerializer(128);
 		this.dataInputView = new DataInputDeserializer();
-		this.ambiguousKeyPossible =
-			RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
+		this.sharedKeyNamespaceSerializer = backend.getSharedRocksKeyBuilder();
 	}
 
 	// ------------------------------------------------------------------------
@@ -108,17 +108,15 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 	@Override
 	public void clear() {
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
-			backend.db.delete(columnFamily, writeOptions, key);
-		} catch (IOException | RocksDBException e) {
+			backend.db.delete(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace());
+		} catch (RocksDBException e) {
 			throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
 		}
 	}
 
 	@Override
 	public void setCurrentNamespace(N namespace) {
-		this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
+		this.currentNamespace = namespace;
 	}
 
 	@Override
@@ -128,30 +126,78 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 			final TypeSerializer<N> safeNamespaceSerializer,
 			final TypeSerializer<V> safeValueSerializer) throws Exception {
 
-		Preconditions.checkNotNull(serializedKeyAndNamespace);
-		Preconditions.checkNotNull(safeKeySerializer);
-		Preconditions.checkNotNull(safeNamespaceSerializer);
-		Preconditions.checkNotNull(safeValueSerializer);
-
 		//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
 		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
 				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
 
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
-		// we cannot reuse the keySerializationStream member since this method
-		// is called concurrently to the other ones and it may thus contain garbage
-		DataOutputSerializer tmpKeySerializationView = new DataOutputSerializer(128);
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+						new RocksDBSerializedCompositeKeyBuilder<>(
+							safeKeySerializer,
+							backend.getKeyGroupPrefixBytes(),
+							32
+						);
+		keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+		byte[] key = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
+		return backend.db.get(columnFamily, key);
+	}
+
+	<UK> byte[] serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
+		UK userKey,
+		TypeSerializer<UK> userKeySerializer) throws IOException {
+		return sharedKeyNamespaceSerializer.buildCompositeKeyNamesSpaceUserKey(
+			currentNamespace,
+			namespaceSerializer,
+			userKey,
+			userKeySerializer
+		);
+	}
+
+	private <T> byte[] serializeValueInternal(T value, TypeSerializer<T> serializer) throws IOException {
+		serializer.serialize(value, dataOutputView);
+		return dataOutputView.getCopyOfBuffer();
+	}
+
+	byte[] serializeCurrentKeyWithGroupAndNamespace() {
+		return sharedKeyNamespaceSerializer.buildCompositeKeyNamespace(currentNamespace, namespaceSerializer);
+	}
+
+	byte[] serializeValue(V value) throws IOException {
+		return serializeValue(value, valueSerializer);
+	}
+
+	<T> byte[] serializeValueNullSensitive(T value, TypeSerializer<T> serializer) throws IOException {
+		dataOutputView.clear();
+		dataOutputView.writeBoolean(value == null);
+		return serializeValueInternal(value, serializer);
+	}
+
+	<T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException {
+		dataOutputView.clear();
+		return serializeValueInternal(value, serializer);
+	}
+
+	<T> byte[] serializeValueList(
+		List<T> valueList,
+		TypeSerializer<T> elementSerializer,
+		byte delimiter) throws IOException {
+
+		dataOutputView.clear();
+		boolean first = true;
 
-		writeKeyWithGroupAndNamespace(
-				keyGroup,
-				keyAndNamespace.f0,
-				safeKeySerializer,
-				keyAndNamespace.f1,
-				safeNamespaceSerializer,
-				tmpKeySerializationView);
+		for (T value : valueList) {
+			Preconditions.checkNotNull(value, "You cannot add null to a value list.");
+
+			if (first) {
+				first = false;
+			} else {
+				dataOutputView.write(delimiter);
+			}
+			elementSerializer.serialize(value, dataOutputView);
+		}
 
-		return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer());
+		return dataOutputView.getCopyOfBuffer();
 	}
 
 	public void migrateSerializedValue(
@@ -169,12 +215,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 	}
 
 	byte[] getKeyBytes() {
-		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			return dataOutputView.getCopyOfBuffer();
-		} catch (IOException e) {
-			throw new FlinkRuntimeException("Error while serializing key", e);
-		}
+		return serializeCurrentKeyWithGroupAndNamespace();
 	}
 
 	byte[] getValueBytes(V value) {
@@ -187,45 +228,6 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 		}
 	}
 
-	protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
-		writeKeyWithGroupAndNamespace(
-			backend.getCurrentKeyGroupIndex(),
-			backend.getCurrentKey(),
-			currentNamespace,
-			dataOutputView);
-	}
-
-	protected void writeKeyWithGroupAndNamespace(
-			int keyGroup, K key, N namespace,
-			DataOutputSerializer keySerializationDataOutputView) throws IOException {
-
-		writeKeyWithGroupAndNamespace(
-				keyGroup,
-				key,
-				backend.getKeySerializer(),
-				namespace,
-				namespaceSerializer,
-				keySerializationDataOutputView);
-	}
-
-	protected void writeKeyWithGroupAndNamespace(
-			final int keyGroup,
-			final K key,
-			final TypeSerializer<K> keySerializer,
-			final N namespace,
-			final TypeSerializer<N> namespaceSerializer,
-			final DataOutputSerializer keySerializationDataOutputView) throws IOException {
-
-		Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
-		Preconditions.checkNotNull(keySerializer);
-		Preconditions.checkNotNull(namespaceSerializer);
-
-		keySerializationDataOutputView.clear();
-		RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
-		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationDataOutputView, ambiguousKeyPossible);
-		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
-	}
-
 	protected V getDefaultValue() {
 		if (defaultValue != null) {
 			return valueSerializer.copy(defaultValue);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 770c558..fbd2979 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -109,19 +109,14 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
 			return;
 		}
 
-		// cache key and namespace
-		final K key = backend.getCurrentKey();
-		final int keyGroup = backend.getCurrentKeyGroupIndex();
-
 		try {
 			ACC current = null;
 
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
-
-					final byte[] sourceKey = dataOutputView.getCopyOfBuffer();
+					setCurrentNamespace(source);
+					final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
@@ -141,10 +136,9 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
 
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
+				setCurrentNamespace(target);
 				// create the target full-binary-key
-				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
-
-				final byte[] targetKey = dataOutputView.getCopyOfBuffer();
+				final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index d8844bf..02b4ffc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 
 /**
@@ -80,8 +82,12 @@ public class RocksDBKeySerializationUtils {
 		}
 	}
 
+	public static boolean isSerializerTypeVariableSized(@Nonnull TypeSerializer<?> serializer) {
+		return serializer.getLength() < 0;
+	}
+
 	public static boolean isAmbiguousKeyPossible(TypeSerializer keySerializer, TypeSerializer namespaceSerializer) {
-		return (keySerializer.getLength() < 0) && (namespaceSerializer.getLength() < 0);
+		return (isSerializerTypeVariableSized(keySerializer) && isSerializerTypeVariableSized(namespaceSerializer));
 	}
 
 	public static void writeKeyGroup(
@@ -108,7 +114,7 @@ public class RocksDBKeySerializationUtils {
 		}
 	}
 
-	private static void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
+	public static void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
 		do {
 			inputView.readByte();
 			value >>>= 8;
@@ -122,7 +128,7 @@ public class RocksDBKeySerializationUtils {
 		writeVariableIntBytes(length, keySerializationDateDataOutputView);
 	}
 
-	private static void writeVariableIntBytes(
+	public static void writeVariableIntBytes(
 		int value,
 		DataOutputView keySerializationDateDataOutputView)
 		throws IOException {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 72445ca..6b00393 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -242,6 +242,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** The native metrics monitor. */
 	private RocksDBNativeMetricMonitor nativeMetricMonitor;
 
+	/** Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.*/
+	private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+
 	public RocksDBKeyedStateBackend(
 		String operatorIdentifier,
 		ClassLoader userCodeClassLoader,
@@ -294,6 +297,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.kvStateInformation = new LinkedHashMap<>();
 
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
+		this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
 
 		this.metricOptions = metricOptions;
 		this.metricGroup = metricGroup;
@@ -373,6 +377,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
+	@Override
+	public void setCurrentKey(K newKey) {
+		super.setCurrentKey(newKey);
+		sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
+	}
+
 	/**
 	 * Should only be called by one thread, and only after all accesses to the DB happened.
 	 */
@@ -456,6 +466,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return writeOptions;
 	}
 
+	RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
+		return sharedRocksKeyBuilder;
+	}
+
 	/**
 	 * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
 	 * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index cedeed8..13f5559 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -115,11 +115,10 @@ class RocksDBListState<K, N, V>
 	@Override
 	public List<V> getInternal() {
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
+			byte[] key = serializeCurrentKeyWithGroupAndNamespace();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			return deserializeList(valueBytes);
-		} catch (IOException | RocksDBException e) {
+		} catch (RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
 		}
 	}
@@ -160,11 +159,12 @@ class RocksDBListState<K, N, V>
 		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
 
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
-			dataOutputView.clear();
-			elementSerializer.serialize(value, dataOutputView);
-			backend.db.merge(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer());
+			backend.db.merge(
+				columnFamily,
+				writeOptions,
+				serializeCurrentKeyWithGroupAndNamespace(),
+				serializeValue(value, elementSerializer)
+			);
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
@@ -176,21 +176,17 @@ class RocksDBListState<K, N, V>
 			return;
 		}
 
-		// cache key and namespace
-		final K key = backend.getCurrentKey();
-		final int keyGroup = backend.getCurrentKeyGroupIndex();
-
 		try {
 			// create the target full-binary-key
-			writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
-			final byte[] targetKey = dataOutputView.getCopyOfBuffer();
+			setCurrentNamespace(target);
+			final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
 
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
+					setCurrentNamespace(source);
+					final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
 
-					byte[] sourceKey = dataOutputView.getCopyOfBuffer();
 					byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
@@ -218,10 +214,11 @@ class RocksDBListState<K, N, V>
 
 		if (!values.isEmpty()) {
 			try {
-				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = dataOutputView.getCopyOfBuffer();
-				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
-				backend.db.put(columnFamily, writeOptions, key, premerge);
+				backend.db.put(
+					columnFamily,
+					writeOptions,
+					serializeCurrentKeyWithGroupAndNamespace(),
+					serializeValueList(values, elementSerializer, DELIMITER));
 			} catch (IOException | RocksDBException e) {
 				throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
 			}
@@ -234,10 +231,11 @@ class RocksDBListState<K, N, V>
 
 		if (!values.isEmpty()) {
 			try {
-				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = dataOutputView.getCopyOfBuffer();
-				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
-				backend.db.merge(columnFamily, writeOptions, key, premerge);
+				backend.db.merge(
+					columnFamily,
+					writeOptions,
+					serializeCurrentKeyWithGroupAndNamespace(),
+					serializeValueList(values, elementSerializer, DELIMITER));
 			} catch (IOException | RocksDBException e) {
 				throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
 			}
@@ -273,26 +271,6 @@ class RocksDBListState<K, N, V>
 		}
 	}
 
-	private static <V> byte[] getPreMergedValue(
-		List<V> values,
-		TypeSerializer<V> elementSerializer,
-		DataOutputSerializer keySerializationStream) throws IOException {
-
-		keySerializationStream.clear();
-		boolean first = true;
-		for (V value : values) {
-			Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
-			if (first) {
-				first = false;
-			} else {
-				keySerializationStream.write(DELIMITER);
-			}
-			elementSerializer.serialize(value, keySerializationStream);
-		}
-
-		return keySerializationStream.getCopyOfBuffer();
-	}
-
 	@SuppressWarnings("unchecked")
 	static <E, K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
@@ -343,10 +321,32 @@ class RocksDBListState<K, N, V>
 				prevPosition = in.getPosition();
 			}
 			try {
-				return result.isEmpty() ? null : getPreMergedValue(result, elementSerializer, out);
+				return result.isEmpty() ? null : serializeValueList(result, elementSerializer, DELIMITER);
 			} catch (IOException e) {
 				throw new FlinkRuntimeException("Failed to serialize transformed list", e);
 			}
 		}
+
+		byte[] serializeValueList(
+			List<T> valueList,
+			TypeSerializer<T> elementSerializer,
+			byte delimiter) throws IOException {
+
+			out.clear();
+			boolean first = true;
+
+			for (T value : valueList) {
+				Preconditions.checkNotNull(value, "You cannot add null to a value list.");
+
+				if (first) {
+					first = false;
+				} else {
+					out.write(delimiter);
+				}
+				elementSerializer.serialize(value, out);
+			}
+
+			return out.getCopyOfBuffer();
+		}
 	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 7abbe3d..13cbded 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -119,7 +119,7 @@ class RocksDBMapState<K, N, UK, UV>
 
 	@Override
 	public UV get(UK userKey) throws IOException, RocksDBException {
-		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+		byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
 		byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
 
 		return (rawValueBytes == null ? null : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
@@ -128,8 +128,8 @@ class RocksDBMapState<K, N, UK, UV>
 	@Override
 	public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
 
-		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
-		byte[] rawValueBytes = serializeUserValue(userValue, userValueSerializer, dataOutputView);
+		byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
+		byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);
 
 		backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
 	}
@@ -142,8 +142,8 @@ class RocksDBMapState<K, N, UK, UV>
 
 		try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
 			for (Map.Entry<UK, UV> entry : map.entrySet()) {
-				byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
-				byte[] rawValueBytes = serializeUserValue(entry.getValue(), userValueSerializer, dataOutputView);
+				byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(entry.getKey(), userKeySerializer);
+				byte[] rawValueBytes = serializeValueNullSensitive(entry.getValue(), userValueSerializer);
 				writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
 			}
 		}
@@ -151,21 +151,21 @@ class RocksDBMapState<K, N, UK, UV>
 
 	@Override
 	public void remove(UK userKey) throws IOException, RocksDBException {
-		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+		byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
 
 		backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
 	}
 
 	@Override
 	public boolean contains(UK userKey) throws IOException, RocksDBException {
-		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+		byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
 		byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
 
 		return (rawValueBytes != null);
 	}
 
 	@Override
-	public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
+	public Iterable<Map.Entry<UK, UV>> entries() {
 		final Iterator<Map.Entry<UK, UV>> iterator = iterator();
 
 		// Return null to make the behavior consistent with other states.
@@ -177,10 +177,11 @@ class RocksDBMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public Iterable<UK> keys() throws IOException {
-		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+	public Iterable<UK> keys() {
+		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
 		return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
+			@Nullable
 			@Override
 			public UK next() {
 				RocksDBMapEntry entry = nextEntry();
@@ -190,8 +191,8 @@ class RocksDBMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public Iterable<UV> values() throws IOException {
-		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+	public Iterable<UV> values() {
+		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
 		return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
 			@Override
@@ -203,8 +204,8 @@ class RocksDBMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
-		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+	public Iterator<Map.Entry<UK, UV>> iterator() {
+		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
 		return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
 			@Override
@@ -220,7 +221,7 @@ class RocksDBMapState<K, N, UK, UV>
 			try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
 				WriteBatch writeBatch = new WriteBatch(128)) {
 
-				final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();
+				final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 				iterator.seek(keyPrefixBytes);
 
 				while (iterator.isValid()) {
@@ -259,18 +260,15 @@ class RocksDBMapState<K, N, UK, UV>
 
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
-		DataOutputSerializer outputView = new DataOutputSerializer(128);
-		DataInputDeserializer inputView = new DataInputDeserializer();
-
-		writeKeyWithGroupAndNamespace(
-				keyGroup,
-				keyAndNamespace.f0,
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+			new RocksDBSerializedCompositeKeyBuilder<>(
 				safeKeySerializer,
-				keyAndNamespace.f1,
-				safeNamespaceSerializer,
-				outputView);
+				backend.getKeyGroupPrefixBytes(),
+				32);
+
+		keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
 
-		final byte[] keyPrefixBytes = outputView.getCopyOfBuffer();
+		final byte[] keyPrefixBytes = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
 
 		final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
 
@@ -282,7 +280,8 @@ class RocksDBMapState<K, N, UK, UV>
 				keyPrefixBytes,
 				dupUserKeySerializer,
 				dupUserValueSerializer,
-				inputView) {
+				dataInputView
+			) {
 
 			@Override
 			public Map.Entry<UK, UV> next() {
@@ -302,36 +301,6 @@ class RocksDBMapState<K, N, UK, UV>
 	//  Serialization Methods
 	// ------------------------------------------------------------------------
 
-	private byte[] serializeCurrentKeyAndNamespace() throws IOException {
-		writeCurrentKeyWithGroupAndNamespace();
-
-		return dataOutputView.getCopyOfBuffer();
-	}
-
-	private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
-		serializeCurrentKeyAndNamespace();
-		userKeySerializer.serialize(userKey, dataOutputView);
-
-		return dataOutputView.getCopyOfBuffer();
-	}
-
-	private static <UV> byte[] serializeUserValue(
-		UV userValue,
-		TypeSerializer<UV> valueSerializer,
-		DataOutputSerializer dataOutputView) throws IOException {
-
-		dataOutputView.clear();
-
-		if (userValue == null) {
-			dataOutputView.writeBoolean(true);
-		} else {
-			dataOutputView.writeBoolean(false);
-			valueSerializer.serialize(userValue, dataOutputView);
-		}
-
-		return dataOutputView.getCopyOfBuffer();
-	}
-
 	private static <UK> UK deserializeUserKey(
 		DataInputDeserializer dataInputView,
 		int userKeyOffset,
@@ -474,7 +443,7 @@ class RocksDBMapState<K, N, UK, UV>
 
 			try {
 				userValue = value;
-				rawValueBytes = serializeUserValue(value, valueSerializer, dataOutputView);
+				rawValueBytes = serializeValueNullSensitive(value, valueSerializer);
 
 				db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
 			} catch (IOException | RocksDBException e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 69736ae..a5eb46e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -102,20 +102,14 @@ class RocksDBReducingState<K, N, V>
 			return;
 		}
 
-		// cache key and namespace
-		final K key = backend.getCurrentKey();
-		final int keyGroup = backend.getCurrentKeyGroupIndex();
-
 		try {
 			V current = null;
 
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-
-					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
-
-					final byte[] sourceKey = dataOutputView.getCopyOfBuffer();
+					setCurrentNamespace(source);
+					final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
@@ -136,9 +130,8 @@ class RocksDBReducingState<K, N, V>
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
 				// create the target full-binary-key
-				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
-
-				final byte[] targetKey = dataOutputView.getCopyOfBuffer();
+				setCurrentNamespace(target);
+				final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
new file mode 100644
index 0000000..8e83e29
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previously serialized current key if possible.
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder<K> {
+
+	/** The serializer for the key. */
+	@Nonnull
+	private final TypeSerializer<K> keySerializer;
+
+	/** The output to write the key into. */
+	@Nonnull
+	private final DataOutputSerializer keyOutView;
+
+	/** The number of Key-group-prefix bytes for the key. */
+	@Nonnegative
+	private final int keyGroupPrefixBytes;
+
+	/** This flag indicates whether the key type has a variable byte size in serialization. */
+	private final boolean keySerializerTypeVariableSized;
+
+	/** Mark for the position after the serialized key. */
+	@Nonnegative
+	private int afterKeyMark;
+
+	public RocksDBSerializedCompositeKeyBuilder(
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnegative int keyGroupPrefixBytes,
+		@Nonnegative int initialSize) {
+		this(
+			keySerializer,
+			new DataOutputSerializer(initialSize),
+			keyGroupPrefixBytes,
+			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+			0);
+	}
+
+	@VisibleForTesting
+	RocksDBSerializedCompositeKeyBuilder(
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnull DataOutputSerializer keyOutView,
+		@Nonnegative int keyGroupPrefixBytes,
+		boolean keySerializerTypeVariableSized,
+		@Nonnegative int afterKeyMark) {
+		this.keySerializer = keySerializer;
+		this.keyOutView = keyOutView;
+		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+		this.keySerializerTypeVariableSized = keySerializerTypeVariableSized;
+		this.afterKeyMark = afterKeyMark;
+	}
+
+	/**
+	 * Sets the key and key-group as prefix. This will serialize them into the buffer and they will be used to create
+	 * composite keys with provided namespaces.
+	 *
+	 * @param key        the key.
+	 * @param keyGroupId the key-group id for the key.
+	 */
+	public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
+		try {
+			serializeKeyGroupAndKey(key, keyGroupId);
+		} catch (IOException shouldNeverHappen) {
+			throw new FlinkRuntimeException(shouldNeverHappen);
+		}
+	}
+
+	/**
+	 * Returns a serialized composite key, from the key and key-group provided in a previous call to
+	 * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+	 *
+	 * @param namespace           the namespace to concatenate for the serialized composite key bytes.
+	 * @param namespaceSerializer the serializer to obtain the serialized form of the namespace.
+	 * @param <N>                 the type of the namespace.
+	 * @return the bytes for the serialized composite key of key-group, key, namespace.
+	 */
+	@Nonnull
+	public <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
+		try {
+			serializeNamespace(namespace, namespaceSerializer);
+			final byte[] result = keyOutView.getCopyOfBuffer();
+			resetToKey();
+			return result;
+		} catch (IOException shouldNeverHappen) {
+			throw new FlinkRuntimeException(shouldNeverHappen);
+		}
+	}
+
+	/**
+	 * Returns a serialized composite key, from the key and key-group provided in a previous call to
+	 * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, followed by the given user-key.
+	 *
+	 * @param namespace           the namespace to concatenate for the serialized composite key bytes.
+	 * @param namespaceSerializer the serializer to obtain the serialized form of the namespace.
+	 * @param userKey             the user-key to concatenate for the serialized composite key, after the namespace.
+	 * @param userKeySerializer   the serializer to obtain the serialized form of the user-key.
+	 * @param <N>                 the type of the namespace.
+	 * @param <UK>                the type of the user-key.
+	 * @return the bytes for the serialized composite key of key-group, key, namespace, user-key.
+	 */
+	@Nonnull
+	public <N, UK> byte[] buildCompositeKeyNamesSpaceUserKey(
+		@Nonnull N namespace,
+		@Nonnull TypeSerializer<N> namespaceSerializer,
+		@Nonnull UK userKey,
+		@Nonnull TypeSerializer<UK> userKeySerializer) throws IOException {
+		serializeNamespace(namespace, namespaceSerializer);
+		userKeySerializer.serialize(userKey, keyOutView);
+		byte[] result = keyOutView.getCopyOfBuffer();
+		resetToKey();
+		return result;
+	}
+
+	private void serializeKeyGroupAndKey(K key, int keyGroupId) throws IOException {
+
+		// clear buffer and mark
+		resetFully();
+
+		// write key-group
+		RocksDBKeySerializationUtils.writeKeyGroup(
+			keyGroupId,
+			keyGroupPrefixBytes,
+			keyOutView);
+		// write key
+		keySerializer.serialize(key, keyOutView);
+		afterKeyMark = keyOutView.length();
+	}
+
+	private <N> void serializeNamespace(
+		@Nonnull N namespace,
+		@Nonnull TypeSerializer<N> namespaceSerializer) throws IOException {
+
+		// this should only be called when there is already a key written so that we build the composite.
+		assert isKeyWritten();
+
+		final boolean ambiguousCompositeKeyPossible = isAmbiguousCompositeKeyPossible(namespaceSerializer);
+		if (ambiguousCompositeKeyPossible) {
+			RocksDBKeySerializationUtils.writeVariableIntBytes(
+				afterKeyMark - keyGroupPrefixBytes,
+				keyOutView);
+		}
+		RocksDBKeySerializationUtils.writeNameSpace(
+			namespace,
+			namespaceSerializer,
+			keyOutView,
+			ambiguousCompositeKeyPossible);
+	}
+
+	private void resetFully() {
+		afterKeyMark = 0;
+		keyOutView.clear();
+	}
+
+	private void resetToKey() {
+		keyOutView.setPosition(afterKeyMark);
+	}
+
+	private boolean isKeyWritten() {
+		return afterKeyMark > 0;
+	}
+
+	@VisibleForTesting
+	boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> namespaceSerializer) {
+		return keySerializerTypeVariableSized &
+			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
+	}
+
+	@VisibleForTesting
+	boolean isKeySerializerTypeVariableSized() {
+		return keySerializerTypeVariableSized;
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 97b83df..a8f5163 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -80,9 +80,9 @@ class RocksDBValueState<K, N, V>
 	@Override
 	public V value() {
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
-			byte[] valueBytes = backend.db.get(columnFamily, key);
+			byte[] valueBytes = backend.db.get(columnFamily,
+				serializeCurrentKeyWithGroupAndNamespace());
+
 			if (valueBytes == null) {
 				return getDefaultValue();
 			}
@@ -101,11 +101,9 @@ class RocksDBValueState<K, N, V>
 		}
 
 		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.getCopyOfBuffer();
-			dataOutputView.clear();
-			valueSerializer.serialize(value, dataOutputView);
-			backend.db.put(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer());
+			backend.db.put(columnFamily, writeOptions,
+				serializeCurrentKeyWithGroupAndNamespace(),
+				serializeValue(value));
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
new file mode 100644
index 0000000..70a4c3c
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Test for {@link RocksDBSerializedCompositeKeyBuilder}.
+ */
+public class RocksDBSerializedCompositeKeyBuilderTest {
+
+	private final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
+
+	private static final int[] TEST_PARALLELISMS = new int[]{64, 4096};
+	private static final Collection<Integer> TEST_INTS = Arrays.asList(42, 4711);
+	private static final Collection<String> TEST_STRINGS = Arrays.asList("test123", "abc");
+
+	@Before
+	public void before() {
+		dataOutputSerializer.clear();
+	}
+
+	@Test
+	public void testSetKey() throws IOException {
+		for (int parallelism : TEST_PARALLELISMS) {
+			testSetKeyInternal(IntSerializer.INSTANCE, TEST_INTS, parallelism);
+			testSetKeyInternal(StringSerializer.INSTANCE, TEST_STRINGS, parallelism);
+		}
+	}
+
+	@Test
+	public void testSetKeyNamespace() throws IOException {
+		for (int parallelism : TEST_PARALLELISMS) {
+			testSetKeyNamespaceInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, parallelism);
+			testSetKeyNamespaceInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, parallelism);
+			testSetKeyNamespaceInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, parallelism);
+			testSetKeyNamespaceInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, parallelism);
+		}
+	}
+
+	@Test
+	public void testSetKeyNamespaceUserKey() throws IOException {
+		for (int parallelism : TEST_PARALLELISMS) {
+			testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_INTS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_INTS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_INTS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, TEST_INTS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_STRINGS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_STRINGS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_STRINGS, parallelism);
+			testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, TEST_STRINGS, parallelism);
+		}
+	}
+
+	private <K> void testSetKeyInternal(TypeSerializer<K> serializer, Collection<K> testKeys, int maxParallelism) throws IOException {
+		final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+			createRocksDBSerializedCompositeKeyBuilder(serializer, prefixBytes);
+
+		final DataInputDeserializer deserializer = new DataInputDeserializer();
+		for (K testKey : testKeys) {
+			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+			byte[] result = dataOutputSerializer.getCopyOfBuffer();
+			deserializer.setBuffer(result);
+			assertKeyKeyGroupBytes(testKey, keyGroup, prefixBytes, serializer, deserializer, false);
+			Assert.assertEquals(0, deserializer.available());
+		}
+	}
+
+	private <K, N> void testSetKeyNamespaceInternal(
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		Collection<K> testKeys,
+		Collection<N> testNamespaces,
+		int maxParallelism) throws IOException {
+		final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+			createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+		final DataInputDeserializer deserializer = new DataInputDeserializer();
+
+		final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+		for (K testKey : testKeys) {
+			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+			for (N testNamespace : testNamespaces) {
+				byte[] compositeBytes = keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer);
+				deserializer.setBuffer(compositeBytes);
+				assertKeyGroupKeyNamespaceBytes(
+					testKey,
+					keyGroup,
+					prefixBytes,
+					keySerializer,
+					testNamespace,
+					namespaceSerializer,
+					deserializer,
+					ambiguousPossible);
+				Assert.assertEquals(0, deserializer.available());
+			}
+		}
+	}
+
+	private <K, N, U> void testSetKeyNamespaceUserKeyInternal(
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<U> userKeySerializer,
+		Collection<K> testKeys,
+		Collection<N> testNamespaces,
+		Collection<U> testUserKeys,
+		int maxParallelism) throws IOException {
+		final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+			createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+		final DataInputDeserializer deserializer = new DataInputDeserializer();
+
+		final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+		for (K testKey : testKeys) {
+			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+			for (N testNamespace : testNamespaces) {
+				for (U testUserKey : testUserKeys) {
+					byte[] compositeBytes = keyBuilder.buildCompositeKeyNamesSpaceUserKey(
+						testNamespace,
+						namespaceSerializer,
+						testUserKey,
+						userKeySerializer);
+
+					deserializer.setBuffer(compositeBytes);
+					assertKeyGroupKeyNamespaceUserKeyBytes(
+						testKey,
+						keyGroup,
+						prefixBytes,
+						keySerializer,
+						testNamespace,
+						namespaceSerializer,
+						testUserKey,
+						userKeySerializer,
+						deserializer,
+						ambiguousPossible);
+
+					Assert.assertEquals(0, deserializer.available());
+				}
+			}
+		}
+	}
+
+	private <K> RocksDBSerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder(
+		TypeSerializer<K> serializer,
+		int prefixBytes) {
+		final boolean variableSize = RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer);
+		return new RocksDBSerializedCompositeKeyBuilder<>(
+			serializer,
+			dataOutputSerializer,
+			prefixBytes,
+			variableSize,
+			0);
+	}
+
+	private <K> int setKeyAndReturnKeyGroup(
+		RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder,
+		K key,
+		int maxParallelism) {
+
+		int keyGroup = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, maxParallelism);
+		compositeKeyBuilder.setKeyAndKeyGroup(key, keyGroup);
+		return keyGroup;
+	}
+
+	private <K> void assertKeyKeyGroupBytes(
+		K key,
+		int keyGroup,
+		int prefixBytes,
+		TypeSerializer<K> typeSerializer,
+		DataInputDeserializer deserializer,
+		boolean ambiguousCompositeKeyPossible) throws IOException {
+
+		Assert.assertEquals(keyGroup, RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, deserializer));
+		Assert.assertEquals(key, RocksDBKeySerializationUtils.readKey(typeSerializer, deserializer, ambiguousCompositeKeyPossible));
+	}
+
+	private <K, N> void assertKeyGroupKeyNamespaceBytes(
+		K key,
+		int keyGroup,
+		int prefixBytes,
+		TypeSerializer<K> keySerializer,
+		N namespace,
+		TypeSerializer<N> namespaceSerializer,
+		DataInputDeserializer deserializer,
+		boolean ambiguousCompositeKeyPossible) throws IOException {
+		assertKeyKeyGroupBytes(key, keyGroup, prefixBytes, keySerializer, deserializer, ambiguousCompositeKeyPossible);
+		N readNamespace =
+			RocksDBKeySerializationUtils.readNamespace(namespaceSerializer, deserializer, ambiguousCompositeKeyPossible);
+		Assert.assertEquals(namespace, readNamespace);
+	}
+
+	private <K, N, U> void assertKeyGroupKeyNamespaceUserKeyBytes(
+		K key,
+		int keyGroup,
+		int prefixBytes,
+		TypeSerializer<K> keySerializer,
+		N namespace,
+		TypeSerializer<N> namespaceSerializer,
+		U userKey,
+		TypeSerializer<U> userKeySerializer,
+		DataInputDeserializer deserializer,
+		boolean ambiguousCompositeKeyPossible) throws IOException {
+		assertKeyGroupKeyNamespaceBytes(
+			key,
+			keyGroup,
+			prefixBytes,
+			keySerializer,
+			namespace,
+			namespaceSerializer,
+			deserializer,
+			ambiguousCompositeKeyPossible);
+		Assert.assertEquals(userKey, userKeySerializer.deserialize(deserializer));
+	}
+}


[flink] 01/03: [hotfix] Improve performance of GenericArraySerializer.copy()

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 28e0b83dd86cbf2a8d0eeda95391f206ba0d812b
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Wed Jun 6 17:15:20 2018 +0200

    [hotfix] Improve performance of GenericArraySerializer.copy()
---
 .../typeutils/base/GenericArraySerializer.java       | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 6668c57..55ba8ab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -86,16 +87,21 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 
 	@Override
 	public C[] copy(C[] from) {
-		C[] copy = create(from.length);
 
-		for (int i = 0; i < copy.length; i++) {
-			C val = from[i];
-			if (val != null) {
-				copy[i] = this.componentSerializer.copy(val);
+		final TypeSerializer<C> serializer = this.componentSerializer;
+
+		if (serializer.isImmutableType()) {
+			return Arrays.copyOf(from, from.length);
+		} else {
+			C[] copy = create(from.length);
+			for (int i = 0; i < copy.length; i++) {
+				C val = from[i];
+				if (val != null) {
+					copy[i] = serializer.copy(val);
+				}
 			}
+			return copy;
 		}
-
-		return copy;
 	}
 	
 	@Override


[flink] 02/03: [hotfix] Remove unused generic parameter from RocksDB states

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f9a8840fc3cf58e3b43ae4ce1da5a9023888442
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Tue Dec 11 15:21:25 2018 +0100

    [hotfix] Remove unused generic parameter from RocksDB states
---
 .../flink/contrib/streaming/state/AbstractRocksDBAppendingState.java | 5 ++---
 .../apache/flink/contrib/streaming/state/AbstractRocksDBState.java   | 3 +--
 .../flink/contrib/streaming/state/RocksDBAggregatingState.java       | 2 +-
 .../apache/flink/contrib/streaming/state/RocksDBFoldingState.java    | 2 +-
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java      | 2 +-
 .../org/apache/flink/contrib/streaming/state/RocksDBListState.java   | 2 +-
 .../org/apache/flink/contrib/streaming/state/RocksDBMapState.java    | 2 +-
 .../apache/flink/contrib/streaming/state/RocksDBReducingState.java   | 2 +-
 .../org/apache/flink/contrib/streaming/state/RocksDBValueState.java  | 2 +-
 9 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 8c0f4d7..04e665a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -28,8 +27,8 @@ import org.rocksdb.RocksDBException;
 
 import java.io.IOException;
 
-abstract class AbstractRocksDBAppendingState <K, N, IN, SV, OUT, S extends State>
-	extends AbstractRocksDBState<K, N, SV, S>
+abstract class AbstractRocksDBAppendingState <K, N, IN, SV, OUT>
+	extends AbstractRocksDBState<K, N, SV>
 	implements InternalAppendingState<K, N, IN, SV, OUT> {
 
 	/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 2218bc0..b5ab996 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -44,9 +44,8 @@ import java.io.IOException;
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of values kept internally in state.
- * @param <S> The type of {@link State}.
  */
-public abstract class AbstractRocksDBState<K, N, V, S extends State> implements InternalKvState<K, N, V>, State {
+public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K, N, V>, State {
 
 	/** Serializer for the namespace. */
 	final TypeSerializer<N> namespaceSerializer;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 2085fb8..770c558 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -43,7 +43,7 @@ import java.util.Collection;
  * @param <R> The type of the value returned from the state
  */
 class RocksDBAggregatingState<K, N, T, ACC, R>
-	extends AbstractRocksDBAppendingState<K, N, T, ACC, R, AggregatingState<T, R>>
+	extends AbstractRocksDBAppendingState<K, N, T, ACC, R>
 	implements InternalAggregatingState<K, N, T, ACC, R> {
 
 	/** User-specified aggregation function. */
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 4d66357..c5e830f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -42,7 +42,7 @@ import org.rocksdb.ColumnFamilyHandle;
  */
 @Deprecated
 class RocksDBFoldingState<K, N, T, ACC>
-	extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC, FoldingState<T, ACC>>
+	extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC>
 	implements InternalFoldingState<K, N, T, ACC> {
 
 	/** User-specified fold function. */
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 700c546..72445ca 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1400,7 +1400,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		@SuppressWarnings("unchecked")
-		AbstractRocksDBState<?, ?, SV, S> rocksDBState = (AbstractRocksDBState<?, ?, SV, S>) state;
+		AbstractRocksDBState<?, ?, SV> rocksDBState = (AbstractRocksDBState<?, ?, SV>) state;
 
 		Snapshot rocksDBSnapshot = db.getSnapshot();
 		try (
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 6904c85..cedeed8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -59,7 +59,7 @@ import static org.apache.flink.runtime.state.StateSnapshotTransformer.Collection
  * @param <V> The type of the values in the list state.
  */
 class RocksDBListState<K, N, V>
-	extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
+	extends AbstractRocksDBState<K, N, List<V>>
 	implements InternalListState<K, N, V> {
 
 	/** Serializer for the values. */
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index cb656b5..7abbe3d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -64,7 +64,7 @@ import java.util.Map;
  * @param <UV> The type of the values in the map state.
  */
 class RocksDBMapState<K, N, UK, UV>
-	extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
+	extends AbstractRocksDBState<K, N, Map<UK, UV>>
 	implements InternalMapState<K, N, UK, UV> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 138357b..69736ae 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -41,7 +41,7 @@ import java.util.Collection;
  * @param <V> The type of value that the state stores.
  */
 class RocksDBReducingState<K, N, V>
-	extends AbstractRocksDBAppendingState<K, N, V, V, V, ReducingState<V>>
+	extends AbstractRocksDBAppendingState<K, N, V, V, V>
 	implements InternalReducingState<K, N, V> {
 
 	/** User-specified reduce function. */
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 0ca90d4..97b83df 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -40,7 +40,7 @@ import java.io.IOException;
  * @param <V> The type of value that the state stores.
  */
 class RocksDBValueState<K, N, V>
-	extends AbstractRocksDBState<K, N, V, ValueState<V>>
+	extends AbstractRocksDBState<K, N, V>
 	implements InternalValueState<K, N, V> {
 
 	/**