You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/08/10 18:10:25 UTC

[flink] 02/02: [FLINK-10124][state] Use ByteArrayDataInput/OutputView instead of stream + wrapper

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18ff4ab8e55d76522f835cc683f57252b3f742bc
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Fri Aug 10 20:06:58 2018 +0200

    [FLINK-10124][state] Use ByteArrayDataInput/OutputView instead of stream + wrapper
---
 .../flink/core/memory/ByteArrayDataInputView.java  |  4 ++
 .../state/AbstractRocksDBAppendingState.java       |  5 +-
 .../streaming/state/AbstractRocksDBState.java      | 50 +++++++---------
 .../streaming/state/RocksDBAggregatingState.java   | 28 ++++-----
 .../state/RocksDBKeySerializationUtils.java        | 39 +++++--------
 .../streaming/state/RocksDBKeyedStateBackend.java  |  8 +--
 .../contrib/streaming/state/RocksDBListState.java  | 68 ++++++++--------------
 .../contrib/streaming/state/RocksDBMapState.java   | 41 +++++--------
 .../streaming/state/RocksDBReducingState.java      | 33 +++++------
 .../contrib/streaming/state/RocksDBValueState.java | 18 +++---
 .../RocksDBIncrementalCheckpointUtilsTest.java     | 33 ++++-------
 .../state/RocksDBKeySerializationUtilsTest.java    | 38 ++++++------
 12 files changed, 150 insertions(+), 215 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
index 33836f0..698a9f9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
@@ -53,4 +53,8 @@ public class ByteArrayDataInputView extends DataInputViewStreamWrapper {
 	public void setData(@Nonnull byte[] buffer, int offset, int length) {
 		inStreamWithPos.setBuffer(buffer, offset, length);
 	}
+
+	public void setData(@Nonnull byte[] buffer) {
+		setData(buffer, 0, buffer.length);
+	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 32819f8..2a9ab75 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -20,8 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -63,7 +61,8 @@ abstract class AbstractRocksDBAppendingState <K, N, IN, SV, OUT, S extends State
 			if (valueBytes == null) {
 				return null;
 			}
-			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+			dataInputView.setData(valueBytes);
+			return valueSerializer.deserialize(dataInputView);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
 		}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 7483089..65b7f1f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,9 +20,8 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -67,9 +66,9 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
 
 	protected final WriteOptions writeOptions;
 
-	protected final ByteArrayOutputStreamWithPos keySerializationStream;
+	protected final ByteArrayDataOutputView dataOutputView;
 
-	protected final DataOutputView keySerializationDataOutputView;
+	protected final ByteArrayDataInputView dataInputView;
 
 	private final boolean ambiguousKeyPossible;
 
@@ -98,9 +97,10 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
 		this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "State value serializer");
 		this.defaultValue = defaultValue;
 
-		this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
-		this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
-		this.ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
+		this.dataOutputView = new ByteArrayDataOutputView(128);
+		this.dataInputView = new ByteArrayDataInputView();
+		this.ambiguousKeyPossible =
+			RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -109,7 +109,7 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
 	public void clear() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
+			byte[] key = dataOutputView.toByteArray();
 			backend.db.delete(columnFamily, writeOptions, key);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
@@ -141,8 +141,7 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
 
 		// we cannot reuse the keySerializationStream member since this method
 		// is called concurrently to the other ones and it may thus contain garbage
-		ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128);
-		DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream);
+		ByteArrayDataOutputView tmpKeySerializationView = new ByteArrayDataOutputView(128);
 
 		writeKeyWithGroupAndNamespace(
 				keyGroup,
@@ -150,16 +149,15 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
 				safeKeySerializer,
 				keyAndNamespace.f1,
 				safeNamespaceSerializer,
-				tmpKeySerializationStream,
-				tmpKeySerializationDateDataOutputView);
+				tmpKeySerializationView);
 
-		return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
+		return backend.db.get(columnFamily, tmpKeySerializationView.toByteArray());
 	}
 
 	byte[] getKeyBytes() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			return keySerializationStream.toByteArray();
