You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/08 10:36:22 UTC

[GitHub] tzulitai closed pull request #7428: [FLINK-11280] [state backends] Let RocksDBSerializedCompositeKeyBuilder accept key serializer lazily

tzulitai closed pull request #7428: [FLINK-11280] [state backends] Let RocksDBSerializedCompositeKeyBuilder accept key serializer lazily
URL: https://github.com/apache/flink/pull/7428
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index fb19a8a2816..92e1e4d25a0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -134,11 +134,11 @@ public void setCurrentNamespace(N namespace) {
 
 		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
 						new RocksDBSerializedCompositeKeyBuilder<>(
-							safeKeySerializer,
 							backend.getKeyGroupPrefixBytes(),
-							32
+							32,
+							RocksDBKeySerializationUtils.isSerializerTypeVariableSized(safeKeySerializer)
 						);
-		keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+		keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup, safeKeySerializer);
 		byte[] key = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
 		return backend.db.get(columnFamily, key);
 	}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5a47b306596..e033e1cfbab 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -297,7 +297,10 @@ public RocksDBKeyedStateBackend(
 		this.kvStateInformation = new LinkedHashMap<>();
 
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
-		this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
+		this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(
+			keyGroupPrefixBytes,
+			32,
+			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer));
 
 		this.metricOptions = metricOptions;
 		this.metricGroup = metricGroup;
