You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/01/07 12:01:05 UTC
[flink] 03/03: [FLINK-9702] Improvement in (de)serialization of
keys and values for RocksDB state
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 54ef382439b1687a645a377a5f6a095746423109
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Tue Nov 27 13:51:29 2018 +0800
[FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
This closes #7288.
Co-authored-by: Stefan Richter <s....@data-artisans.com>
Co-authored-by: klion26 <qc...@gmail.com>
---
.../core/memory/ByteArrayOutputStreamWithPos.java | 3 +-
.../flink/core/memory/DataOutputSerializer.java | 7 +
.../streaming/state/AbstractRocksDBState.java | 140 ++++++------
.../streaming/state/RocksDBAggregatingState.java | 14 +-
.../state/RocksDBKeySerializationUtils.java | 12 +-
.../streaming/state/RocksDBKeyedStateBackend.java | 14 ++
.../contrib/streaming/state/RocksDBListState.java | 90 ++++----
.../contrib/streaming/state/RocksDBMapState.java | 83 +++----
.../streaming/state/RocksDBReducingState.java | 15 +-
.../RocksDBSerializedCompositeKeyBuilder.java | 206 +++++++++++++++++
.../contrib/streaming/state/RocksDBValueState.java | 14 +-
.../RocksDBSerializedCompositeKeyBuilderTest.java | 250 +++++++++++++++++++++
12 files changed, 643 insertions(+), 205 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index 22330c5..2f4fdfe 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.Preconditions;
-import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
@@ -112,7 +111,7 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
}
@Override
- public void close() throws IOException {
+ public void close() {
}
public byte[] getBuf() {
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index 01feae0..b749c84 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.core.memory;
+import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -350,6 +352,11 @@ public class DataOutputSerializer implements DataOutputView {
this.position += numBytes;
}
+ public void setPosition(int position) {
+ Preconditions.checkArgument(position >= 0 && position <= this.position, "Position out of bounds.");
+ this.position = position;
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index b5ab996..fb19a8a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -34,6 +34,7 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import java.io.IOException;
+import java.util.List;
/**
* Base class for {@link State} implementations that store state in a RocksDB database.
@@ -70,7 +71,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
protected final DataInputDeserializer dataInputView;
- private final boolean ambiguousKeyPossible;
+ private final RocksDBSerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer;
/**
* Creates a new RocksDB backed state.
@@ -99,8 +100,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
this.dataOutputView = new DataOutputSerializer(128);
this.dataInputView = new DataInputDeserializer();
- this.ambiguousKeyPossible =
- RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
+ this.sharedKeyNamespaceSerializer = backend.getSharedRocksKeyBuilder();
}
// ------------------------------------------------------------------------
@@ -108,17 +108,15 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
@Override
public void clear() {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- backend.db.delete(columnFamily, writeOptions, key);
- } catch (IOException | RocksDBException e) {
+ backend.db.delete(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace());
+ } catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
}
}
@Override
public void setCurrentNamespace(N namespace) {
- this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
+ this.currentNamespace = namespace;
}
@Override
@@ -128,30 +126,78 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
final TypeSerializer<N> safeNamespaceSerializer,
final TypeSerializer<V> safeValueSerializer) throws Exception {
- Preconditions.checkNotNull(serializedKeyAndNamespace);
- Preconditions.checkNotNull(safeKeySerializer);
- Preconditions.checkNotNull(safeNamespaceSerializer);
- Preconditions.checkNotNull(safeValueSerializer);
-
//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
- // we cannot reuse the keySerializationStream member since this method
- // is called concurrently to the other ones and it may thus contain garbage
- DataOutputSerializer tmpKeySerializationView = new DataOutputSerializer(128);
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ new RocksDBSerializedCompositeKeyBuilder<>(
+ safeKeySerializer,
+ backend.getKeyGroupPrefixBytes(),
+ 32
+ );
+ keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+ byte[] key = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
+ return backend.db.get(columnFamily, key);
+ }
+
+ <UK> byte[] serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
+ UK userKey,
+ TypeSerializer<UK> userKeySerializer) throws IOException {
+ return sharedKeyNamespaceSerializer.buildCompositeKeyNamesSpaceUserKey(
+ currentNamespace,
+ namespaceSerializer,
+ userKey,
+ userKeySerializer
+ );
+ }
+
+ private <T> byte[] serializeValueInternal(T value, TypeSerializer<T> serializer) throws IOException {
+ serializer.serialize(value, dataOutputView);
+ return dataOutputView.getCopyOfBuffer();
+ }
+
+ byte[] serializeCurrentKeyWithGroupAndNamespace() {
+ return sharedKeyNamespaceSerializer.buildCompositeKeyNamespace(currentNamespace, namespaceSerializer);
+ }
+
+ byte[] serializeValue(V value) throws IOException {
+ return serializeValue(value, valueSerializer);
+ }
+
+ <T> byte[] serializeValueNullSensitive(T value, TypeSerializer<T> serializer) throws IOException {
+ dataOutputView.clear();
+ dataOutputView.writeBoolean(value == null);
+ return serializeValueInternal(value, serializer);
+ }
+
+ <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException {
+ dataOutputView.clear();
+ return serializeValueInternal(value, serializer);
+ }
+
+ <T> byte[] serializeValueList(
+ List<T> valueList,
+ TypeSerializer<T> elementSerializer,
+ byte delimiter) throws IOException {
+
+ dataOutputView.clear();
+ boolean first = true;
- writeKeyWithGroupAndNamespace(
- keyGroup,
- keyAndNamespace.f0,
- safeKeySerializer,
- keyAndNamespace.f1,
- safeNamespaceSerializer,
- tmpKeySerializationView);
+ for (T value : valueList) {
+ Preconditions.checkNotNull(value, "You cannot add null to a value list.");
+
+ if (first) {
+ first = false;
+ } else {
+ dataOutputView.write(delimiter);
+ }
+ elementSerializer.serialize(value, dataOutputView);
+ }
- return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer());
+ return dataOutputView.getCopyOfBuffer();
}
public void migrateSerializedValue(
@@ -169,12 +215,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
}
byte[] getKeyBytes() {
- try {
- writeCurrentKeyWithGroupAndNamespace();
- return dataOutputView.getCopyOfBuffer();
- } catch (IOException e) {
- throw new FlinkRuntimeException("Error while serializing key", e);
- }
+ return serializeCurrentKeyWithGroupAndNamespace();
}
byte[] getValueBytes(V value) {
@@ -187,45 +228,6 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
}
}
- protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
- writeKeyWithGroupAndNamespace(
- backend.getCurrentKeyGroupIndex(),
- backend.getCurrentKey(),
- currentNamespace,
- dataOutputView);
- }
-
- protected void writeKeyWithGroupAndNamespace(
- int keyGroup, K key, N namespace,
- DataOutputSerializer keySerializationDataOutputView) throws IOException {
-
- writeKeyWithGroupAndNamespace(
- keyGroup,
- key,
- backend.getKeySerializer(),
- namespace,
- namespaceSerializer,
- keySerializationDataOutputView);
- }
-
- protected void writeKeyWithGroupAndNamespace(
- final int keyGroup,
- final K key,
- final TypeSerializer<K> keySerializer,
- final N namespace,
- final TypeSerializer<N> namespaceSerializer,
- final DataOutputSerializer keySerializationDataOutputView) throws IOException {
-
- Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
- Preconditions.checkNotNull(keySerializer);
- Preconditions.checkNotNull(namespaceSerializer);
-
- keySerializationDataOutputView.clear();
- RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
- RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationDataOutputView, ambiguousKeyPossible);
- RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
- }
-
protected V getDefaultValue() {
if (defaultValue != null) {
return valueSerializer.copy(defaultValue);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 770c558..fbd2979 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -109,19 +109,14 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
return;
}
- // cache key and namespace
- final K key = backend.getCurrentKey();
- final int keyGroup = backend.getCurrentKeyGroupIndex();
-
try {
ACC current = null;
// merge the sources to the target
for (N source : sources) {
if (source != null) {
- writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
-
- final byte[] sourceKey = dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(source);
+ final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily, writeOptions, sourceKey);
@@ -141,10 +136,9 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
// if something came out of merging the sources, merge it or write it to the target
if (current != null) {
+ setCurrentNamespace(target);
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
-
- final byte[] targetKey = dataOutputView.getCopyOfBuffer();
+ final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
if (targetValueBytes != null) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index d8844bf..02b4ffc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
/**
@@ -80,8 +82,12 @@ public class RocksDBKeySerializationUtils {
}
}
+ public static boolean isSerializerTypeVariableSized(@Nonnull TypeSerializer<?> serializer) {
+ return serializer.getLength() < 0;
+ }
+
public static boolean isAmbiguousKeyPossible(TypeSerializer keySerializer, TypeSerializer namespaceSerializer) {
- return (keySerializer.getLength() < 0) && (namespaceSerializer.getLength() < 0);
+ return (isSerializerTypeVariableSized(keySerializer) && isSerializerTypeVariableSized(namespaceSerializer));
}
public static void writeKeyGroup(
@@ -108,7 +114,7 @@ public class RocksDBKeySerializationUtils {
}
}
- private static void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
+ public static void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
do {
inputView.readByte();
value >>>= 8;
@@ -122,7 +128,7 @@ public class RocksDBKeySerializationUtils {
writeVariableIntBytes(length, keySerializationDateDataOutputView);
}
- private static void writeVariableIntBytes(
+ public static void writeVariableIntBytes(
int value,
DataOutputView keySerializationDateDataOutputView)
throws IOException {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 72445ca..6b00393 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -242,6 +242,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** The native metrics monitor. */
private RocksDBNativeMetricMonitor nativeMetricMonitor;
+ /** Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states. */
+ private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+
public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
@@ -294,6 +297,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.kvStateInformation = new LinkedHashMap<>();
this.writeOptions = new WriteOptions().setDisableWAL(true);
+ this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
this.metricOptions = metricOptions;
this.metricGroup = metricGroup;
@@ -373,6 +377,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
+ @Override
+ public void setCurrentKey(K newKey) {
+ super.setCurrentKey(newKey);
+ sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
+ }
+
/**
* Should only be called by one thread, and only after all accesses to the DB happened.
*/
@@ -456,6 +466,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return writeOptions;
}
+ RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
+ return sharedRocksKeyBuilder;
+ }
+
/**
* Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
* is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index cedeed8..13f5559 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -115,11 +115,10 @@ class RocksDBListState<K, N, V>
@Override
public List<V> getInternal() {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
+ byte[] key = serializeCurrentKeyWithGroupAndNamespace();
byte[] valueBytes = backend.db.get(columnFamily, key);
return deserializeList(valueBytes);
- } catch (IOException | RocksDBException e) {
+ } catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
}
}
@@ -160,11 +159,12 @@ class RocksDBListState<K, N, V>
Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- dataOutputView.clear();
- elementSerializer.serialize(value, dataOutputView);
- backend.db.merge(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer());
+ backend.db.merge(
+ columnFamily,
+ writeOptions,
+ serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValue(value, elementSerializer)
+ );
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
}
@@ -176,21 +176,17 @@ class RocksDBListState<K, N, V>
return;
}
- // cache key and namespace
- final K key = backend.getCurrentKey();
- final int keyGroup = backend.getCurrentKeyGroupIndex();
-
try {
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
- final byte[] targetKey = dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(target);
+ final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
// merge the sources to the target
for (N source : sources) {
if (source != null) {
- writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
+ setCurrentNamespace(source);
+ final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
- byte[] sourceKey = dataOutputView.getCopyOfBuffer();
byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily, writeOptions, sourceKey);
@@ -218,10 +214,11 @@ class RocksDBListState<K, N, V>
if (!values.isEmpty()) {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
- backend.db.put(columnFamily, writeOptions, key, premerge);
+ backend.db.put(
+ columnFamily,
+ writeOptions,
+ serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValueList(values, elementSerializer, DELIMITER));
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
}
@@ -234,10 +231,11 @@ class RocksDBListState<K, N, V>
if (!values.isEmpty()) {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
- backend.db.merge(columnFamily, writeOptions, key, premerge);
+ backend.db.merge(
+ columnFamily,
+ writeOptions,
+ serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValueList(values, elementSerializer, DELIMITER));
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
}
@@ -273,26 +271,6 @@ class RocksDBListState<K, N, V>
}
}
- private static <V> byte[] getPreMergedValue(
- List<V> values,
- TypeSerializer<V> elementSerializer,
- DataOutputSerializer keySerializationStream) throws IOException {
-
- keySerializationStream.clear();
- boolean first = true;
- for (V value : values) {
- Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
- if (first) {
- first = false;
- } else {
- keySerializationStream.write(DELIMITER);
- }
- elementSerializer.serialize(value, keySerializationStream);
- }
-
- return keySerializationStream.getCopyOfBuffer();
- }
-
@SuppressWarnings("unchecked")
static <E, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
@@ -343,10 +321,32 @@ class RocksDBListState<K, N, V>
prevPosition = in.getPosition();
}
try {
- return result.isEmpty() ? null : getPreMergedValue(result, elementSerializer, out);
+ return result.isEmpty() ? null : serializeValueList(result, elementSerializer, DELIMITER);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to serialize transformed list", e);
}
}
+
+ byte[] serializeValueList(
+ List<T> valueList,
+ TypeSerializer<T> elementSerializer,
+ byte delimiter) throws IOException {
+
+ out.clear();
+ boolean first = true;
+
+ for (T value : valueList) {
+ Preconditions.checkNotNull(value, "You cannot add null to a value list.");
+
+ if (first) {
+ first = false;
+ } else {
+ out.write(delimiter);
+ }
+ elementSerializer.serialize(value, out);
+ }
+
+ return out.getCopyOfBuffer();
+ }
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 7abbe3d..13cbded 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -119,7 +119,7 @@ class RocksDBMapState<K, N, UK, UV>
@Override
public UV get(UK userKey) throws IOException, RocksDBException {
- byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
return (rawValueBytes == null ? null : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
@@ -128,8 +128,8 @@ class RocksDBMapState<K, N, UK, UV>
@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
- byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
- byte[] rawValueBytes = serializeUserValue(userValue, userValueSerializer, dataOutputView);
+ byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
+ byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);
backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}
@@ -142,8 +142,8 @@ class RocksDBMapState<K, N, UK, UV>
try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
for (Map.Entry<UK, UV> entry : map.entrySet()) {
- byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
- byte[] rawValueBytes = serializeUserValue(entry.getValue(), userValueSerializer, dataOutputView);
+ byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(entry.getKey(), userKeySerializer);
+ byte[] rawValueBytes = serializeValueNullSensitive(entry.getValue(), userValueSerializer);
writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
}
}
@@ -151,21 +151,21 @@ class RocksDBMapState<K, N, UK, UV>
@Override
public void remove(UK userKey) throws IOException, RocksDBException {
- byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
}
@Override
public boolean contains(UK userKey) throws IOException, RocksDBException {
- byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
return (rawValueBytes != null);
}
@Override
- public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
+ public Iterable<Map.Entry<UK, UV>> entries() {
final Iterator<Map.Entry<UK, UV>> iterator = iterator();
// Return null to make the behavior consistent with other states.
@@ -177,10 +177,11 @@ class RocksDBMapState<K, N, UK, UV>
}
@Override
- public Iterable<UK> keys() throws IOException {
- final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+ public Iterable<UK> keys() {
+ final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
+ @Nullable
@Override
public UK next() {
RocksDBMapEntry entry = nextEntry();
@@ -190,8 +191,8 @@ class RocksDBMapState<K, N, UK, UV>
}
@Override
- public Iterable<UV> values() throws IOException {
- final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+ public Iterable<UV> values() {
+ final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
@@ -203,8 +204,8 @@ class RocksDBMapState<K, N, UK, UV>
}
@Override
- public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
- final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+ public Iterator<Map.Entry<UK, UV>> iterator() {
+ final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
@@ -220,7 +221,7 @@ class RocksDBMapState<K, N, UK, UV>
try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
WriteBatch writeBatch = new WriteBatch(128)) {
- final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();
+ final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
iterator.seek(keyPrefixBytes);
while (iterator.isValid()) {
@@ -259,18 +260,15 @@ class RocksDBMapState<K, N, UK, UV>
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
- DataOutputSerializer outputView = new DataOutputSerializer(128);
- DataInputDeserializer inputView = new DataInputDeserializer();
-
- writeKeyWithGroupAndNamespace(
- keyGroup,
- keyAndNamespace.f0,
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ new RocksDBSerializedCompositeKeyBuilder<>(
safeKeySerializer,
- keyAndNamespace.f1,
- safeNamespaceSerializer,
- outputView);
+ backend.getKeyGroupPrefixBytes(),
+ 32);
+
+ keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
- final byte[] keyPrefixBytes = outputView.getCopyOfBuffer();
+ final byte[] keyPrefixBytes = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
@@ -282,7 +280,8 @@ class RocksDBMapState<K, N, UK, UV>
keyPrefixBytes,
dupUserKeySerializer,
dupUserValueSerializer,
- inputView) {
+ dataInputView
+ ) {
@Override
public Map.Entry<UK, UV> next() {
@@ -302,36 +301,6 @@ class RocksDBMapState<K, N, UK, UV>
// Serialization Methods
// ------------------------------------------------------------------------
- private byte[] serializeCurrentKeyAndNamespace() throws IOException {
- writeCurrentKeyWithGroupAndNamespace();
-
- return dataOutputView.getCopyOfBuffer();
- }
-
- private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
- serializeCurrentKeyAndNamespace();
- userKeySerializer.serialize(userKey, dataOutputView);
-
- return dataOutputView.getCopyOfBuffer();
- }
-
- private static <UV> byte[] serializeUserValue(
- UV userValue,
- TypeSerializer<UV> valueSerializer,
- DataOutputSerializer dataOutputView) throws IOException {
-
- dataOutputView.clear();
-
- if (userValue == null) {
- dataOutputView.writeBoolean(true);
- } else {
- dataOutputView.writeBoolean(false);
- valueSerializer.serialize(userValue, dataOutputView);
- }
-
- return dataOutputView.getCopyOfBuffer();
- }
-
private static <UK> UK deserializeUserKey(
DataInputDeserializer dataInputView,
int userKeyOffset,
@@ -474,7 +443,7 @@ class RocksDBMapState<K, N, UK, UV>
try {
userValue = value;
- rawValueBytes = serializeUserValue(value, valueSerializer, dataOutputView);
+ rawValueBytes = serializeValueNullSensitive(value, valueSerializer);
db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
} catch (IOException | RocksDBException e) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 69736ae..a5eb46e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -102,20 +102,14 @@ class RocksDBReducingState<K, N, V>
return;
}
- // cache key and namespace
- final K key = backend.getCurrentKey();
- final int keyGroup = backend.getCurrentKeyGroupIndex();
-
try {
V current = null;
// merge the sources to the target
for (N source : sources) {
if (source != null) {
-
- writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);
-
- final byte[] sourceKey = dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(source);
+ final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily, writeOptions, sourceKey);
@@ -136,9 +130,8 @@ class RocksDBReducingState<K, N, V>
// if something came out of merging the sources, merge it or write it to the target
if (current != null) {
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);
-
- final byte[] targetKey = dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(target);
+ final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
if (targetValueBytes != null) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
new file mode 100644
index 0000000..8e83e29
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previously serialized current key if possible.
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder<K> {
+
+ /** The serializer for the key. */
+ @Nonnull
+ private final TypeSerializer<K> keySerializer;
+
+ /** The output to write the key into. */
+ @Nonnull
+ private final DataOutputSerializer keyOutView;
+
+ /** The number of Key-group-prefix bytes for the key. */
+ @Nonnegative
+ private final int keyGroupPrefixBytes;
+
+ /** This flag indicates whether the key type has a variable byte size in serialization. */
+ private final boolean keySerializerTypeVariableSized;
+
+ /** Mark for the position after the serialized key. */
+ @Nonnegative
+ private int afterKeyMark;
+
+ public RocksDBSerializedCompositeKeyBuilder(
+ @Nonnull TypeSerializer<K> keySerializer,
+ @Nonnegative int keyGroupPrefixBytes,
+ @Nonnegative int initialSize) {
+ this(
+ keySerializer,
+ new DataOutputSerializer(initialSize),
+ keyGroupPrefixBytes,
+ RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+ 0);
+ }
+
+ @VisibleForTesting
+ RocksDBSerializedCompositeKeyBuilder(
+ @Nonnull TypeSerializer<K> keySerializer,
+ @Nonnull DataOutputSerializer keyOutView,
+ @Nonnegative int keyGroupPrefixBytes,
+ boolean keySerializerTypeVariableSized,
+ @Nonnegative int afterKeyMark) {
+ this.keySerializer = keySerializer;
+ this.keyOutView = keyOutView;
+ this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+ this.keySerializerTypeVariableSized = keySerializerTypeVariableSized;
+ this.afterKeyMark = afterKeyMark;
+ }
+
+ /**
+ * Sets the key and key-group as prefix. This will serialize them into the buffer and they will be used to create
+ * composite keys with provided namespaces.
+ *
+ * @param key the key.
+ * @param keyGroupId the key-group id for the key.
+ */
+ public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
+ try {
+ serializeKeyGroupAndKey(key, keyGroupId);
+ } catch (IOException shouldNeverHappen) {
+ throw new FlinkRuntimeException(shouldNeverHappen);
+ }
+ }
+
+ /**
+ * Returns a serialized composite key, from the key and key-group provided in a previous call to
+ * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+ *
+ * @param namespace the namespace to concatenate for the serialized composite key bytes.
+ * @param namespaceSerializer the serializer to obtain the serialized form of the namespace.
+ * @param <N> the type of the namespace.
+ * @return the bytes for the serialized composite key of key-group, key, namespace.
+ */
+ @Nonnull
+ public <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
+ try {
+ serializeNamespace(namespace, namespaceSerializer);
+ final byte[] result = keyOutView.getCopyOfBuffer();
+ resetToKey();
+ return result;
+ } catch (IOException shouldNeverHappen) {
+ throw new FlinkRuntimeException(shouldNeverHappen);
+ }
+ }
+
+ /**
+ * Returns a serialized composite key, from the key and key-group provided in a previous call to
+ * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, followed by the given user-key.
+ *
+ * @param namespace the namespace to concatenate for the serialized composite key bytes.
+ * @param namespaceSerializer the serializer to obtain the serialized form of the namespace.
+ * @param userKey the user-key to concatenate for the serialized composite key, after the namespace.
+ * @param userKeySerializer the serializer to obtain the serialized form of the user-key.
+ * @param <N> the type of the namespace.
+ * @param <UK> the type of the user-key.
+ * @return the bytes for the serialized composite key of key-group, key, namespace, user-key.
+ */
+ @Nonnull
+ public <N, UK> byte[] buildCompositeKeyNamesSpaceUserKey(
+ @Nonnull N namespace,
+ @Nonnull TypeSerializer<N> namespaceSerializer,
+ @Nonnull UK userKey,
+ @Nonnull TypeSerializer<UK> userKeySerializer) throws IOException {
+ serializeNamespace(namespace, namespaceSerializer);
+ userKeySerializer.serialize(userKey, keyOutView);
+ byte[] result = keyOutView.getCopyOfBuffer();
+ resetToKey();
+ return result;
+ }
+
+ private void serializeKeyGroupAndKey(K key, int keyGroupId) throws IOException {
+
+ // clear buffer and mark
+ resetFully();
+
+ // write key-group
+ RocksDBKeySerializationUtils.writeKeyGroup(
+ keyGroupId,
+ keyGroupPrefixBytes,
+ keyOutView);
+ // write key
+ keySerializer.serialize(key, keyOutView);
+ afterKeyMark = keyOutView.length();
+ }
+
+ private <N> void serializeNamespace(
+ @Nonnull N namespace,
+ @Nonnull TypeSerializer<N> namespaceSerializer) throws IOException {
+
+ // this should only be called when there is already a key written so that we build the composite.
+ assert isKeyWritten();
+
+ final boolean ambiguousCompositeKeyPossible = isAmbiguousCompositeKeyPossible(namespaceSerializer);
+ if (ambiguousCompositeKeyPossible) {
+ RocksDBKeySerializationUtils.writeVariableIntBytes(
+ afterKeyMark - keyGroupPrefixBytes,
+ keyOutView);
+ }
+ RocksDBKeySerializationUtils.writeNameSpace(
+ namespace,
+ namespaceSerializer,
+ keyOutView,
+ ambiguousCompositeKeyPossible);
+ }
+
+ private void resetFully() {
+ afterKeyMark = 0;
+ keyOutView.clear();
+ }
+
+ private void resetToKey() {
+ keyOutView.setPosition(afterKeyMark);
+ }
+
+ private boolean isKeyWritten() {
+ return afterKeyMark > 0;
+ }
+
+ @VisibleForTesting
+ boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> namespaceSerializer) {
+ return keySerializerTypeVariableSized &
+ RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
+ }
+
+ @VisibleForTesting
+ boolean isKeySerializerTypeVariableSized() {
+ return keySerializerTypeVariableSized;
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 97b83df..a8f5163 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -80,9 +80,9 @@ class RocksDBValueState<K, N, V>
@Override
public V value() {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- byte[] valueBytes = backend.db.get(columnFamily, key);
+ byte[] valueBytes = backend.db.get(columnFamily,
+ serializeCurrentKeyWithGroupAndNamespace());
+
if (valueBytes == null) {
return getDefaultValue();
}
@@ -101,11 +101,9 @@ class RocksDBValueState<K, N, V>
}
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- dataOutputView.clear();
- valueSerializer.serialize(value, dataOutputView);
- backend.db.put(columnFamily, writeOptions, key, dataOutputView.getCopyOfBuffer());
+ backend.db.put(columnFamily, writeOptions,
+ serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValue(value));
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
new file mode 100644
index 0000000..70a4c3c
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Test for @{@link RocksDBSerializedCompositeKeyBuilder}.
+ */
+public class RocksDBSerializedCompositeKeyBuilderTest {
+
+ private final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
+
+ private static final int[] TEST_PARALLELISMS = new int[]{64, 4096};
+ private static final Collection<Integer> TEST_INTS = Arrays.asList(42, 4711);
+ private static final Collection<String> TEST_STRINGS = Arrays.asList("test123", "abc");
+
+ @Before
+ public void before() {
+ dataOutputSerializer.clear();
+ }
+
+ @Test
+ public void testSetKey() throws IOException {
+ for (int parallelism : TEST_PARALLELISMS) {
+ testSetKeyInternal(IntSerializer.INSTANCE, TEST_INTS, parallelism);
+ testSetKeyInternal(StringSerializer.INSTANCE, TEST_STRINGS, parallelism);
+ }
+ }
+
+ @Test
+ public void testSetKeyNamespace() throws IOException {
+ for (int parallelism : TEST_PARALLELISMS) {
+ testSetKeyNamespaceInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, parallelism);
+ testSetKeyNamespaceInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, parallelism);
+ testSetKeyNamespaceInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, parallelism);
+ testSetKeyNamespaceInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, parallelism);
+ }
+ }
+
+ @Test
+ public void testSetKeyNamespaceUserKey() throws IOException {
+ for (int parallelism : TEST_PARALLELISMS) {
+ testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_INTS, parallelism);
+ testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_INTS, parallelism);
+ testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_INTS, parallelism);
+ testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, TEST_INTS, parallelism);
+ testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_STRINGS, parallelism);
+ testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_STRINGS, parallelism);
+ testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_STRINGS, parallelism);
+ testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, TEST_STRINGS, parallelism);
+ }
+ }
+
+ private <K> void testSetKeyInternal(TypeSerializer<K> serializer, Collection<K> testKeys, int maxParallelism) throws IOException {
+ final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ createRocksDBSerializedCompositeKeyBuilder(serializer, prefixBytes);
+
+ final DataInputDeserializer deserializer = new DataInputDeserializer();
+ for (K testKey : testKeys) {
+ int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+ byte[] result = dataOutputSerializer.getCopyOfBuffer();
+ deserializer.setBuffer(result);
+ assertKeyKeyGroupBytes(testKey, keyGroup, prefixBytes, serializer, deserializer, false);
+ Assert.assertEquals(0, deserializer.available());
+ }
+ }
+
+ private <K, N> void testSetKeyNamespaceInternal(
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ Collection<K> testKeys,
+ Collection<N> testNamespaces,
+ int maxParallelism) throws IOException {
+ final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+ final DataInputDeserializer deserializer = new DataInputDeserializer();
+
+ final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+ for (K testKey : testKeys) {
+ int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+ for (N testNamespace : testNamespaces) {
+ byte[] compositeBytes = keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer);
+ deserializer.setBuffer(compositeBytes);
+ assertKeyGroupKeyNamespaceBytes(
+ testKey,
+ keyGroup,
+ prefixBytes,
+ keySerializer,
+ testNamespace,
+ namespaceSerializer,
+ deserializer,
+ ambiguousPossible);
+ Assert.assertEquals(0, deserializer.available());
+ }
+ }
+ }
+
+ private <K, N, U> void testSetKeyNamespaceUserKeyInternal(
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<U> userKeySerializer,
+ Collection<K> testKeys,
+ Collection<N> testNamespaces,
+ Collection<U> testUserKeys,
+ int maxParallelism) throws IOException {
+ final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+ final DataInputDeserializer deserializer = new DataInputDeserializer();
+
+ final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+ for (K testKey : testKeys) {
+ int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+ for (N testNamespace : testNamespaces) {
+ for (U testUserKey : testUserKeys) {
+ byte[] compositeBytes = keyBuilder.buildCompositeKeyNamesSpaceUserKey(
+ testNamespace,
+ namespaceSerializer,
+ testUserKey,
+ userKeySerializer);
+
+ deserializer.setBuffer(compositeBytes);
+ assertKeyGroupKeyNamespaceUserKeyBytes(
+ testKey,
+ keyGroup,
+ prefixBytes,
+ keySerializer,
+ testNamespace,
+ namespaceSerializer,
+ testUserKey,
+ userKeySerializer,
+ deserializer,
+ ambiguousPossible);
+
+ Assert.assertEquals(0, deserializer.available());
+ }
+ }
+ }
+ }
+
+ private <K> RocksDBSerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder(
+ TypeSerializer<K> serializer,
+ int prefixBytes) {
+ final boolean variableSize = RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer);
+ return new RocksDBSerializedCompositeKeyBuilder<>(
+ serializer,
+ dataOutputSerializer,
+ prefixBytes,
+ variableSize,
+ 0);
+ }
+
+ private <K> int setKeyAndReturnKeyGroup(
+ RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder,
+ K key,
+ int maxParallelism) {
+
+ int keyGroup = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, maxParallelism);
+ compositeKeyBuilder.setKeyAndKeyGroup(key, keyGroup);
+ return keyGroup;
+ }
+
+ private <K> void assertKeyKeyGroupBytes(
+ K key,
+ int keyGroup,
+ int prefixBytes,
+ TypeSerializer<K> typeSerializer,
+ DataInputDeserializer deserializer,
+ boolean ambiguousCompositeKeyPossible) throws IOException {
+
+ Assert.assertEquals(keyGroup, RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, deserializer));
+ Assert.assertEquals(key, RocksDBKeySerializationUtils.readKey(typeSerializer, deserializer, ambiguousCompositeKeyPossible));
+ }
+
+ private <K, N> void assertKeyGroupKeyNamespaceBytes(
+ K key,
+ int keyGroup,
+ int prefixBytes,
+ TypeSerializer<K> keySerializer,
+ N namespace,
+ TypeSerializer<N> namespaceSerializer,
+ DataInputDeserializer deserializer,
+ boolean ambiguousCompositeKeyPossible) throws IOException {
+ assertKeyKeyGroupBytes(key, keyGroup, prefixBytes, keySerializer, deserializer, ambiguousCompositeKeyPossible);
+ N readNamespace =
+ RocksDBKeySerializationUtils.readNamespace(namespaceSerializer, deserializer, ambiguousCompositeKeyPossible);
+ Assert.assertEquals(namespace, readNamespace);
+ }
+
+ private <K, N, U> void assertKeyGroupKeyNamespaceUserKeyBytes(
+ K key,
+ int keyGroup,
+ int prefixBytes,
+ TypeSerializer<K> keySerializer,
+ N namespace,
+ TypeSerializer<N> namespaceSerializer,
+ U userKey,
+ TypeSerializer<U> userKeySerializer,
+ DataInputDeserializer deserializer,
+ boolean ambiguousCompositeKeyPossible) throws IOException {
+ assertKeyGroupKeyNamespaceBytes(
+ key,
+ keyGroup,
+ prefixBytes,
+ keySerializer,
+ namespace,
+ namespaceSerializer,
+ deserializer,
+ ambiguousCompositeKeyPossible);
+ Assert.assertEquals(userKey, userKeySerializer.deserialize(deserializer));
+ }
+}