+			return dataOutputView.toByteArray();
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Error while serializing key", e);
 		}
@@ -167,9 +165,9 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
 
 	byte[] getValueBytes(V value) {
 		try {
-			keySerializationStream.reset();
-			valueSerializer.serialize(value, new DataOutputViewStreamWrapper(keySerializationStream));
-			return keySerializationStream.toByteArray();
+			dataOutputView.reset();
+			valueSerializer.serialize(value, dataOutputView);
+			return dataOutputView.toByteArray();
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Error while serializing value", e);
 		}
@@ -180,14 +178,12 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
 			backend.getCurrentKeyGroupIndex(),
 			backend.getCurrentKey(),
 			currentNamespace,
-			keySerializationStream,
-			keySerializationDataOutputView);
+			dataOutputView);
 	}
 
 	protected void writeKeyWithGroupAndNamespace(
 			int keyGroup, K key, N namespace,
-			ByteArrayOutputStreamWithPos keySerializationStream,
-			DataOutputView keySerializationDataOutputView) throws IOException {
+			ByteArrayDataOutputView keySerializationDataOutputView) throws IOException {
 
 		writeKeyWithGroupAndNamespace(
 				keyGroup,
@@ -195,7 +191,6 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
 				backend.getKeySerializer(),
 				namespace,
 				namespaceSerializer,
-				keySerializationStream,
 				keySerializationDataOutputView);
 	}
 
@@ -205,17 +200,16 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
 			final TypeSerializer<K> keySerializer,
 			final N namespace,
 			final TypeSerializer<N> namespaceSerializer,