@@ -339,8 +342,9 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
 			(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
 
 		final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
+		final TypeSerializer<K> keySerializer = getKeySerializer();
 		final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
-		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
+		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 		final byte[] nameSpaceBytes;
 		try {
 			RocksDBKeySerializationUtils.writeNameSpace(
@@ -356,7 +360,7 @@ private static void checkAndCreateDirectory(File directory) throws IOException {
 		RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0);
 		iterator.seekToFirst();
 
-		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
+		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, keySerializer, keyGroupPrefixBytes,
 			ambiguousKeyPossible, nameSpaceBytes);
 
 		Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
@@ -380,7 +384,7 @@ private void registerKvStateInformation(String columnFamilyName, Tuple2<ColumnFa
 	@Override
 	public void setCurrentKey(K newKey) {
 		super.setCurrentKey(newKey);
-		sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
+		sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex(), getKeySerializer());
 	}
 
 	/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 4c174d43ef8..7b1e4594832 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -261,11 +261,11 @@ public void clear() {
 
 		RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
 			new RocksDBSerializedCompositeKeyBuilder<>(
-				safeKeySerializer,
 				backend.getKeyGroupPrefixBytes(),
-				32);
+				32,
+				RocksDBKeySerializationUtils.isSerializerTypeVariableSized(safeKeySerializer));
 
-		keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+		keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup, safeKeySerializer);
 
 		final byte[] keyPrefixBytes = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
index 8e83e292820..16f2a095721 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -39,10 +39,6 @@
 @Internal
 class RocksDBSerializedCompositeKeyBuilder<K> {
 
-	/** The serializer for the key. */
-	@Nonnull
-	private final TypeSerializer<K> keySerializer;
-
 	/** The output to write the key into. */
 	@Nonnull
 	private final DataOutputSerializer keyOutView;
@@ -59,25 +55,22 @@
 	private int afterKeyMark;
 
 	public RocksDBSerializedCompositeKeyBuilder(
-		@Nonnull TypeSerializer<K> keySerializer,
 		@Nonnegative int keyGroupPrefixBytes,
-		@Nonnegative int initialSize) {
+		@Nonnegative int initialSize,
+		boolean keySerializerTypeVariableSized) {
 		this(
-			keySerializer,
 			new DataOutputSerializer(initialSize),
 			keyGroupPrefixBytes,
-			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+			keySerializerTypeVariableSized,
 			0);
 	}
 
 	@VisibleForTesting
 	RocksDBSerializedCompositeKeyBuilder(
-		@Nonnull TypeSerializer<K> keySerializer,
 		@Nonnull DataOutputSerializer keyOutView,
 		@Nonnegative int keyGroupPrefixBytes,
 		boolean keySerializerTypeVariableSized,
 		@Nonnegative int afterKeyMark) {
-		this.keySerializer = keySerializer;
 		this.keyOutView = keyOutView;
 		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
 		this.keySerializerTypeVariableSized = keySerializerTypeVariableSized;
@@ -91,9 +84,9 @@ public RocksDBSerializedCompositeKeyBuilder(
 	 * @param key        the key.
 	 * @param keyGroupId the key-group id for the key.
 	 */
-	public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
+	public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId, @Nonnull TypeSerializer<K> keySerializer) {
 		try {
-			serializeKeyGroupAndKey(key, keyGroupId);
+			serializeKeyGroupAndKey(key, keyGroupId, keySerializer);
 		} catch (IOException shouldNeverHappen) {
 			throw new FlinkRuntimeException(shouldNeverHappen);
 		}
@@ -101,7 +94,7 @@ public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
 
 	/**
 	 * Returns a serialized composite key, from the key and key-group provided in a previous call to
-	 * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+	 * {@link #setKeyAndKeyGroup(Object, int, TypeSerializer)} and the given namespace.
 	 *
 	 * @param namespace           the namespace to concatenate for the serialized composite key bytes.
 	 * @param namespaceSerializer the serializer to obtain the serialized form of the namespace.
@@ -122,7 +115,7 @@ public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
 
 	/**
 	 * Returns a serialized composite key, from the key and key-group provided in a previous call to
-	 * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, folloed by the given user-key.
+	 * {@link #setKeyAndKeyGroup(Object, int, TypeSerializer)} and the given namespace, followed by the given user-key.
 	 *
 	 * @param namespace           the namespace to concatenate for the serialized composite key bytes.
 	 * @param namespaceSerializer the serializer to obtain the serialized form of the namespace.
@@ -145,7 +138,7 @@ public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
 		return result;
 	}
 
-	private void serializeKeyGroupAndKey(K key, int keyGroupId) throws IOException {
+	private void serializeKeyGroupAndKey(K key, int keyGroupId, TypeSerializer<K> keySerializer) throws IOException {
 
 		// clear buffer and mark
 		resetFully();
@@ -198,9 +191,4 @@ boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> namespaceSerializer) {
 		return keySerializerTypeVariableSized &
 			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
 	}
-
-	@VisibleForTesting
-	boolean isKeySerializerTypeVariableSized() {
-		return keySerializerTypeVariableSized;
-	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
index 70a4c3cfef7..ba91564a544 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
@@ -88,7 +88,7 @@ public void testSetKeyNamespaceUserKey() throws IOException {
 
 		final DataInputDeserializer deserializer = new DataInputDeserializer();
 		for (K testKey : testKeys) {
-			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, serializer, maxParallelism);
 			byte[] result = dataOutputSerializer.getCopyOfBuffer();
 			deserializer.setBuffer(result);
 			assertKeyKeyGroupBytes(testKey, keyGroup, prefixBytes, serializer, deserializer, false);
@@ -112,7 +112,7 @@ public void testSetKeyNamespaceUserKey() throws IOException {
 		final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
 
 		for (K testKey : testKeys) {
-			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, keySerializer, maxParallelism);
 			for (N testNamespace : testNamespaces) {
 				byte[] compositeBytes = keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer);
 				deserializer.setBuffer(compositeBytes);
@@ -148,7 +148,7 @@ public void testSetKeyNamespaceUserKey() throws IOException {
 		final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
 
 		for (K testKey : testKeys) {
-			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+			int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, keySerializer, maxParallelism);
 			for (N testNamespace : testNamespaces) {
 				for (U testUserKey : testUserKeys) {
 					byte[] compositeBytes = keyBuilder.buildCompositeKeyNamesSpaceUserKey(
@@ -181,7 +181,6 @@ public void testSetKeyNamespaceUserKey() throws IOException {
 		int prefixBytes) {
 		final boolean variableSize = RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer);
 		return new RocksDBSerializedCompositeKeyBuilder<>(
-			serializer,
 			dataOutputSerializer,
 			prefixBytes,
 			variableSize,
@@ -191,10 +190,11 @@ public void testSetKeyNamespaceUserKey() throws IOException {
 	private <K> int setKeyAndReturnKeyGroup(
 		RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder,
 		K key,
+		TypeSerializer<K> keySerializer,
 		int maxParallelism) {
 
 		int keyGroup = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, maxParallelism);
-		compositeKeyBuilder.setKeyAndKeyGroup(key, keyGroup);
+		compositeKeyBuilder.setKeyAndKeyGroup(key, keyGroup, keySerializer);
 		return keyGroup;
 	}
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services