You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/08/10 18:10:25 UTC
[flink] 02/02: [FLINK-10124][state] Use ByteArrayDataInput/OutputView instead of stream + wrapper
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 18ff4ab8e55d76522f835cc683f57252b3f742bc
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Fri Aug 10 20:06:58 2018 +0200
[FLINK-10124][state] Use ByteArrayDataInput/OutputView instead of stream + wrapper
---
.../flink/core/memory/ByteArrayDataInputView.java | 4 ++
.../state/AbstractRocksDBAppendingState.java | 5 +-
.../streaming/state/AbstractRocksDBState.java | 50 +++++++---------
.../streaming/state/RocksDBAggregatingState.java | 28 ++++-----
.../state/RocksDBKeySerializationUtils.java | 39 +++++--------
.../streaming/state/RocksDBKeyedStateBackend.java | 8 +--
.../contrib/streaming/state/RocksDBListState.java | 68 ++++++++--------------
.../contrib/streaming/state/RocksDBMapState.java | 41 +++++--------
.../streaming/state/RocksDBReducingState.java | 33 +++++------
.../contrib/streaming/state/RocksDBValueState.java | 18 +++---
.../RocksDBIncrementalCheckpointUtilsTest.java | 33 ++++-------
.../state/RocksDBKeySerializationUtilsTest.java | 38 ++++++------
12 files changed, 150 insertions(+), 215 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
index 33836f0..698a9f9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
@@ -53,4 +53,8 @@ public class ByteArrayDataInputView extends DataInputViewStreamWrapper {
public void setData(@Nonnull byte[] buffer, int offset, int length) {
inStreamWithPos.setBuffer(buffer, offset, length);
}
+
+ public void setData(@Nonnull byte[] buffer) {
+ setData(buffer, 0, buffer.length);
+ }
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 32819f8..2a9ab75 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -20,8 +20,6 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalAppendingState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -63,7 +61,8 @@ abstract class AbstractRocksDBAppendingState <K, N, IN, SV, OUT, S extends State
if (valueBytes == null) {
return null;
}
- return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+ dataInputView.setData(valueBytes);
+ return valueSerializer.deserialize(dataInputView);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 7483089..65b7f1f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,9 +20,8 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -67,9 +66,9 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
protected final WriteOptions writeOptions;
- protected final ByteArrayOutputStreamWithPos keySerializationStream;
+ protected final ByteArrayDataOutputView dataOutputView;
- protected final DataOutputView keySerializationDataOutputView;
+ protected final ByteArrayDataInputView dataInputView;
private final boolean ambiguousKeyPossible;
@@ -98,9 +97,10 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "State value serializer");
this.defaultValue = defaultValue;
- this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
- this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
- this.ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
+ this.dataOutputView = new ByteArrayDataOutputView(128);
+ this.dataInputView = new ByteArrayDataInputView();
+ this.ambiguousKeyPossible =
+ RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
}
// ------------------------------------------------------------------------
@@ -109,7 +109,7 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
public void clear() {
try {
writeCurrentKeyWithGroupAndNamespace();
- byte[] key = keySerializationStream.toByteArray();
+ byte[] key = dataOutputView.toByteArray();
backend.db.delete(columnFamily, writeOptions, key);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
@@ -141,8 +141,7 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
// we cannot reuse the keySerializationStream member since this method
// is called concurrently to the other ones and it may thus contain garbage
- ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128);
- DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream);
+ ByteArrayDataOutputView tmpKeySerializationView = new ByteArrayDataOutputView(128);
writeKeyWithGroupAndNamespace(
keyGroup,
@@ -150,16 +149,15 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
safeKeySerializer,
keyAndNamespace.f1,
safeNamespaceSerializer,
- tmpKeySerializationStream,
- tmpKeySerializationDateDataOutputView);
+ tmpKeySerializationView);
- return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
+ return backend.db.get(columnFamily, tmpKeySerializationView.toByteArray());
}
byte[] getKeyBytes() {
try {
writeCurrentKeyWithGroupAndNamespace();
- return keySerializationStream.toByteArray();
+ return dataOutputView.toByteArray();
} catch (IOException e) {
throw new FlinkRuntimeException("Error while serializing key", e);
}
@@ -167,9 +165,9 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
byte[] getValueBytes(V value) {
try {
- keySerializationStream.reset();
- valueSerializer.serialize(value, new DataOutputViewStreamWrapper(keySerializationStream));
- return keySerializationStream.toByteArray();
+ dataOutputView.reset();
+ valueSerializer.serialize(value, dataOutputView);
+ return dataOutputView.toByteArray();
} catch (IOException e) {
throw new FlinkRuntimeException("Error while serializing value", e);
}
@@ -180,14 +178,12 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
backend.getCurrentKeyGroupIndex(),
backend.getCurrentKey(),
currentNamespace,
- keySerializationStream,
- keySerializationDataOutputView);
+ dataOutputView);
}
protected void writeKeyWithGroupAndNamespace(
int keyGroup, K key, N namespace,
- ByteArrayOutputStreamWithPos keySerializationStream,
- DataOutputView keySerializationDataOutputView) throws IOException {
+ ByteArrayDataOutputView keySerializationDataOutputView) throws IOException {
writeKeyWithGroupAndNamespace(
keyGroup,
@@ -195,7 +191,6 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
backend.getKeySerializer(),
namespace,
namespaceSerializer,
- keySerializationStream,
keySerializationDataOutputView);
}
@@ -205,17 +200,16 @@ public abstract class AbstractRocksDBState<K, N, V, S extends State> implements
final TypeSerializer<K> keySerializer,
final N namespace,
final TypeSerializer<N> namespaceSerializer,
- final ByteArrayOutputStreamWithPos keySerializationStream,
- final DataOutputView keySerializationDataOutputView) throws IOException {
+ final ByteArrayDataOutputView keySerializationDataOutputView) throws IOException {
Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
Preconditions.checkNotNull(keySerializer);
Preconditions.checkNotNull(namespaceSerializer);
- keySerializationStream.reset();
+ keySerializationDataOutputView.reset();
RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
- RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
- RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
+ RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationDataOutputView, ambiguousKeyPossible);
+ RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
}
protected V getDefaultValue() {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 209d18f..4f9ef2f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -121,17 +119,15 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
// merge the sources to the target
for (N source : sources) {
if (source != null) {
- writeKeyWithGroupAndNamespace(
- keyGroup, key, source,
- keySerializationStream, keySerializationDataOutputView);
+ writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
- final byte[] sourceKey = keySerializationStream.toByteArray();
+ final byte[] sourceKey = dataOutputView.toByteArray();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily, writeOptions, sourceKey);
if (valueBytes != null) {
- ACC value = valueSerializer.deserialize(
- new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+ dataInputView.setData(valueBytes);
+ ACC value = valueSerializer.deserialize(dataInputView);
if (current != null) {
current = aggFunction.merge(current, value);
@@ -146,27 +142,25 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
// if something came out of merging the sources, merge it or write it to the target
if (current != null) {
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(
- keyGroup, key, target,
- keySerializationStream, keySerializationDataOutputView);
+ writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
- final byte[] targetKey = keySerializationStream.toByteArray();
+ final byte[] targetKey = dataOutputView.toByteArray();
final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
if (targetValueBytes != null) {
// target also had a value, merge
- ACC value = valueSerializer.deserialize(
- new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+ dataInputView.setData(targetValueBytes);
+ ACC value = valueSerializer.deserialize(dataInputView);
current = aggFunction.merge(current, value);
}
// serialize the resulting value
- keySerializationStream.reset();
- valueSerializer.serialize(current, keySerializationDataOutputView);
+ dataOutputView.reset();
+ valueSerializer.serialize(current, dataOutputView);
// write the resulting value
- backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+ backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray());
}
}
catch (Exception e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index 1bc49e9..7c9e3f8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -18,8 +18,8 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -41,13 +41,12 @@ public class RocksDBKeySerializationUtils {
public static <K> K readKey(
TypeSerializer<K> keySerializer,
- ByteArrayInputStreamWithPos inputStream,
- DataInputView inputView,
+ ByteArrayDataInputView inputView,
boolean ambiguousKeyPossible) throws IOException {
- int beforeRead = inputStream.getPosition();
+ int beforeRead = inputView.getPosition();
K key = keySerializer.deserialize(inputView);
if (ambiguousKeyPossible) {
- int length = inputStream.getPosition() - beforeRead;
+ int length = inputView.getPosition() - beforeRead;
readVariableIntBytes(inputView, length);
}
return key;
@@ -55,13 +54,12 @@ public class RocksDBKeySerializationUtils {
public static <N> N readNamespace(
TypeSerializer<N> namespaceSerializer,
- ByteArrayInputStreamWithPos inputStream,
- DataInputView inputView,
+ ByteArrayDataInputView inputView,
boolean ambiguousKeyPossible) throws IOException {
- int beforeRead = inputStream.getPosition();
+ int beforeRead = inputView.getPosition();
N namespace = namespaceSerializer.deserialize(inputView);
if (ambiguousKeyPossible) {
- int length = inputStream.getPosition() - beforeRead;
+ int length = inputView.getPosition() - beforeRead;
readVariableIntBytes(inputView, length);
}
return namespace;
@@ -70,17 +68,15 @@ public class RocksDBKeySerializationUtils {
public static <N> void writeNameSpace(
N namespace,
TypeSerializer<N> namespaceSerializer,
- ByteArrayOutputStreamWithPos keySerializationStream,
- DataOutputView keySerializationDataOutputView,
+ ByteArrayDataOutputView keySerializationDataOutputView,
boolean ambiguousKeyPossible) throws IOException {
- int beforeWrite = keySerializationStream.getPosition();
+ int beforeWrite = keySerializationDataOutputView.getPosition();
namespaceSerializer.serialize(namespace, keySerializationDataOutputView);
if (ambiguousKeyPossible) {
//write length of namespace
- writeLengthFrom(beforeWrite, keySerializationStream,
- keySerializationDataOutputView);
+ writeLengthFrom(beforeWrite, keySerializationDataOutputView);
}
}
@@ -100,17 +96,15 @@ public class RocksDBKeySerializationUtils {
public static <K> void writeKey(
K key,
TypeSerializer<K> keySerializer,
- ByteArrayOutputStreamWithPos keySerializationStream,
- DataOutputView keySerializationDataOutputView,
+ ByteArrayDataOutputView keySerializationDataOutputView,
boolean ambiguousKeyPossible) throws IOException {
//write key
- int beforeWrite = keySerializationStream.getPosition();
+ int beforeWrite = keySerializationDataOutputView.getPosition();
keySerializer.serialize(key, keySerializationDataOutputView);
if (ambiguousKeyPossible) {
//write size of key
- writeLengthFrom(beforeWrite, keySerializationStream,
- keySerializationDataOutputView);
+ writeLengthFrom(beforeWrite, keySerializationDataOutputView);
}
}
@@ -123,9 +117,8 @@ public class RocksDBKeySerializationUtils {
private static void writeLengthFrom(
int fromPosition,
- ByteArrayOutputStreamWithPos keySerializationStream,
- DataOutputView keySerializationDateDataOutputView) throws IOException {
- int length = keySerializationStream.getPosition() - fromPosition;
+ ByteArrayDataOutputView keySerializationDateDataOutputView) throws IOException {
+ int length = keySerializationDateDataOutputView.getPosition() - fromPosition;
writeVariableIntBytes(length, keySerializationDateDataOutputView);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 55cc4f9..c159976 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -45,7 +45,6 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
@@ -363,17 +362,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
- final ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
+ final ByteArrayDataOutputView namespaceOutputView = new ByteArrayDataOutputView(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
final byte[] nameSpaceBytes;
try {
RocksDBKeySerializationUtils.writeNameSpace(
namespace,
namespaceSerializer,
- namespaceOutputStream,
- new DataOutputViewStreamWrapper(namespaceOutputStream),
+ namespaceOutputView,
ambiguousKeyPossible);
- nameSpaceBytes = namespaceOutputStream.toByteArray();
+ nameSpaceBytes = namespaceOutputView.toByteArray();
} catch (IOException ex) {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 176f48c..cdd7afb 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -25,9 +25,8 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -116,32 +115,31 @@ class RocksDBListState<K, N, V>
public List<V> getInternal() {
try {
writeCurrentKeyWithGroupAndNamespace();
- byte[] key = keySerializationStream.toByteArray();
+ byte[] key = dataOutputView.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, key);
- return deserializeList(valueBytes, elementSerializer);
+ return deserializeList(valueBytes);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
}
}
- private static <V> List<V> deserializeList(
- byte[] valueBytes, TypeSerializer<V> elementSerializer) {
+ private List<V> deserializeList(
+ byte[] valueBytes) {
if (valueBytes == null) {
return null;
}
- DataInputViewStreamWrapper in = new ByteArrayDataInputView(valueBytes);
+ dataInputView.setData(valueBytes);
List<V> result = new ArrayList<>();
V next;
- while ((next = deserializeNextElement(in, elementSerializer)) != null) {
+ while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) {
result.add(next);
}
return result;
}
- private static <V> V deserializeNextElement(
- DataInputViewStreamWrapper in, TypeSerializer<V> elementSerializer) {
+ private static <V> V deserializeNextElement(DataInputViewStreamWrapper in, TypeSerializer<V> elementSerializer) {
try {
if (in.available() > 0) {
V element = elementSerializer.deserialize(in);
@@ -162,11 +160,10 @@ class RocksDBListState<K, N, V>
try {
writeCurrentKeyWithGroupAndNamespace();
- byte[] key = keySerializationStream.toByteArray();
- keySerializationStream.reset();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
- elementSerializer.serialize(value, out);
- backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+ byte[] key = dataOutputView.toByteArray();
+ dataOutputView.reset();
+ elementSerializer.serialize(value, dataOutputView);
+ backend.db.merge(columnFamily, writeOptions, key, dataOutputView.toByteArray());
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
}
@@ -184,19 +181,15 @@ class RocksDBListState<K, N, V>
try {
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(
- keyGroup, key, target,
- keySerializationStream, keySerializationDataOutputView);
- final byte[] targetKey = keySerializationStream.toByteArray();
+ writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
+ final byte[] targetKey = dataOutputView.toByteArray();
// merge the sources to the target
for (N source : sources) {
if (source != null) {
- writeKeyWithGroupAndNamespace(
- keyGroup, key, source,
- keySerializationStream, keySerializationDataOutputView);
+ writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
- byte[] sourceKey = keySerializationStream.toByteArray();
+ byte[] sourceKey = dataOutputView.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily, writeOptions, sourceKey);
@@ -225,14 +218,9 @@ class RocksDBListState<K, N, V>
if (!values.isEmpty()) {
try {
writeCurrentKeyWithGroupAndNamespace();
- byte[] key = keySerializationStream.toByteArray();
-
- byte[] premerge = getPreMergedValue(values, elementSerializer, keySerializationStream);
- if (premerge != null) {
- backend.db.put(columnFamily, writeOptions, key, premerge);
- } else {
- throw new IOException("Failed pre-merge values in update()");
- }
+ byte[] key = dataOutputView.toByteArray();
+ byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
+ backend.db.put(columnFamily, writeOptions, key, premerge);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
}
@@ -246,14 +234,9 @@ class RocksDBListState<K, N, V>
if (!values.isEmpty()) {
try {
writeCurrentKeyWithGroupAndNamespace();
- byte[] key = keySerializationStream.toByteArray();
-
- byte[] premerge = getPreMergedValue(values, elementSerializer, keySerializationStream);
- if (premerge != null) {
- backend.db.merge(columnFamily, writeOptions, key, premerge);
- } else {
- throw new IOException("Failed pre-merge values in addAll()");
- }
+ byte[] key = dataOutputView.toByteArray();
+ byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
+ backend.db.merge(columnFamily, writeOptions, key, premerge);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
}
@@ -263,8 +246,7 @@ class RocksDBListState<K, N, V>
private static <V> byte[] getPreMergedValue(
List<V> values,
TypeSerializer<V> elementSerializer,
- ByteArrayOutputStreamWithPos keySerializationStream) throws IOException {
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+ ByteArrayDataOutputView keySerializationStream) throws IOException {
keySerializationStream.reset();
boolean first = true;
@@ -275,7 +257,7 @@ class RocksDBListState<K, N, V>
} else {
keySerializationStream.write(DELIMITER);
}
- elementSerializer.serialize(value, out);
+ elementSerializer.serialize(value, keySerializationStream);
}
return keySerializationStream.toByteArray();
@@ -298,7 +280,7 @@ class RocksDBListState<K, N, V>
static class StateSnapshotTransformerWrapper<T> implements StateSnapshotTransformer<byte[]> {
private final StateSnapshotTransformer<T> elementTransformer;
private final TypeSerializer<T> elementSerializer;
- private final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(128);
+ private final ByteArrayDataOutputView out = new ByteArrayDataOutputView(128);
private final CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;
StateSnapshotTransformerWrapper(StateSnapshotTransformer<T> elementTransformer, TypeSerializer<T> elementSerializer) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index b08eade..ad6b7c2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -26,10 +26,6 @@ import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
@@ -263,8 +259,7 @@ class RocksDBMapState<K, N, UK, UV>
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
- ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
- DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
+ ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(128);
writeKeyWithGroupAndNamespace(
keyGroup,
@@ -272,10 +267,9 @@ class RocksDBMapState<K, N, UK, UV>
safeKeySerializer,
keyAndNamespace.f1,
safeNamespaceSerializer,
- outputStream,
outputView);
- final byte[] keyPrefixBytes = outputStream.toByteArray();
+ final byte[] keyPrefixBytes = outputView.toByteArray();
final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
@@ -309,14 +303,14 @@ class RocksDBMapState<K, N, UK, UV>
private byte[] serializeCurrentKeyAndNamespace() throws IOException {
writeCurrentKeyWithGroupAndNamespace();
- return keySerializationStream.toByteArray();
+ return dataOutputView.toByteArray();
}
private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
serializeCurrentKeyAndNamespace();
- userKeySerializer.serialize(userKey, keySerializationDataOutputView);
+ userKeySerializer.serialize(userKey, dataOutputView);
- return keySerializationStream.toByteArray();
+ return dataOutputView.toByteArray();
}
private byte[] serializeUserValue(UV userValue) throws IOException {
@@ -328,34 +322,29 @@ class RocksDBMapState<K, N, UK, UV>
}
private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
- keySerializationStream.reset();
+ dataOutputView.reset();
if (userValue == null) {
- keySerializationDataOutputView.writeBoolean(true);
+ dataOutputView.writeBoolean(true);
} else {
- keySerializationDataOutputView.writeBoolean(false);
- valueSerializer.serialize(userValue, keySerializationDataOutputView);
+ dataOutputView.writeBoolean(false);
+ valueSerializer.serialize(userValue, dataOutputView);
}
- return keySerializationStream.toByteArray();
+ return dataOutputView.toByteArray();
}
private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
- ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
- DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
-
- in.skipBytes(userKeyOffset);
-
- return keySerializer.deserialize(in);
+ dataInputView.setData(rawKeyBytes, userKeyOffset, rawKeyBytes.length - userKeyOffset);
+ return keySerializer.deserialize(dataInputView);
}
private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
- ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
- DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+ dataInputView.setData(rawValueBytes);
- boolean isNull = in.readBoolean();
+ boolean isNull = dataInputView.readBoolean();
- return isNull ? null : valueSerializer.deserialize(in);
+ return isNull ? null : valueSerializer.deserialize(dataInputView);
}
private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 490960e..d1fe3bd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -25,15 +25,12 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.FlinkRuntimeException;
import org.rocksdb.ColumnFamilyHandle;
-import java.io.IOException;
import java.util.Collection;
/**
@@ -87,7 +84,7 @@ class RocksDBReducingState<K, N, V>
}
@Override
- public V get() throws IOException {
+ public V get() {
return getInternal();
}
@@ -100,7 +97,7 @@ class RocksDBReducingState<K, N, V>
}
@Override
- public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+ public void mergeNamespaces(N target, Collection<N> sources) {
if (sources == null || sources.isEmpty()) {
return;
}
@@ -116,17 +113,15 @@ class RocksDBReducingState<K, N, V>
for (N source : sources) {
if (source != null) {
- writeKeyWithGroupAndNamespace(
- keyGroup, key, source,
- keySerializationStream, keySerializationDataOutputView);
+ writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
- final byte[] sourceKey = keySerializationStream.toByteArray();
+ final byte[] sourceKey = dataOutputView.toByteArray();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily, writeOptions, sourceKey);
if (valueBytes != null) {
- V value = valueSerializer.deserialize(
- new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+ dataInputView.setData(valueBytes);
+ V value = valueSerializer.deserialize(dataInputView);
if (current != null) {
current = reduceFunction.reduce(current, value);
@@ -141,27 +136,25 @@ class RocksDBReducingState<K, N, V>
// if something came out of merging the sources, merge it or write it to the target
if (current != null) {
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(
- keyGroup, key, target,
- keySerializationStream, keySerializationDataOutputView);
+ writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
- final byte[] targetKey = keySerializationStream.toByteArray();
+ final byte[] targetKey = dataOutputView.toByteArray();
final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
if (targetValueBytes != null) {
+ dataInputView.setData(targetValueBytes);
// target also had a value, merge
- V value = valueSerializer.deserialize(
- new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+ V value = valueSerializer.deserialize(dataInputView);
current = reduceFunction.reduce(current, value);
}
// serialize the resulting value
- keySerializationStream.reset();
- valueSerializer.serialize(current, keySerializationDataOutputView);
+ dataOutputView.reset();
+ valueSerializer.serialize(current, dataOutputView);
// write the resulting value
- backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+ backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.toByteArray());
}
}
catch (Exception e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 5ae894e..e9399e1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -32,7 +30,6 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
/**
@@ -84,12 +81,13 @@ class RocksDBValueState<K, N, V>
public V value() {
try {
writeCurrentKeyWithGroupAndNamespace();
- byte[] key = keySerializationStream.toByteArray();
+ byte[] key = dataOutputView.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
return getDefaultValue();
}
- return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+ dataInputView.setData(valueBytes);
+ return valueSerializer.deserialize(dataInputView);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
}
@@ -101,13 +99,13 @@ class RocksDBValueState<K, N, V>
clear();
return;
}
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
try {
writeCurrentKeyWithGroupAndNamespace();
- byte[] key = keySerializationStream.toByteArray();
- keySerializationStream.reset();
- valueSerializer.serialize(value, out);
- backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+ byte[] key = dataOutputView.toByteArray();
+ dataOutputView.reset();
+ valueSerializer.serialize(value, dataOutputView);
+ backend.db.put(columnFamily, writeOptions, key, dataOutputView.toByteArray());
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index 4121cf0..483b8fd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -18,9 +18,7 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.util.TestLogger;
@@ -114,35 +112,30 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
int currentGroupRangeStart = currentGroupRange.getStartKeyGroup();
int currentGroupRangeEnd = currentGroupRange.getEndKeyGroup();
+ ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(32);
for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
- ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
- DataOutputView outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
for (int j = 0; j < 100; ++j) {
- outputStreamWithPos.reset();
+ outputView.reset();
RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
RocksDBKeySerializationUtils.writeKey(
j,
IntSerializer.INSTANCE,
- outputStreamWithPos,
- new DataOutputViewStreamWrapper(outputStreamWithPos),
+ outputView,
false);
- rocksDB.put(columnFamilyHandle, outputStreamWithPos.toByteArray(), String.valueOf(j).getBytes());
+ rocksDB.put(columnFamilyHandle, outputView.toByteArray(), String.valueOf(j).getBytes());
}
}
for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
- ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
- DataOutputView outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
for (int j = 0; j < 100; ++j) {
- outputStreamWithPos.reset();
+ outputView.reset();
RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
RocksDBKeySerializationUtils.writeKey(
j,
IntSerializer.INSTANCE,
- outputStreamWithPos,
- new DataOutputViewStreamWrapper(outputStreamWithPos),
+ outputView,
false);
- byte[] value = rocksDB.get(columnFamilyHandle, outputStreamWithPos.toByteArray());
+ byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
Assert.assertEquals(String.valueOf(j), new String(value));
}
}
@@ -155,19 +148,15 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
keyGroupPrefixBytes);
for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
- ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
- DataOutputView outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
for (int j = 0; j < 100; ++j) {
- outputStreamWithPos.reset();
+ outputView.reset();
RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
RocksDBKeySerializationUtils.writeKey(
j,
IntSerializer.INSTANCE,
- outputStreamWithPos,
- new DataOutputViewStreamWrapper(outputStreamWithPos),
+ outputView,
false);
- byte[] value = rocksDB.get(
- columnFamilyHandle, outputStreamWithPos.toByteArray());
+ byte[] value = rocksDB.get(columnFamilyHandle, outputView.toByteArray());
if (targetGroupRange.contains(i)) {
Assert.assertEquals(String.valueOf(j), new String(value));
} else {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
index b1737ed..d92bef5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
@@ -19,6 +19,8 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -61,39 +63,39 @@ public class RocksDBKeySerializationUtilsTest {
@Test
public void testKeySerializationAndDeserialization() throws Exception {
- ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
- DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);
+ final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
+ final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
// test for key
for (int orgKey = 0; orgKey < 100; ++orgKey) {
- outputStream.reset();
- RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputStream, outputView, false);
- ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
- int deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), false);
+ outputView.reset();
+ RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, false);
+ inputView.setData(outputView.toByteArray());
+ int deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, false);
Assert.assertEquals(orgKey, deserializedKey);
- RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputStream, outputView, true);
- inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
- deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), true);
+ RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, true);
+ inputView.setData(outputView.toByteArray());
+ deserializedKey = RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
Assert.assertEquals(orgKey, deserializedKey);
}
}
@Test
public void testNamespaceSerializationAndDeserialization() throws Exception {
- ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
- DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);
+ final ByteArrayDataOutputView outputView = new ByteArrayDataOutputView(8);
+ final ByteArrayDataInputView inputView = new ByteArrayDataInputView();
for (int orgNamespace = 0; orgNamespace < 100; ++orgNamespace) {
- outputStream.reset();
- RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputStream, outputView, false);
- ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
- int deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), false);
+ outputView.reset();
+ RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, false);
+ inputView.setData(outputView.toByteArray());
+ int deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, false);
Assert.assertEquals(orgNamespace, deserializedNamepsace);
- RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputStream, outputView, true);
- inputStream = new ByteArrayInputStreamWithPos(outputStream.toByteArray());
- deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputStream, new DataInputViewStreamWrapper(inputStream), true);
+ RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, IntSerializer.INSTANCE, outputView, true);
+ inputView.setData(outputView.toByteArray());
+ deserializedNamepsace = RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, true);
Assert.assertEquals(orgNamespace, deserializedNamepsace);
}
}