You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/22 11:35:46 UTC

[GitHub] asfgit closed pull request #6583: [FLINK-10175] Fix concurrent access to shared buffer between RocksDBMapState and queryable state

asfgit closed pull request #6583: [FLINK-10175] Fix concurrent access to shared buffer between RocksDBMapState and queryable state
URL: https://github.com/apache/flink/pull/6583
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index 3be5779ceec..cc4a54bfeef 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -101,11 +101,11 @@ public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> va
 		V value = null;
 
 		if (messageKey != null) {
-			inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
+			inputDeserializer.setBuffer(messageKey);
 			key = keySerializer.deserialize(inputDeserializer);
 		}
 		if (message != null) {
-			inputDeserializer.setBuffer(message, 0, message.length);
+			inputDeserializer.setBuffer(message);
 			value = valueSerializer.deserialize(inputDeserializer);
 		}
 		return new Tuple2<>(key, value);
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
index 78da3fadc3e..6be265a5152 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
@@ -81,9 +81,9 @@ public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, TypeSeria
 	@Override
 	public T deserialize(byte[] message) {
 		if (dis != null) {
-			dis.setBuffer(message, 0, message.length);
+			dis.setBuffer(message);
 		} else {
-			dis = new DataInputDeserializer(message, 0, message.length);
+			dis = new DataInputDeserializer(message);
 		}
 
 		try {
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
deleted file mode 100644
index 698a9f97dc0..00000000000
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory;
-
-import javax.annotation.Nonnull;
-
-/**
- * Reusable adapter to {@link DataInputView} that operates on given byte-arrays.
- */
-public class ByteArrayDataInputView extends DataInputViewStreamWrapper {
-
-	@Nonnull
-	private final ByteArrayInputStreamWithPos inStreamWithPos;
-
-	public ByteArrayDataInputView() {
-		super(new ByteArrayInputStreamWithPos());
-		this.inStreamWithPos = (ByteArrayInputStreamWithPos) in;
-	}
-
-	public ByteArrayDataInputView(@Nonnull byte[] buffer) {
-		this(buffer, 0, buffer.length);
-	}
-
-	public ByteArrayDataInputView(@Nonnull byte[] buffer, int offset, int length) {
-		this();
-		setData(buffer, offset, length);
-	}
-
-	public int getPosition() {
-		return inStreamWithPos.getPosition();
-	}
-
-	public void setPosition(int pos) {
-		inStreamWithPos.setPosition(pos);
-	}
-
-	public void setData(@Nonnull byte[] buffer, int offset, int length) {
-		inStreamWithPos.setBuffer(buffer, offset, length);
-	}
-
-	public void setData(@Nonnull byte[] buffer) {
-		setData(buffer, 0, buffer.length);
-	}
-}
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java
deleted file mode 100644
index a96f3d3fef1..00000000000
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory;
-
-import javax.annotation.Nonnull;
-
-/**
- * Adapter to {@link DataOutputView} that operates on a byte-array and offers read/write access to the current position.
- */
-public class ByteArrayDataOutputView extends DataOutputViewStreamWrapper {
-
-	@Nonnull
-	private final ByteArrayOutputStreamWithPos outputStreamWithPos;
-
-	public ByteArrayDataOutputView() {
-		this(64);
-	}
-
-	public ByteArrayDataOutputView(int initialSize) {
-		super(new ByteArrayOutputStreamWithPos(initialSize));
-		this.outputStreamWithPos = (ByteArrayOutputStreamWithPos) out;
-	}
-
-	public void reset() {
-		outputStreamWithPos.reset();
-	}
-
-	@Nonnull
-	public byte[] toByteArray() {
-		return outputStreamWithPos.toByteArray();
-	}
-
-	public int getPosition() {
-		return outputStreamWithPos.getPosition();
-	}
-
-	public void setPosition(int position) {
-		outputStreamWithPos.setPosition(position);
-	}
-
-	@Nonnull
-	public byte[] getInternalBufferReference() {
-		return outputStreamWithPos.getBuf();
-	}
-}
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
index 11973e836c9..ffdd828e77d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.core.memory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
@@ -29,6 +32,7 @@
  */
 public class DataInputDeserializer implements DataInputView, java.io.Serializable {
 
+	private static final byte[] EMPTY = new byte[0];
 	private static final long serialVersionUID = 1L;
 
 	// ------------------------------------------------------------------------
@@ -41,17 +45,19 @@
 
 	// ------------------------------------------------------------------------
 
-	public DataInputDeserializer() {}
+	public DataInputDeserializer() {
+		setBuffer(EMPTY);
+	}
 
-	public DataInputDeserializer(byte[] buffer) {
-		setBuffer(buffer, 0, buffer.length);
+	public DataInputDeserializer(@Nonnull byte[] buffer) {
+		setBufferInternal(buffer, 0, buffer.length);
 	}
 
-	public DataInputDeserializer(byte[] buffer, int start, int len) {
+	public DataInputDeserializer(@Nonnull byte[] buffer, int start, int len) {
 		setBuffer(buffer, start, len);
 	}
 
-	public DataInputDeserializer(ByteBuffer buffer) {
+	public DataInputDeserializer(@Nonnull ByteBuffer buffer) {
 		setBuffer(buffer);
 	}
 
@@ -59,7 +65,7 @@ public DataInputDeserializer(ByteBuffer buffer) {
 	//  Changing buffers
 	// ------------------------------------------------------------------------
 
-	public void setBuffer(ByteBuffer buffer) {
+	public void setBuffer(@Nonnull ByteBuffer buffer) {
 		if (buffer.hasArray()) {
 			this.buffer = buffer.array();
 			this.position = buffer.arrayOffset() + buffer.position();
@@ -76,15 +82,20 @@ public void setBuffer(ByteBuffer buffer) {
 		}
 	}
 
-	public void setBuffer(byte[] buffer, int start, int len) {
-		if (buffer == null) {
-			throw new NullPointerException();
-		}
+	public void setBuffer(@Nonnull byte[] buffer, int start, int len) {
 
 		if (start < 0 || len < 0 || start + len > buffer.length) {
-			throw new IllegalArgumentException();
+			throw new IllegalArgumentException("Invalid bounds.");
 		}
 
+		setBufferInternal(buffer, start, len);
+	}
+
+	public void setBuffer(@Nonnull byte[] buffer) {
+		setBufferInternal(buffer, 0, buffer.length);
+	}
+
+	private void setBufferInternal(@Nonnull byte[] buffer, int start, int len) {
 		this.buffer = buffer;
 		this.position = start;
 		this.end = start + len;
@@ -144,12 +155,12 @@ public float readFloat() throws IOException {
 	}
 
 	@Override
-	public void readFully(byte[] b) throws IOException {
+	public void readFully(@Nonnull byte[] b) throws IOException {
 		readFully(b, 0, b.length);
 	}
 
 	@Override
-	public void readFully(byte[] b, int off, int len) throws IOException {
+	public void readFully(@Nonnull byte[] b, int off, int len) throws IOException {
 		if (len >= 0) {
 			if (off <= b.length - len) {
 				if (this.position <= this.end - len) {
@@ -161,7 +172,7 @@ public void readFully(byte[] b, int off, int len) throws IOException {
 			} else {
 				throw new ArrayIndexOutOfBoundsException();
 			}
-		} else if (len < 0) {
+		} else {
 			throw new IllegalArgumentException("Length may not be negative.");
 		}
 	}
@@ -182,6 +193,7 @@ public int readInt() throws IOException {
 		}
 	}
 
+	@Nullable
 	@Override
 	public String readLine() throws IOException {
 		if (this.position < this.end) {
@@ -229,6 +241,7 @@ public short readShort() throws IOException {
 		}
 	}
 
+	@Nonnull
 	@Override
 	public String readUTF() throws IOException {
 		int utflen = readUnsignedShort();
@@ -319,7 +332,7 @@ public int readUnsignedShort() throws IOException {
 	}
 
 	@Override
-	public int skipBytes(int n) throws IOException {
+	public int skipBytes(int n) {
 		if (this.position <= this.end - n) {
 			this.position += n;
 			return n;
@@ -340,10 +353,7 @@ public void skipBytesToRead(int numBytes) throws IOException {
 	}
 
 	@Override
-	public int read(byte[] b, int off, int len) throws IOException {
-		if (b == null){
-			throw new NullPointerException("Byte array b cannot be null.");
-		}
+	public int read(@Nonnull byte[] b, int off, int len) throws IOException {
 
 		if (off < 0){
 			throw new IndexOutOfBoundsException("Offset cannot be negative.");
@@ -370,10 +380,14 @@ public int read(byte[] b, int off, int len) throws IOException {
 	}
 
 	@Override
-	public int read(byte[] b) throws IOException {
+	public int read(@Nonnull byte[] b) throws IOException {
 		return read(b, 0, b.length);
 	}
 
+	public int getPosition() {
+		return position;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java
index 1ee05cd0392..e3706ec2944 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state.ttl;
 
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -34,12 +34,12 @@
 abstract class TtlStateSnapshotTransformer<T> implements CollectionStateSnapshotTransformer<T> {
 	private final TtlTimeProvider ttlTimeProvider;
 	final long ttl;
-	private final ByteArrayDataInputView div;
+	private final DataInputDeserializer div;
 
 	TtlStateSnapshotTransformer(@Nonnull TtlTimeProvider ttlTimeProvider, long ttl) {
 		this.ttlTimeProvider = ttlTimeProvider;
 		this.ttl = ttl;
-		this.div = new ByteArrayDataInputView();
+		this.div = new DataInputDeserializer();
 	}
 
 	<V> TtlValue<V> filterTtlValue(TtlValue<V> value) {
@@ -55,7 +55,7 @@ boolean expired(long ts) {
 	}
 
 	long deserializeTs(byte[] value) throws IOException {
-		div.setData(value, 0, Long.BYTES);
+		div.setBuffer(value, 0, Long.BYTES);
 		return LongSerializer.INSTANCE.deserialize(div);
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 2a9ab7589a9..8c0f4d7da7b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -61,7 +61,7 @@ SV getInternal(byte[] key) {
 			if (valueBytes == null) {
 				return null;
 			}
-			dataInputView.setData(valueBytes);
+			dataInputView.setBuffer(valueBytes);
 			return valueSerializer.deserialize(dataInputView);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 65b7f1fa4a7..8b8fbb23a99 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,8 +20,8 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -66,9 +66,9 @@
 
 	protected final WriteOptions writeOptions;
 
-	protected final ByteArrayDataOutputView dataOutputView;
+	protected final DataOutputSerializer dataOutputView;
 
-	protected final ByteArrayDataInputView dataInputView;
+	protected final DataInputDeserializer dataInputView;
 
 	private final boolean ambiguousKeyPossible;
 
@@ -97,8 +97,8 @@ protected AbstractRocksDBState(
 		this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "State value serializer");
 		this.defaultValue = defaultValue;
 
-		this.dataOutputView = new ByteArrayDataOutputView(128);
-		this.dataInputView = new ByteArrayDataInputView();
+		this.dataOutputView = new DataOutputSerializer(128);
+		this.dataInputView = new DataInputDeserializer();
 		this.ambiguousKeyPossible =
 			RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
 	}
@@ -109,7 +109,7 @@ protected AbstractRocksDBState(
 	public void clear() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.toByteArray();
+			byte[] key = dataOutputView.getCopyOfBuffer();
 			backend.db.delete(columnFamily, writeOptions, key);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
@@ -141,7 +141,7 @@ public void setCurrentNamespace(N namespace) {
 
 		// we cannot reuse the keySerializationStream member since this method
 		// is called concurrently to the other ones and it may thus contain garbage
-		ByteArrayDataOutputView tmpKeySerializationView = new ByteArrayDataOutputView(128);
+		DataOutputSerializer tmpKeySerializationView = new DataOutputSerializer(128);
 
 		writeKeyWithGroupAndNamespace(
 				keyGroup,
@@ -151,13 +151,13 @@ public void setCurrentNamespace(N namespace) {
 				safeNamespaceSerializer,
 				tmpKeySerializationView);
 
-		return backend.db.get(columnFamily, tmpKeySerializationView.toByteArray());
+		return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer());
 	}
 
 	byte[] getKeyBytes() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			return dataOutputView.toByteArray();
+			return dataOutputView.getCopyOfBuffer();
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Error while serializing key", e);
 		}
@@ -165,9 +165,9 @@ public void setCurrentNamespace(N namespace) {
 
 	byte[] getValueBytes(V value) {
 		try {
-			dataOutputView.reset();
+			dataOutputView.clear();
 			valueSerializer.serialize(value, dataOutputView);
-			return dataOutputView.toByteArray();
+			return dataOutputView.getCopyOfBuffer();
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Error while serializing value", e);
 		}
@@ -183,7 +183,7 @@ protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
 
 	protected void writeKeyWithGroupAndNamespace(
 			int keyGroup, K key, N namespace,
-			ByteArrayDataOutputView keySerializationDataOutputView) throws IOException {
+			DataOutputSerializer keySerializationDataOutputView) throws IOException {
 
 		writeKeyWithGroupAndNamespace(
 				keyGroup,
@@ -200,13 +200,13 @@ protected void writeKeyWithGroupAndNamespace(
 			final TypeSerializer<K> keySerializer,
 			final N namespace,
 			final TypeSerializer<N> namespaceSerializer,
-			final ByteArrayDataOutputView keySerializationDataOutputView) throws IOException {
+			final DataOutputSerializer keySerializationDataOutputView) throws IOException {
 
 		Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
 		Preconditions.checkNotNull(keySerializer);
 		Preconditions.checkNotNull(namespaceSerializer);
 
-		keySerializationDataOutputView.reset();
+		keySerializationDataOutputView.clear();
 		RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
 		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationDataOutputView, ambiguousKeyPossible);
 		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 4f9ef2f811c..2085fb86256 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -121,12 +121,12 @@ public void mergeNamespaces(N target, Collection<N> sources) {
 				if (source != null) {
 					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
 
-					final byte[] sourceKey = dataOutputView.toByteArray();
+					final byte[] sourceKey = dataOutputView.getCopyOfBuffer();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
 					if (valueBytes != null) {
-						dataInputView.setData(valueBytes);
+						dataInputView.setBuffer(valueBytes);
 						ACC value = valueSerializer.deserialize(dataInputView);
 
 						if (current != null) {
@@ -144,23 +144,23 @@ public void mergeNamespaces(N target, Collection<N> sources) {
 				// create the target full-binary-key
 				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
 
-				final byte[] targetKey = dataOutputView.toByteArray();
+				final byte[] targetKey = dataOutputView.getCopyOfBuffer();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
 					// target also had a value, merge
-					dataInputView.setData(targetValueBytes);
+					dataInputView.setBuffer(targetValueBytes);
 					ACC value = valueSerializer.deserialize(dataInputView);
 
 					current = aggFunction.merge(current, value);
 				}
 
 				// serialize the resulting value
-				dataOutputView.reset();
+				dataOutputView.clear();
 				valueSerializer.serialize(current, dataOutputView);
 
 				// write the resulting value
-				backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray());
+				backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.getCopyOfBuffer());
 			}
 		}
 		catch (Exception e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index 68b5b5fdee3..364185a2f3e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -19,8 +19,8 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.util.CloseableIterator;
@@ -84,11 +84,11 @@
 
 	/** Output view that helps to serialize elements. */
 	@Nonnull
-	private final ByteArrayDataOutputView outputView;
+	private final DataOutputSerializer outputView;
 
 	/** Input view that helps to de-serialize elements. */
 	@Nonnull
-	private final ByteArrayDataInputView inputView;
+	private final DataInputDeserializer inputView;
 
 	/** In memory cache that holds a head-subset of the elements stored in RocksDB. */
 	@Nonnull
@@ -114,8 +114,8 @@
 		@Nonnull RocksDB db,
 		@Nonnull ColumnFamilyHandle columnFamilyHandle,
 		@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
-		@Nonnull ByteArrayDataOutputView outputStream,
-		@Nonnull ByteArrayDataInputView inputStream,
+		@Nonnull DataOutputSerializer outputStream,
+		@Nonnull DataInputDeserializer inputStream,
 		@Nonnull RocksDBWriteBatchWrapper batchWrapper,
 		@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
 		this.db = db;
@@ -357,7 +357,7 @@ private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
 	@Nonnull
 	private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) {
 
-		outputView.reset();
+		outputView.clear();
 
 		try {
 			RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
@@ -365,16 +365,16 @@ private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
 			throw new FlinkRuntimeException("Could not write key-group bytes.", e);
 		}
 
-		return outputView.toByteArray();
+		return outputView.getCopyOfBuffer();
 	}
 
 	@Nonnull
 	private byte[] serializeElement(@Nonnull E element) {
 		try {
-			outputView.reset();
+			outputView.clear();
 			outputView.write(groupPrefixBytes);
 			byteOrderProducingSerializer.serialize(element, outputView);
-			return outputView.toByteArray();
+			return outputView.getCopyOfBuffer();
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Error while serializing the element.", e);
 		}
@@ -383,7 +383,8 @@ private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
 	@Nonnull
 	private E deserializeElement(@Nonnull byte[] bytes) {
 		try {
-			inputView.setData(bytes, groupPrefixBytes.length, bytes.length);
+			final int numPrefixBytes = groupPrefixBytes.length;
+			inputView.setBuffer(bytes, numPrefixBytes, bytes.length - numPrefixBytes);
 			return byteOrderProducingSerializer.deserialize(inputView);
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Error while deserializing the element.", e);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index 7c9e3f8c3f0..d8844bfece1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -18,9 +18,9 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
@@ -41,7 +41,7 @@ static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws
 
 	public static <K> K readKey(
 		TypeSerializer<K> keySerializer,
-		ByteArrayDataInputView inputView,
+		DataInputDeserializer inputView,
 		boolean ambiguousKeyPossible) throws IOException {
 		int beforeRead = inputView.getPosition();
 		K key = keySerializer.deserialize(inputView);
@@ -54,7 +54,7 @@ static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws
 
 	public static <N> N readNamespace(
 		TypeSerializer<N> namespaceSerializer,
-		ByteArrayDataInputView inputView,
+		DataInputDeserializer inputView,
 		boolean ambiguousKeyPossible) throws IOException {
 		int beforeRead = inputView.getPosition();
 		N namespace = namespaceSerializer.deserialize(inputView);
@@ -68,10 +68,10 @@ static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws
 	public static <N> void writeNameSpace(
 		N namespace,
 		TypeSerializer<N> namespaceSerializer,
-		ByteArrayDataOutputView keySerializationDataOutputView,
+		DataOutputSerializer keySerializationDataOutputView,
 		boolean ambiguousKeyPossible) throws IOException {
 
-		int beforeWrite = keySerializationDataOutputView.getPosition();
+		int beforeWrite = keySerializationDataOutputView.length();
 		namespaceSerializer.serialize(namespace, keySerializationDataOutputView);
 
 		if (ambiguousKeyPossible) {
@@ -96,10 +96,10 @@ public static void writeKeyGroup(
 	public static <K> void writeKey(
 		K key,
 		TypeSerializer<K> keySerializer,
-		ByteArrayDataOutputView keySerializationDataOutputView,
+		DataOutputSerializer keySerializationDataOutputView,
 		boolean ambiguousKeyPossible) throws IOException {
 		//write key
-		int beforeWrite = keySerializationDataOutputView.getPosition();
+		int beforeWrite = keySerializationDataOutputView.length();
 		keySerializer.serialize(key, keySerializationDataOutputView);
 
 		if (ambiguousKeyPossible) {
@@ -117,8 +117,8 @@ private static void readVariableIntBytes(DataInputView inputView, int value) thr
 
 	private static void writeLengthFrom(
 		int fromPosition,
-		ByteArrayDataOutputView keySerializationDateDataOutputView) throws IOException {
-		int length = keySerializationDateDataOutputView.getPosition() - fromPosition;
+		DataOutputSerializer keySerializationDateDataOutputView) throws IOException {
+		int length = keySerializationDateDataOutputView.length() - fromPosition;
 		writeVariableIntBytes(length, keySerializationDateDataOutputView);
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 60baaedae75..42a1e26b8b5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -43,10 +43,10 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -328,7 +328,7 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
 			(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
 
 		final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
-		final ByteArrayDataOutputView namespaceOutputView = new ByteArrayDataOutputView(8);
+		final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
 		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 		final byte[] nameSpaceBytes;
 		try {
@@ -337,7 +337,7 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
 				namespaceSerializer,
 				namespaceOutputView,
 				ambiguousKeyPossible);
-			nameSpaceBytes = namespaceOutputView.toByteArray();
+			nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
 		} catch (IOException ex) {
 			throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
 		}
@@ -1501,15 +1501,15 @@ public static RocksIteratorWrapper getRocksIterator(
 
 		/** A shared buffer to serialize elements for the priority queue. */
 		@Nonnull
-		private final ByteArrayDataOutputView sharedElementOutView;
+		private final DataOutputSerializer sharedElementOutView;
 
 		/** A shared buffer to de-serialize elements for the priority queue. */
 		@Nonnull
-		private final ByteArrayDataInputView sharedElementInView;
+		private final DataInputDeserializer sharedElementInView;
 
 		RocksDBPriorityQueueSetFactory() {
-			this.sharedElementOutView = new ByteArrayDataOutputView();
-			this.sharedElementInView = new ByteArrayDataInputView();
+			this.sharedElementOutView = new DataOutputSerializer(128);
+			this.sharedElementInView = new DataInputDeserializer();
 		}
 
 		@Nonnull
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index cdd7afb7d9a..f70c6a57bad 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -24,9 +24,8 @@
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -115,7 +114,7 @@ private RocksDBListState(
 	public List<V> getInternal() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.toByteArray();
+			byte[] key = dataOutputView.getCopyOfBuffer();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			return deserializeList(valueBytes);
 		} catch (IOException | RocksDBException e) {
@@ -129,7 +128,7 @@ private RocksDBListState(
 			return null;
 		}
 
-		dataInputView.setData(valueBytes);
+		dataInputView.setBuffer(valueBytes);
 
 		List<V> result = new ArrayList<>();
 		V next;
@@ -139,7 +138,7 @@ private RocksDBListState(
 		return result;
 	}
 
-	private static <V> V deserializeNextElement(DataInputViewStreamWrapper in, TypeSerializer<V> elementSerializer) {
+	private static <V> V deserializeNextElement(DataInputDeserializer in, TypeSerializer<V> elementSerializer) {
 		try {
 			if (in.available() > 0) {
 				V element = elementSerializer.deserialize(in);
@@ -160,10 +159,10 @@ public void add(V value) {
 
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.toByteArray();
-			dataOutputView.reset();
+			byte[] key = dataOutputView.getCopyOfBuffer();
+			dataOutputView.clear();
 			elementSerializer.serialize(value, dataOutputView);
-			backend.db.merge(columnFamily, writeOptions, key, dataOutputView.toByteArray());
+			backend.db.merge(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer());
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
@@ -182,14 +181,14 @@ public void mergeNamespaces(N target, Collection<N> sources) {
 		try {
 			// create the target full-binary-key
 			writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
-			final byte[] targetKey = dataOutputView.toByteArray();
+			final byte[] targetKey = dataOutputView.getCopyOfBuffer();
 
 			// merge the sources to the target
 			for (N source : sources) {
 				if (source != null) {
 					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
 
-					byte[] sourceKey = dataOutputView.toByteArray();
+					byte[] sourceKey = dataOutputView.getCopyOfBuffer();
 					byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
@@ -218,7 +217,7 @@ public void updateInternal(List<V> values) {
 		if (!values.isEmpty()) {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = dataOutputView.toByteArray();
+				byte[] key = dataOutputView.getCopyOfBuffer();
 				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
 				backend.db.put(columnFamily, writeOptions, key, premerge);
 			} catch (IOException | RocksDBException e) {
@@ -234,7 +233,7 @@ public void addAll(List<V> values) {
 		if (!values.isEmpty()) {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = dataOutputView.toByteArray();
+				byte[] key = dataOutputView.getCopyOfBuffer();
 				byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
 				backend.db.merge(columnFamily, writeOptions, key, premerge);
 			} catch (IOException | RocksDBException e) {
@@ -246,9 +245,9 @@ public void addAll(List<V> values) {
 	private static <V> byte[] getPreMergedValue(
 		List<V> values,
 		TypeSerializer<V> elementSerializer,
-		ByteArrayDataOutputView keySerializationStream) throws IOException {
+		DataOutputSerializer keySerializationStream) throws IOException {
 
-		keySerializationStream.reset();
+		keySerializationStream.clear();
 		boolean first = true;
 		for (V value : values) {
 			Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
@@ -260,7 +259,7 @@ public void addAll(List<V> values) {
 			elementSerializer.serialize(value, keySerializationStream);
 		}
 
-		return keySerializationStream.toByteArray();
+		return keySerializationStream.getCopyOfBuffer();
 	}
 
 	@SuppressWarnings("unchecked")
@@ -280,7 +279,7 @@ public void addAll(List<V> values) {
 	static class StateSnapshotTransformerWrapper<T> implements StateSnapshotTransformer<byte[]> {
 		private final StateSnapshotTransformer<T> elementTransformer;
 		private final TypeSerializer<T> elementSerializer;
-		private final ByteArrayDataOutputView out = new ByteArrayDataOutputView(128);
+		private final DataOutputSerializer out = new DataOutputSerializer(128);
 		private final CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;
 
 		StateSnapshotTransformerWrapper(StateSnapshotTransformer<T> elementTransformer, TypeSerializer<T> elementSerializer) {
@@ -298,7 +297,7 @@ public void addAll(List<V> values) {
 				return null;
 			}
 			List<T> result = new ArrayList<>();
-			ByteArrayDataInputView in = new ByteArrayDataInputView(value);
+			DataInputDeserializer in = new DataInputDeserializer(value);
 			T next;
 			int prevPosition = 0;
 			while ((next = deserializeNextElement(in, elementSerializer)) != null) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index ad6b7c22ec4..5c9f7f9f30c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -24,8 +24,8 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
@@ -122,14 +122,14 @@ public UV get(UK userKey) throws IOException, RocksDBException {
 		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
 		byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
 
-		return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes));
+		return (rawValueBytes == null ? null : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
 	}
 
 	@Override
 	public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
 
 		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
-		byte[] rawValueBytes = serializeUserValue(userValue);
+		byte[] rawValueBytes = serializeUserValue(userValue, userValueSerializer, dataOutputView);
 
 		backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
 	}
@@ -143,7 +143,7 @@ public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
 		try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
 			for (Map.Entry<UK, UV> entry : map.entrySet()) {
 				byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
-				byte[] rawValueBytes = serializeUserValue(entry.getValue());
+				byte[] rawValueBytes = serializeUserValue(entry.getValue(), userValueSerializer, dataOutputView);
 				writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
 			}
 		}
@@ -180,7 +180,7 @@ public boolean contains(UK userKey) throws IOException, RocksDBException {
 	public Iterable<UK> keys() throws IOException {
 		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
 
-		return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
+		return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
 			@Override
 			public UK next() {
 				RocksDBMapEntry entry = nextEntry();
@@ -193,7 +193,7 @@ public UK next() {
 	public Iterable<UV> values() throws IOException {
 		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
 
-		return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
+		return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
 			@Override
 			public UV next() {
 				RocksDBMapEntry entry = nextEntry();
@@ -206,7 +206,7 @@ public UV next() {
 	public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
 		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
 
-		return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
+		return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
 			@Override
 			public Map.Entry<UK, UV> next() {
 				return nextEntry();
@@ -259,7 +259,8 @@ public void clear() {
 
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
-		ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(128);
+		DataOutputSerializer outputView = new DataOutputSerializer(128);
+		DataInputDeserializer inputView = new DataInputDeserializer();
 
 		writeKeyWithGroupAndNamespace(
 				keyGroup,
@@ -269,7 +270,7 @@ public void clear() {
 				safeNamespaceSerializer,
 				outputView);
 
-		final byte[] keyPrefixBytes = outputView.toByteArray();
+		final byte[] keyPrefixBytes = outputView.getCopyOfBuffer();
 
 		final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
 
@@ -280,7 +281,8 @@ public void clear() {
 				backend.db,
 				keyPrefixBytes,
 				dupUserKeySerializer,
-				dupUserValueSerializer) {
+				dupUserValueSerializer,
+				inputView) {
 
 			@Override
 			public Map.Entry<UK, UV> next() {
@@ -303,26 +305,22 @@ public void clear() {
 	private byte[] serializeCurrentKeyAndNamespace() throws IOException {
 		writeCurrentKeyWithGroupAndNamespace();
 
-		return dataOutputView.toByteArray();
+		return dataOutputView.getCopyOfBuffer();
 	}
 
 	private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
 		serializeCurrentKeyAndNamespace();
 		userKeySerializer.serialize(userKey, dataOutputView);
 
-		return dataOutputView.toByteArray();
+		return dataOutputView.getCopyOfBuffer();
 	}
 
-	private byte[] serializeUserValue(UV userValue) throws IOException {
-		return serializeUserValue(userValue, userValueSerializer);
-	}
-
-	private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
-		return deserializeUserValue(rawValueBytes, userValueSerializer);
-	}
+	private static <UV> byte[] serializeUserValue(
+		UV userValue,
+		TypeSerializer<UV> valueSerializer,
+		DataOutputSerializer dataOutputView) throws IOException {
 
-	private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
-		dataOutputView.reset();
+		dataOutputView.clear();
 
 		if (userValue == null) {
 			dataOutputView.writeBoolean(true);
@@ -331,16 +329,24 @@ private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
 			valueSerializer.serialize(userValue, dataOutputView);
 		}
 
-		return dataOutputView.toByteArray();
+		return dataOutputView.getCopyOfBuffer();
 	}
 
-	private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
-		dataInputView.setData(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset);
+	private static <UK> UK deserializeUserKey(
+		DataInputDeserializer dataInputView,
+		int userKeyOffset,
+		byte[] rawKeyBytes,
+		TypeSerializer<UK> keySerializer) throws IOException {
+		dataInputView.setBuffer(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset);
 		return keySerializer.deserialize(dataInputView);
 	}
 
-	private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
-		dataInputView.setData(rawValueBytes);
+	private static <UV> UV deserializeUserValue(
+		DataInputDeserializer dataInputView,
+		byte[] rawValueBytes,
+		TypeSerializer<UV> valueSerializer) throws IOException {
+
+		dataInputView.setBuffer(rawValueBytes);
 
 		boolean isNull = dataInputView.readBoolean();
 
@@ -388,9 +394,12 @@ private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
 		/** The offset of User Key offset in raw key bytes. */
 		private final int userKeyOffset;
 
-		private TypeSerializer<UK> keySerializer;
+		private final TypeSerializer<UK> keySerializer;
+
+		private final TypeSerializer<UV> valueSerializer;
 
-		private TypeSerializer<UV> valueSerializer;
+		private final DataInputDeserializer dataInputView;
+		private final DataOutputSerializer dataOutputView;
 
 		RocksDBMapEntry(
 				@Nonnull final RocksDB db,
@@ -398,7 +407,9 @@ private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
 				@Nonnull final byte[] rawKeyBytes,
 				@Nonnull final byte[] rawValueBytes,
 				@Nonnull final TypeSerializer<UK> keySerializer,
-				@Nonnull final TypeSerializer<UV> valueSerializer) {
+				@Nonnull final TypeSerializer<UV> valueSerializer,
+				@Nonnull DataInputDeserializer dataInputView,
+				@Nonnull DataOutputSerializer dataOutputView) {
 			this.db = db;
 
 			this.userKeyOffset = userKeyOffset;
@@ -408,6 +419,8 @@ private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
 			this.rawKeyBytes = rawKeyBytes;
 			this.rawValueBytes = rawValueBytes;
 			this.deleted = false;
+			this.dataInputView = dataInputView;
+			this.dataOutputView = dataOutputView;
 		}
 
 		public void remove() {
@@ -425,7 +438,7 @@ public void remove() {
 		public UK getKey() {
 			if (userKey == null) {
 				try {
-					userKey = deserializeUserKey(userKeyOffset, rawKeyBytes, keySerializer);
+					userKey = deserializeUserKey(dataInputView, userKeyOffset, rawKeyBytes, keySerializer);
 				} catch (IOException e) {
 					throw new FlinkRuntimeException("Error while deserializing the user key.", e);
 				}
@@ -441,7 +454,7 @@ public UV getValue() {
 			} else {
 				if (userValue == null) {
 					try {
-						userValue = deserializeUserValue(rawValueBytes, valueSerializer);
+						userValue = deserializeUserValue(dataInputView, rawValueBytes, valueSerializer);
 					} catch (IOException e) {
 						throw new FlinkRuntimeException("Error while deserializing the user value.", e);
 					}
@@ -461,7 +474,7 @@ public UV setValue(UV value) {
 
 			try {
 				userValue = value;
-				rawValueBytes = serializeUserValue(value, valueSerializer);
+				rawValueBytes = serializeUserValue(value, valueSerializer, dataOutputView);
 
 				db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
 			} catch (IOException | RocksDBException e) {
@@ -499,17 +512,20 @@ public UV setValue(UV value) {
 
 		private final TypeSerializer<UK> keySerializer;
 		private final TypeSerializer<UV> valueSerializer;
+		private final DataInputDeserializer dataInputView;
 
 		RocksDBMapIterator(
-				final RocksDB db,
-				final byte[] keyPrefixBytes,
-				final TypeSerializer<UK> keySerializer,
-				final TypeSerializer<UV> valueSerializer) {
+			final RocksDB db,
+			final byte[] keyPrefixBytes,
+			final TypeSerializer<UK> keySerializer,
+			final TypeSerializer<UV> valueSerializer,
+			DataInputDeserializer dataInputView) {
 
 			this.db = db;
 			this.keyPrefixBytes = keyPrefixBytes;
 			this.keySerializer = keySerializer;
 			this.valueSerializer = valueSerializer;
+			this.dataInputView = dataInputView;
 		}
 
 		@Override
@@ -597,7 +613,9 @@ private void loadCache() {
 						iterator.key(),
 						iterator.value(),
 						keySerializer,
-						valueSerializer);
+						valueSerializer,
+						dataInputView,
+						dataOutputView);
 
 					cacheEntries.add(entry);
 
@@ -630,24 +648,24 @@ private void loadCache() {
 		private static final byte[] NULL_VALUE;
 		private static final byte NON_NULL_VALUE_PREFIX;
 		static {
-			ByteArrayDataOutputView dov = new ByteArrayDataOutputView(1);
+			DataOutputSerializer dov = new DataOutputSerializer(1);
 			try {
 				dov.writeBoolean(true);
-				NULL_VALUE = dov.toByteArray();
-				dov.reset();
+				NULL_VALUE = dov.getCopyOfBuffer();
+				dov.clear();
 				dov.writeBoolean(false);
-				NON_NULL_VALUE_PREFIX = dov.toByteArray()[0];
+				NON_NULL_VALUE_PREFIX = dov.getSharedBuffer()[0];
 			} catch (IOException e) {
 				throw new FlinkRuntimeException("Failed to serialize boolean flag of map user null value", e);
 			}
 		}
 
 		private final StateSnapshotTransformer<byte[]> elementTransformer;
-		private final ByteArrayDataInputView div;
+		private final DataInputDeserializer div;
 
 		StateSnapshotTransformerWrapper(StateSnapshotTransformer<byte[]> originalTransformer) {
 			this.elementTransformer = originalTransformer;
-			this.div = new ByteArrayDataInputView();
+			this.div = new DataInputDeserializer();
 		}
 
 		@Override
@@ -674,7 +692,7 @@ private void loadCache() {
 
 		private boolean isNull(byte[] value) {
 			try {
-				div.setData(value, 0, 1);
+				div.setBuffer(value, 0, 1);
 				return div.readBoolean();
 			} catch (IOException e) {
 				throw new FlinkRuntimeException("Failed to deserialize boolean flag of map user null value", e);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index d1fe3bd3798..138357b0d77 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -115,12 +115,12 @@ public void mergeNamespaces(N target, Collection<N> sources) {
 
 					writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
 
-					final byte[] sourceKey = dataOutputView.toByteArray();
+					final byte[] sourceKey = dataOutputView.getCopyOfBuffer();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
 					backend.db.delete(columnFamily, writeOptions, sourceKey);
 
 					if (valueBytes != null) {
-						dataInputView.setData(valueBytes);
+						dataInputView.setBuffer(valueBytes);
 						V value = valueSerializer.deserialize(dataInputView);
 
 						if (current != null) {
@@ -138,11 +138,11 @@ public void mergeNamespaces(N target, Collection<N> sources) {
 				// create the target full-binary-key
 				writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
 
-				final byte[] targetKey = dataOutputView.toByteArray();
+				final byte[] targetKey = dataOutputView.getCopyOfBuffer();
 				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
 
 				if (targetValueBytes != null) {
-					dataInputView.setData(targetValueBytes);
+					dataInputView.setBuffer(targetValueBytes);
 					// target also had a value, merge
 					V value = valueSerializer.deserialize(dataInputView);
 
@@ -150,11 +150,11 @@ public void mergeNamespaces(N target, Collection<N> sources) {
 				}
 
 				// serialize the resulting value
-				dataOutputView.reset();
+				dataOutputView.clear();
 				valueSerializer.serialize(current, dataOutputView);
 
 				// write the resulting value
-				backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray());
+				backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.getCopyOfBuffer());
 			}
 		}
 		catch (Exception e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index e9399e12a32..0ca90d4a521 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -81,12 +81,12 @@ private RocksDBValueState(
 	public V value() {
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.toByteArray();
+			byte[] key = dataOutputView.getCopyOfBuffer();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
 				return getDefaultValue();
 			}
-			dataInputView.setData(valueBytes);
+			dataInputView.setBuffer(valueBytes);
 			return valueSerializer.deserialize(dataInputView);
 		} catch (IOException | RocksDBException e) {
 			throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
@@ -102,10 +102,10 @@ public void update(V value) {
 
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = dataOutputView.toByteArray();
-			dataOutputView.reset();
+			byte[] key = dataOutputView.getCopyOfBuffer();
+			dataOutputView.clear();
 			valueSerializer.serialize(value, dataOutputView);
-			backend.db.put(columnFamily, writeOptions, key, dataOutputView.toByteArray());
+			backend.db.put(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer());
 		} catch (Exception e) {
 			throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
 		}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
index 0fa93dc8a1f..4f79d870d87 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
@@ -21,7 +21,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nonnull;
@@ -53,7 +53,7 @@
 
 	private final boolean ambiguousKeyPossible;
 	private final int keyGroupPrefixBytes;
-	private final ByteArrayDataInputView byteArrayDataInputView;
+	private final DataInputDeserializer byteArrayDataInputView;
 	private K nextKey;
 	private K previousKey;
 
@@ -72,7 +72,7 @@ public RocksStateKeysIterator(
 		this.nextKey = null;
 		this.previousKey = null;
 		this.ambiguousKeyPossible = ambiguousKeyPossible;
-		this.byteArrayDataInputView = new ByteArrayDataInputView();
+		this.byteArrayDataInputView = new DataInputDeserializer();
 	}
 
 	@Override
@@ -107,8 +107,8 @@ public K next() {
 		return tmpKey;
 	}
 
-	private K deserializeKey(byte[] keyBytes, ByteArrayDataInputView readView) throws IOException {
-		readView.setData(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
+	private K deserializeKey(byte[] keyBytes, DataInputDeserializer readView) throws IOException {
+		readView.setBuffer(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
 		return RocksDBKeySerializationUtils.readKey(
 			keySerializer,
 			byteArrayDataInputView,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index ad8b74c975f..d402c3de574 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
@@ -52,8 +52,8 @@ protected boolean testSetSemanticsAgainstDuplicateElements() {
 		TestElement, RocksDBCachingPriorityQueueSet<TestElement>> newFactory() {
 
 		return (keyGroupId, numKeyGroups, keyExtractorFunction, elementComparator) -> {
-			ByteArrayDataOutputView outputStreamWithPos = new ByteArrayDataOutputView();
-			ByteArrayDataInputView inputStreamWithPos = new ByteArrayDataInputView();
+			DataOutputSerializer outputStreamWithPos = new DataOutputSerializer(128);
+			DataInputDeserializer inputStreamWithPos = new DataInputDeserializer();
 			int keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(numKeyGroups);
 			TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(32);
 			return new RocksDBCachingPriorityQueueSet<>(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index 483b8fdd1dc..942d85cf5ea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -18,7 +18,7 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.util.TestLogger;
@@ -112,30 +112,30 @@ private void testClipDBWithKeyGroupRangeHelper(
 			int currentGroupRangeStart = currentGroupRange.getStartKeyGroup();
 			int currentGroupRangeEnd = currentGroupRange.getEndKeyGroup();
 
-			ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(32);
+			DataOutputSerializer outputView = new DataOutputSerializer(32);
 			for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
 				for (int j = 0; j < 100; ++j) {
-					outputView.reset();
+					outputView.clear();
 					RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 					RocksDBKeySerializationUtils.writeKey(
 						j,
 						IntSerializer.INSTANCE,
 						outputView,
 						false);
-					rocksDB.put(columnFamilyHandle, outputView.toByteArray(), String.valueOf(j).getBytes());
+					rocksDB.put(columnFamilyHandle, outputView.getCopyOfBuffer(), String.valueOf(j).getBytes());
 				}
 			}
 
 			for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
 				for (int j = 0; j < 100; ++j) {
-					outputView.reset();
+					outputView.clear();
 					RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 					RocksDBKeySerializationUtils.writeKey(
 						j,
 						IntSerializer.INSTANCE,
 						outputView,
 						false);
-					byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
+					byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer());
 					Assert.assertEquals(String.valueOf(j), new String(value));
 				}
 			}
@@ -149,14 +149,14 @@ private void testClipDBWithKeyGroupRangeHelper(
 
 			for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
 				for (int j = 0; j < 100; ++j) {
-					outputView.reset();
+					outputView.clear();
 					RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
 					RocksDBKeySerializationUtils.writeKey(
 						j,
 						IntSerializer.INSTANCE,
 						outputView,
 						false);
-					byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
+					byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer());
 					if (targetGroupRange.contains(i)) {
 						Assert.assertEquals(String.valueOf(j), new String(value));
 					} else {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
index d92bef5e960..66c13a9a05f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
@@ -19,11 +19,11 @@
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
@@ -63,19 +63,19 @@ public void testKeyGroupSerializationAndDeserialization() throws Exception {
 
 	@Test
 	public void testKeySerializationAndDeserialization() throws Exception {
-		final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
-		final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
+		final DataOutputSerializer outputView = new DataOutputSerializer(8);
+		final DataInputDeserializer inputView = new DataInputDeserializer();
 
 		// test for key
 		for (int orgKey = 0; orgKey < 100; ++orgKey) {
-			outputView.reset();
+			outputView.clear();
 			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, false);
-			inputView.setData(outputView.toByteArray());
+			inputView.setBuffer(outputView.getCopyOfBuffer());
 			int deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, false);
 			Assert.assertEquals(orgKey, deserializedKey);
 
 			RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, true);
-			inputView.setData(outputView.toByteArray());
+			inputView.setBuffer(outputView.getCopyOfBuffer());
 			deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
 			Assert.assertEquals(orgKey, deserializedKey);
 		}
@@ -83,18 +83,18 @@ public void testKeySerializationAndDeserialization() throws Exception {
 
 	@Test
 	public void testNamespaceSerializationAndDeserialization() throws Exception {
-		final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
-		final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
+		final DataOutputSerializer outputView = new DataOutputSerializer(8);
+		final DataInputDeserializer inputView = new DataInputDeserializer();
 
 		for (int orgNamespace = 0; orgNamespace < 100; ++orgNamespace) {
-			outputView.reset();
+			outputView.clear();
 			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, false);
-			inputView.setData(outputView.toByteArray());
+			inputView.setBuffer(outputView.getCopyOfBuffer());
 			int deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, false);
 			Assert.assertEquals(orgNamespace, deserializedNamepsace);
 
 			RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, true);
-			inputView.setData(outputView.toByteArray());
+			inputView.setBuffer(outputView.getCopyOfBuffer());
 			deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, true);
 			Assert.assertEquals(orgNamespace, deserializedNamepsace);
 		}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index e042ebd0609..398df3f00c4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -24,7 +24,7 @@
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
-import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -105,7 +105,7 @@ public void testIterator() throws Exception{
 				testState.update(String.valueOf(i));
 			}
 
-			ByteArrayDataOutputView outputStream = new ByteArrayDataOutputView(8);
+			DataOutputSerializer outputStream = new DataOutputSerializer(8);
 			boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 			RocksDBKeySerializationUtils.writeNameSpace(
 				namespace,
@@ -113,7 +113,7 @@ public void testIterator() throws Exception{
 				outputStream,
 				ambiguousKeyPossible);
 
-			byte[] nameSpaceBytes = outputStream.toByteArray();
+			byte[] nameSpaceBytes = outputStream.getCopyOfBuffer();
 
 			try (
 				ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
index 1a8bc58f7bc..0b082ff180e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
@@ -162,7 +162,7 @@ public int getNumIds() {
 				deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
 			}
 			else {
-				deser.setBuffer(serializedData, 0, serializedData.length);
+				deser.setBuffer(serializedData);
 			}
 
 			final Set<T> ids = new HashSet<>(checkpoint.getNumIds());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services