-			final ByteArrayOutputStreamWithPos keySerializationStream,
-			final DataOutputView keySerializationDataOutputView) throws IOException {
+			final ByteArrayDataOutputView keySerializationDataOutputView) throws IOException {
 
 		Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
 		Preconditions.checkNotNull(keySerializer);
 		Preconditions.checkNotNull(namespaceSerializer);
 
-		keySerializationStream.reset();
+		keySerializationDataOutputView.reset();
 		RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
-		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
-		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
+		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationDataOutputView, ambiguousKeyPossible);
+		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
 	}
 
 	protected V getDefaultValue() {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 209d18f..4f9ef2f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -121,17 +119,15 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-					writeKeyWithGroupAndNamespace(
-							keyGroup, key, source,
-							keySerializationStream, keySerializationDataOutputView);
+					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
 
-					final byte[] sourceKey = keySerializationStream.toByteArray();
+					final byte[] sourceKey = dataOutputView.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
 					if (valueBytes != null) {
-						ACC value = valueSerializer.deserialize(
-								new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+						dataInputView.setData(valueBytes);
+						ACC value = valueSerializer.deserialize(dataInputView);
 
 						if (current != null) {
 							current = aggFunction.merge(current, value);
@@ -146,27 +142,25 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
 				// create the target full-binary-key
-				writeKeyWithGroupAndNamespace(
-						keyGroup, key, target,
-						keySerializationStream, keySerializationDataOutputView);
+				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
 
-				final byte[] targetKey = keySerializationStream.toByteArray();
+				final byte[] targetKey = dataOutputView.toByteArray();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
 					// target also had a value, merge
-					ACC value = valueSerializer.deserialize(
-							new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+					dataInputView.setData(targetValueBytes);
+					ACC value = valueSerializer.deserialize(dataInputView);
 
 					current = aggFunction.merge(current, value);
 				}
 
 				// serialize the resulting value
-				keySerializationStream.reset();
-				valueSerializer.serialize(current, keySerializationDataOutputView);
+				dataOutputView.reset();
+				valueSerializer.serialize(current, dataOutputView);
 
 				// write the resulting value
-				backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+				backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray());
 			}
 		}
 		catch (Exception e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index 1bc49e9..7c9e3f8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -18,8 +18,8 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -41,13 +41,12 @@ public class RocksDBKeySerializationUtils {
 
 	public static <K> K readKey(
 		TypeSerializer<K> keySerializer,
-		ByteArrayInputStreamWithPos inputStream,
-		DataInputView inputView,
+		ByteArrayDataInputView inputView,
 		boolean ambiguousKeyPossible) throws IOException {
-		int beforeRead = inputStream.getPosition();
+		int beforeRead = inputView.getPosition();
 		K key = keySerializer.deserialize(inputView);
 		if (ambiguousKeyPossible) {
-			int length = inputStream.getPosition() - beforeRead;
+			int length = inputView.getPosition() - beforeRead;
 			readVariableIntBytes(inputView, length);
 		}
 		return key;
@@ -55,13 +54,12 @@ public class RocksDBKeySerializationUtils {
 
 	public static <N> N readNamespace(
 		TypeSerializer<N> namespaceSerializer,
-		ByteArrayInputStreamWithPos inputStream,
-		DataInputView inputView,
+		ByteArrayDataInputView inputView,
 		boolean ambiguousKeyPossible) throws IOException {
-		int beforeRead = inputStream.getPosition();
+		int beforeRead = inputView.getPosition();
 		N namespace = namespaceSerializer.deserialize(inputView);
 		if (ambiguousKeyPossible) {
-			int length = inputStream.getPosition() - beforeRead;
+			int length = inputView.getPosition() - beforeRead;
 			readVariableIntBytes(inputView, length);
 		}
 		return namespace;
@@ -70,17 +68,15 @@ public class RocksDBKeySerializationUtils {
 	public static <N> void writeNameSpace(
 		N namespace,
 		TypeSerializer<N> namespaceSerializer,
-		ByteArrayOutputStreamWithPos keySerializationStream,
-		DataOutputView keySerializationDataOutputView,
+		ByteArrayDataOutputView keySerializationDataOutputView,
 		boolean ambiguousKeyPossible) throws IOException {
 
-		int beforeWrite = keySerializationStream.getPosition();
+		int beforeWrite = keySerializationDataOutputView.getPosition();
 		namespaceSerializer.serialize(namespace, keySerializationDataOutputView);
 
 		if (ambiguousKeyPossible) {
 			//write length of namespace
-			writeLengthFrom(beforeWrite, keySerializationStream,
-				keySerializationDataOutputView);
+			writeLengthFrom(beforeWrite, keySerializationDataOutputView);
 		}
 	}
 
@@ -100,17 +96,15 @@ public class RocksDBKeySerializationUtils {
 	public static <K> void writeKey(
 		K key,
 		TypeSerializer<K> keySerializer,
-		ByteArrayOutputStreamWithPos keySerializationStream,
-		DataOutputView keySerializationDataOutputView,
+		ByteArrayDataOutputView keySerializationDataOutputView,
 		boolean ambiguousKeyPossible) throws IOException {
 		//write key
-		int beforeWrite = keySerializationStream.getPosition();
+		int beforeWrite = keySerializationDataOutputView.getPosition();
 		keySerializer.serialize(key, keySerializationDataOutputView);
 
 		if (ambiguousKeyPossible) {
 			//write size of key
-			writeLengthFrom(beforeWrite, keySerializationStream,
-				keySerializationDataOutputView);
+			writeLengthFrom(beforeWrite, keySerializationDataOutputView);
 		}
 	}
 
@@ -123,9 +117,8 @@ public class RocksDBKeySerializationUtils {
 
 	private static void writeLengthFrom(
 		int fromPosition,
-		ByteArrayOutputStreamWithPos keySerializationStream,
-		DataOutputView keySerializationDateDataOutputView) throws IOException {
-		int length = keySerializationStream.getPosition() - fromPosition;
+		ByteArrayDataOutputView keySerializationDateDataOutputView) throws IOException {
+		int length = keySerializationDateDataOutputView.getPosition() - fromPosition;
 		writeVariableIntBytes(length, keySerializationDateDataOutputView);
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 55cc4f9..c159976 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -45,7 +45,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -363,17 +362,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
 
 		final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
-		final ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
+		final ByteArrayDataOutputView namespaceOutputView = new ByteArrayDataOutputView(8);
 		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 		final byte[] nameSpaceBytes;
 		try {
 			RocksDBKeySerializationUtils.writeNameSpace(
 				namespace,
 				namespaceSerializer,
-				namespaceOutputStream,
-				new DataOutputViewStreamWrapper(namespaceOutputStream),
+				namespaceOutputView,
 				ambiguousKeyPossible);
-			nameSpaceBytes = namespaceOutputStream.toByteArray();
+			nameSpaceBytes = namespaceOutputView.toByteArray();
 		} catch (IOException ex) {
 			throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
 		}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 176f48c..cdd7afb 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -25,9 +25,8 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -116,32 +115,31 @@ class RocksDBListState<K, N, V>
 	public List<V> getInternal() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
+			byte[] key = dataOutputView.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
-			return deserializeList(valueBytes, elementSerializer);
+			return deserializeList(valueBytes);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
 		}
 	}
 
-	private static <V> List<V> deserializeList(
-		byte[] valueBytes, TypeSerializer<V> elementSerializer) {
+	private List<V> deserializeList(
+		byte[] valueBytes) {
 		if (valueBytes == null) {
 			return null;
 		}
 
-		DataInputViewStreamWrapper in = new ByteArrayDataInputView(valueBytes);
+		dataInputView.setData(valueBytes);
 
 		List<V> result = new ArrayList<>();
 		V next;
-		while ((next = deserializeNextElement(in, elementSerializer)) != null) {
+		while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) {
 			result.add(next);
 		}
 		return result;
 	}
 
-	private static <V> V deserializeNextElement(
-		DataInputViewStreamWrapper in, TypeSerializer<V> elementSerializer) {
+	private static <V> V deserializeNextElement(DataInputViewStreamWrapper in, TypeSerializer<V> elementSerializer) {
 		try {
 			if (in.available() > 0) {
 				V element = elementSerializer.deserialize(in);
@@ -162,11 +160,10 @@ class RocksDBListState<K, N, V>
 
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
-			keySerializationStream.reset();
-			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
-			elementSerializer.serialize(value, out);
-			backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+			byte[] key = dataOutputView.toByteArray();
+			dataOutputView.reset();
+			elementSerializer.serialize(value, dataOutputView);
+			backend.db.merge(columnFamily, writeOptions, key, dataOutputView.toByteArray());
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
@@ -184,19 +181,15 @@ class RocksDBListState<K, N, V>
 
 		try {
 			// create the target full-binary-key
-			writeKeyWithGroupAndNamespace(
-					keyGroup, key, target,
-					keySerializationStream, keySerializationDataOutputView);
-			final byte[] targetKey = keySerializationStream.toByteArray();
+			writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
+			final byte[] targetKey = dataOutputView.toByteArray();
 
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
-					writeKeyWithGroupAndNamespace(
-							keyGroup, key, source,
-							keySerializationStream, keySerializationDataOutputView);
+					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
 
-					byte[] sourceKey = keySerializationStream.toByteArray();
+					byte[] sourceKey = dataOutputView.toByteArray();
 					byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
@@ -225,14 +218,9 @@ class RocksDBListState<K, N, V>
 		if (!values.isEmpty()) {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = keySerializationStream.toByteArray();
-
-				byte[] premerge = getPreMergedValue(values, elementSerializer, keySerializationStream);
-				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
-				} else {
-					throw new IOException("Failed pre-merge values in update()");
-				}
+				byte[] key = dataOutputView.toByteArray();
+				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
+				backend.db.put(columnFamily, writeOptions, key, premerge);
 			} catch (IOException | RocksDBException e) {
 				throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
 			}
@@ -246,14 +234,9 @@ class RocksDBListState<K, N, V>
 		if (!values.isEmpty()) {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = keySerializationStream.toByteArray();
-
-				byte[] premerge = getPreMergedValue(values, elementSerializer, keySerializationStream);
-				if (premerge != null) {
-					backend.db.merge(columnFamily, writeOptions, key, premerge);
-				} else {
-					throw new IOException("Failed pre-merge values in addAll()");
-				}
+				byte[] key = dataOutputView.toByteArray();
+				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
+				backend.db.merge(columnFamily, writeOptions, key, premerge);
 			} catch (IOException | RocksDBException e) {
 				throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
 			}
@@ -263,8 +246,7 @@ class RocksDBListState<K, N, V>
 	private static <V> byte[] getPreMergedValue(
 		List<V> values,
 		TypeSerializer<V> elementSerializer,
-		ByteArrayOutputStreamWithPos keySerializationStream) throws IOException {
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+		ByteArrayDataOutputView keySerializationStream) throws IOException {
 
 		keySerializationStream.reset();
 		boolean first = true;
@@ -275,7 +257,7 @@ class RocksDBListState<K, N, V>
 			} else {
 				keySerializationStream.write(DELIMITER);
 			}
-			elementSerializer.serialize(value, out);
+			elementSerializer.serialize(value, keySerializationStream);
 		}
 
 		return keySerializationStream.toByteArray();
@@ -298,7 +280,7 @@ class RocksDBListState<K, N, V>
 	static class StateSnapshotTransformerWrapper<T> implements StateSnapshotTransformer<byte[]> {
 		private final StateSnapshotTransformer<T> elementTransformer;
 		private final TypeSerializer<T> elementSerializer;
-		private final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(128);
+		private final ByteArrayDataOutputView out = new ByteArrayDataOutputView(128);
 		private final CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;
 
 		StateSnapshotTransformerWrapper(StateSnapshotTransformer<T> elementTransformer, TypeSerializer<T> elementSerializer) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index b08eade..ad6b7c2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -26,10 +26,6 @@ import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
@@ -263,8 +259,7 @@ class RocksDBMapState<K, N, UK, UV>
 
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
-		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
-		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
+		ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(128);
 
 		writeKeyWithGroupAndNamespace(
 				keyGroup,
@@ -272,10 +267,9 @@ class RocksDBMapState<K, N, UK, UV>
 				safeKeySerializer,
 				keyAndNamespace.f1,
 				safeNamespaceSerializer,
-				outputStream,
 				outputView);
 
-		final byte[] keyPrefixBytes = outputStream.toByteArray();
+		final byte[] keyPrefixBytes = outputView.toByteArray();
 
 		final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
 
@@ -309,14 +303,14 @@ class RocksDBMapState<K, N, UK, UV>
 	private byte[] serializeCurrentKeyAndNamespace() throws IOException {
 		writeCurrentKeyWithGroupAndNamespace();
 
-		return keySerializationStream.toByteArray();
+		return dataOutputView.toByteArray();
 	}
 
 	private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
 		serializeCurrentKeyAndNamespace();
-		userKeySerializer.serialize(userKey, keySerializationDataOutputView);
+		userKeySerializer.serialize(userKey, dataOutputView);
 
-		return keySerializationStream.toByteArray();
+		return dataOutputView.toByteArray();
 	}
 
 	private byte[] serializeUserValue(UV userValue) throws IOException {
@@ -328,34 +322,29 @@ class RocksDBMapState<K, N, UK, UV>
 	}
 
 	private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
-		keySerializationStream.reset();
+		dataOutputView.reset();
 
 		if (userValue == null) {
-			keySerializationDataOutputView.writeBoolean(true);
+			dataOutputView.writeBoolean(true);
 		} else {
-			keySerializationDataOutputView.writeBoolean(false);
-			valueSerializer.serialize(userValue, keySerializationDataOutputView);
+			dataOutputView.writeBoolean(false);
+			valueSerializer.serialize(userValue, dataOutputView);
 		}
 
-		return keySerializationStream.toByteArray();
+		return dataOutputView.toByteArray();
 	}
 
 	private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
-		ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
-		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
-
-		in.skipBytes(userKeyOffset);
-
-		return keySerializer.deserialize(in);
+		dataInputView.setData(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset);
+		return keySerializer.deserialize(dataInputView);
 	}
 
 	private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
-		ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
-		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+		dataInputView.setData(rawValueBytes);
 
-		boolean isNull = in.readBoolean();
+		boolean isNull = dataInputView.readBoolean();
 
-		return isNull ? null : valueSerializer.deserialize(in);
+		return isNull ? null : valueSerializer.deserialize(dataInputView);
 	}
 
 	private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 490960e..d1fe3bd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -25,15 +25,12 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.rocksdb.ColumnFamilyHandle;
 
-import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -87,7 +84,7 @@ class RocksDBReducingState<K, N, V>
 	}
 
 	@Override
-	public V get() throws IOException {
+	public V get() {
 		return getInternal();
 	}
 
@@ -100,7 +97,7 @@ class RocksDBReducingState<K, N, V>
 	}
 
 	@Override
-	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+	public void mergeNamespaces(N target, Collection<N> sources) {
 		if (sources == null || sources.isEmpty()) {
 			return;
 		}
@@ -116,17 +113,15 @@ class RocksDBReducingState<K, N, V>
 			for (N source : sources) {
 				if (source != null) {
 
-					writeKeyWithGroupAndNamespace(
-							keyGroup, key, source,
-							keySerializationStream, keySerializationDataOutputView);
+					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
 
-					final byte[] sourceKey = keySerializationStream.toByteArray();
+					final byte[] sourceKey = dataOutputView.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
 					if (valueBytes != null) {
-						V value = valueSerializer.deserialize(
-								new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+						dataInputView.setData(valueBytes);
+						V value = valueSerializer.deserialize(dataInputView);
 
 						if (current != null) {
 							current = reduceFunction.reduce(current, value);
@@ -141,27 +136,25 @@ class RocksDBReducingState<K, N, V>
 			// if something came out of merging the sources, merge it or write it to the target
 			if (current != null) {
 				// create the target full-binary-key
-				writeKeyWithGroupAndNamespace(
-						keyGroup, key, target,
-						keySerializationStream, keySerializationDataOutputView);
+				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
 
-				final byte[] targetKey = keySerializationStream.toByteArray();
+				final byte[] targetKey = dataOutputView.toByteArray();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
+					dataInputView.setData(targetValueBytes);
 					// target also had a value, merge
-					V value = valueSerializer.deserialize(
-							new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+					V value = valueSerializer.deserialize(dataInputView);
 
 					current = reduceFunction.reduce(current, value);
 				}
 
 				// serialize the resulting value
-				keySerializationStream.reset();
-				valueSerializer.serialize(current, keySerializationDataOutputView);
+				dataOutputView.reset();
+				valueSerializer.serialize(current, dataOutputView);
 
 				// write the resulting value
-				backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+				backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray());
 			}
 		}
 		catch (Exception e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 5ae894e..e9399e1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -32,7 +30,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
 /**
@@ -84,12 +81,13 @@ class RocksDBValueState<K, N, V>
 	public V value() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
+			byte[] key = dataOutputView.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
 				return getDefaultValue();
 			}
-			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+			dataInputView.setData(valueBytes);
+			return valueSerializer.deserialize(dataInputView);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
 		}
@@ -101,13 +99,13 @@ class RocksDBValueState<K, N, V>
 			clear();
 			return;
 		}
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
-			keySerializationStream.reset();
-			valueSerializer.serialize(value, out);
-			backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+			byte[] key = dataOutputView.toByteArray();
+			dataOutputView.reset();
+			valueSerializer.serialize(value, dataOutputView);
+			backend.db.put(columnFamily, writeOptions, key, dataOutputView.toByteArray());
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index 4121cf0..483b8fd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -18,9 +18,7 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.util.TestLogger;
@@ -114,35 +112,30 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
 			int currentGroupRangeStart = currentGroupRange.getStartKeyGroup();
 			int currentGroupRangeEnd = currentGroupRange.getEndKeyGroup();
 
+			ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(32);
 			for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
-				ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-				DataOutputView outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
 				for (int j = 0; j < 100; ++j) {
-					outputStreamWithPos.reset();
+					outputView.reset();
 					RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 					RocksDBKeySerializationUtils.writeKey(
 						j,
 						IntSerializer.INSTANCE,
-						outputStreamWithPos,
-						new DataOutputViewStreamWrapper(outputStreamWithPos),
+						outputView,
 						false);
-					rocksDB.put(columnFamilyHandle, outputStreamWithPos.toByteArray(), String.valueOf(j).getBytes());
+					rocksDB.put(columnFamilyHandle, outputView.toByteArray(), String.valueOf(j).getBytes());
 				}
 			}
 
 			for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
-				ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-				DataOutputView outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
 				for (int j = 0; j < 100; ++j) {
-					outputStreamWithPos.reset();
+					outputView.reset();
 					RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 					RocksDBKeySerializationUtils.writeKey(
 						j,
 						IntSerializer.INSTANCE,
-						outputStreamWithPos,
-						new DataOutputViewStreamWrapper(outputStreamWithPos),
+						outputView,
 						false);
-					byte[] value = rocksDB.get(columnFamilyHandle, outputStreamWithPos.toByteArray());
+					byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
 					Assert.assertEquals(String.valueOf(j), new String(value));
 				}
 			}
@@ -155,19 +148,15 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
 				keyGroupPrefixBytes);
 
 			for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
-				ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-				DataOutputView outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
 				for (int j = 0; j < 100; ++j) {
-					outputStreamWithPos.reset();
+					outputView.reset();
 					RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 					RocksDBKeySerializationUtils.writeKey(
 						j,
 						IntSerializer.INSTANCE,
-						outputStreamWithPos,
-						new DataOutputViewStreamWrapper(outputStreamWithPos),
+						outputView,
 						false);
-					byte[] value = rocksDB.get(
-						columnFamilyHandle, outputStreamWithPos.toByteArray());
+					byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
 					if (targetGroupRange.contains(i)) {
 						Assert.assertEquals(String.valueOf(j), new String(value));
 					} else {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
index b1737ed..d92bef5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
@@ -19,6 +19,8 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -61,39 +63,39 @@ public class RocksDBKeySerializationUtilsTest {
 
 	@Test
 	public void testKeySerializationAndDeserialization() throws Exception {
-		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
-		DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);
+		final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
+		final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
 
 		// test for key
 		for (int orgKey = 0; orgKey < 100; ++orgKey) {
-			outputStream.reset();
-			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputStream, outputView, false);
-			ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
-			int deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), false);
+			outputView.reset();
+			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, false);
+			inputView.setData(outputView.toByteArray());
+			int deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, false);
 			Assert.assertEquals(orgKey, deserializedKey);
 
-			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputStream, outputView, true);
-			inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
-			deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), true);
+			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, true);
+			inputView.setData(outputView.toByteArray());
+			deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
 			Assert.assertEquals(orgKey, deserializedKey);
 		}
 	}
 
 	@Test
 	public void testNamespaceSerializationAndDeserialization() throws Exception {
-		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
-		DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);
+		final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
+		final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
 
 		for (int orgNamespace = 0; orgNamespace < 100; ++orgNamespace) {
-			outputStream.reset();
-			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputStream, outputView, false);
-			ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
-			int deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), false);
+			outputView.reset();
+			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, false);
+			inputView.setData(outputView.toByteArray());
+			int deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, false);
 			Assert.assertEquals(orgNamespace, deserializedNamepsace);
 
-			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputStream, outputView, true);
-			inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
-			deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), true);
+			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, true);
+			inputView.setData(outputView.toByteArray());
+			deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, true);
 			Assert.assertEquals(orgNamespace, deserializedNamepsace);
 		}
 	}