You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:18 UTC
[flink] 02/09: [refactor] Move RocksDBCompositeKeyBuilder to a common package
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a3436cb67866fddec45c9ac1bd760c24732ca32b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jan 28 14:41:53 2021 +0100
[refactor] Move RocksDBCompositeKeyBuilder to a common package
---
.../state/CompositeKeySerializationUtils.java | 7 +-
.../state/SerializedCompositeKeyBuilder.java | 79 ++++++++++---
.../state/CompositeKeySerializationUtilsTest.java | 32 +++---
.../state/SerializedCompositeKeyBuilderTest.java | 128 +++++++++++++++------
.../streaming/state/AbstractRocksDBState.java | 7 +-
.../state/RocksDBCachingPriorityQueueSet.java | 3 +-
.../state/RocksDBIncrementalCheckpointUtils.java | 9 +-
.../streaming/state/RocksDBKeyedStateBackend.java | 14 ++-
.../state/RocksDBKeyedStateBackendBuilder.java | 8 +-
.../contrib/streaming/state/RocksDBMapState.java | 5 +-
.../iterator/AbstractRocksStateKeysIterator.java | 4 +-
.../RocksStateKeysAndNamespaceIterator.java | 4 +-
.../RocksDBIncrementalRestoreOperation.java | 6 +-
...rtitionedPriorityQueueWithRocksDBStoreTest.java | 4 +-
.../RocksDBIncrementalCheckpointUtilsTest.java | 16 ++-
...sDBRocksStateKeysAndNamespacesIteratorTest.java | 5 +-
.../state/RocksDBRocksStateKeysIteratorTest.java | 5 +-
17 files changed, 227 insertions(+), 109 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java
similarity index 96%
rename from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java
index 95c7e0b..8088c19 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.contrib.streaming.state;
+package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -28,9 +28,10 @@ import javax.annotation.Nonnull;
import java.io.IOException;
/** Utils for RocksDB state serialization and deserialization. */
-public class RocksDBKeySerializationUtils {
+public class CompositeKeySerializationUtils {
- static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws IOException {
+ public static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView)
+ throws IOException {
int keyGroup = 0;
for (int i = 0; i < keyGroupPrefixBytes; ++i) {
keyGroup <<= 8;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilder.java
similarity index 72%
rename from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilder.java
index 78397ac..382bf04 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilder.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.contrib.streaming.state;
+package org.apache.flink.runtime.state;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
@@ -38,7 +38,7 @@ import java.io.IOException;
*/
@NotThreadSafe
@Internal
-class RocksDBSerializedCompositeKeyBuilder<K> {
+public final class SerializedCompositeKeyBuilder<K> {
/** The serializer for the key. */
@Nonnull private final TypeSerializer<K> keySerializer;
@@ -55,7 +55,9 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
/** Mark for the position after the serialized key. */
@Nonnegative private int afterKeyMark;
- public RocksDBSerializedCompositeKeyBuilder(
+ @Nonnegative private int afterNamespaceMark;
+
+ public SerializedCompositeKeyBuilder(
@Nonnull TypeSerializer<K> keySerializer,
@Nonnegative int keyGroupPrefixBytes,
@Nonnegative int initialSize) {
@@ -63,12 +65,12 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
keySerializer,
new DataOutputSerializer(initialSize),
keyGroupPrefixBytes,
- RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+ CompositeKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
0);
}
@VisibleForTesting
- RocksDBSerializedCompositeKeyBuilder(
+ SerializedCompositeKeyBuilder(
@Nonnull TypeSerializer<K> keySerializer,
@Nonnull DataOutputSerializer keyOutView,
@Nonnegative int keyGroupPrefixBytes,
@@ -96,6 +98,15 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
}
}
+ public <N> void setNamespace(
+ @Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
+ try {
+ serializeNamespace(namespace, namespaceSerializer);
+ } catch (IOException shouldNeverHappen) {
+ throw new FlinkRuntimeException(shouldNeverHappen);
+ }
+ }
+
/**
* Returns a serialized composite key, from the key and key-group provided in a previous call to
* {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
@@ -110,9 +121,7 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
@Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
try {
serializeNamespace(namespace, namespaceSerializer);
- final byte[] result = keyOutView.getCopyOfBuffer();
- resetToKey();
- return result;
+ return keyOutView.getCopyOfBuffer();
} catch (IOException shouldNeverHappen) {
throw new FlinkRuntimeException(shouldNeverHappen);
}
@@ -120,7 +129,7 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
/**
* Returns a serialized composite key, from the key and key-group provided in a previous call to
- * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, folloed by the given
+ * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, followed by the given
* user-key.
*
* @param namespace the namespace to concatenate for the serialized composite key bytes.
@@ -141,9 +150,35 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
throws IOException {
serializeNamespace(namespace, namespaceSerializer);
userKeySerializer.serialize(userKey, keyOutView);
- byte[] result = keyOutView.getCopyOfBuffer();
- resetToKey();
- return result;
+ return keyOutView.getCopyOfBuffer();
+ }
+
+ /**
+ * Returns a serialized composite key, from the key and key-group provided in a previous call to
+ * {@link #setKeyAndKeyGroup(Object, int)} and the namespace provided in {@link
+ * #setNamespace(Object, TypeSerializer)}, followed by the given user-key.
+ *
+ * @param userKey the user-key to concatenate for the serialized composite key, after the
+ * namespace.
+ * @param userKeySerializer the serializer to obtain the serialized form of the user-key.
+ * @param <UK> the type of the user-key.
+ * @return the bytes for the serialized composite key of key-group, key, namespace and user-key.
+ */
+ @Nonnull
+ public <UK> byte[] buildCompositeKeyUserKey(
+ @Nonnull UK userKey, @Nonnull TypeSerializer<UK> userKeySerializer) throws IOException {
+ // this should only be called when there is already a namespace written.
+ assert isNamespaceWritten();
+ resetToNamespace();
+
+ userKeySerializer.serialize(userKey, keyOutView);
+ return keyOutView.getCopyOfBuffer();
+ }
+
+ /** Returns a serialized composite key, from whatever was set so far. */
+ @Nonnull
+ public byte[] build() throws IOException {
+ return keyOutView.getCopyOfBuffer();
}
private void serializeKeyGroupAndKey(K key, int keyGroupId) throws IOException {
@@ -152,7 +187,7 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
resetFully();
// write key-group
- RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, keyGroupPrefixBytes, keyOutView);
+ CompositeKeySerializationUtils.writeKeyGroup(keyGroupId, keyGroupPrefixBytes, keyOutView);
// write key
keySerializer.serialize(key, keyOutView);
afterKeyMark = keyOutView.length();
@@ -165,33 +200,45 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
// this should only be called when there is already a key written so that we build the
// composite.
assert isKeyWritten();
+ resetToKey();
final boolean ambiguousCompositeKeyPossible =
isAmbiguousCompositeKeyPossible(namespaceSerializer);
if (ambiguousCompositeKeyPossible) {
- RocksDBKeySerializationUtils.writeVariableIntBytes(
+ CompositeKeySerializationUtils.writeVariableIntBytes(
afterKeyMark - keyGroupPrefixBytes, keyOutView);
}
- RocksDBKeySerializationUtils.writeNameSpace(
+ CompositeKeySerializationUtils.writeNameSpace(
namespace, namespaceSerializer, keyOutView, ambiguousCompositeKeyPossible);
+ afterNamespaceMark = keyOutView.length();
}
private void resetFully() {
afterKeyMark = 0;
+ afterNamespaceMark = 0;
keyOutView.clear();
}
private void resetToKey() {
+ afterNamespaceMark = 0;
keyOutView.setPosition(afterKeyMark);
}
+ private void resetToNamespace() {
+ keyOutView.setPosition(afterNamespaceMark);
+ }
+
private boolean isKeyWritten() {
return afterKeyMark > 0;
}
+ private boolean isNamespaceWritten() {
+ return afterNamespaceMark > 0;
+ }
+
@VisibleForTesting
boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> namespaceSerializer) {
return keySerializerTypeVariableSized
- & RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
+ & CompositeKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CompositeKeySerializationUtilsTest.java
similarity index 80%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/CompositeKeySerializationUtilsTest.java
index 3bbb20b..65ba8ea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CompositeKeySerializationUtilsTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.contrib.streaming.state;
+package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -30,17 +30,17 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.junit.Assert;
import org.junit.Test;
-/** Tests for guarding {@link RocksDBKeySerializationUtils}. */
-public class RocksDBKeySerializationUtilsTest {
+/** Tests for guarding {@link CompositeKeySerializationUtils}. */
+public class CompositeKeySerializationUtilsTest {
@Test
public void testIsAmbiguousKeyPossible() {
Assert.assertFalse(
- RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+ CompositeKeySerializationUtils.isAmbiguousKeyPossible(
IntSerializer.INSTANCE, StringSerializer.INSTANCE));
Assert.assertTrue(
- RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+ CompositeKeySerializationUtils.isAmbiguousKeyPossible(
StringSerializer.INSTANCE, StringSerializer.INSTANCE));
}
@@ -52,10 +52,10 @@ public class RocksDBKeySerializationUtilsTest {
for (int keyGroupPrefixBytes = 1; keyGroupPrefixBytes <= 2; ++keyGroupPrefixBytes) {
for (int orgKeyGroup = 0; orgKeyGroup < 128; ++orgKeyGroup) {
outputStream.reset();
- RocksDBKeySerializationUtils.writeKeyGroup(
+ CompositeKeySerializationUtils.writeKeyGroup(
orgKeyGroup, keyGroupPrefixBytes, outputView);
int deserializedKeyGroup =
- RocksDBKeySerializationUtils.readKeyGroup(
+ CompositeKeySerializationUtils.readKeyGroup(
keyGroupPrefixBytes,
new DataInputViewStreamWrapper(
new ByteArrayInputStreamWithPos(
@@ -73,17 +73,19 @@ public class RocksDBKeySerializationUtilsTest {
// test for key
for (int orgKey = 0; orgKey < 100; ++orgKey) {
outputView.clear();
- RocksDBKeySerializationUtils.writeKey(
+ CompositeKeySerializationUtils.writeKey(
orgKey, IntSerializer.INSTANCE, outputView, false);
inputView.setBuffer(outputView.getCopyOfBuffer());
int deserializedKey =
- RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, false);
+ CompositeKeySerializationUtils.readKey(
+ IntSerializer.INSTANCE, inputView, false);
Assert.assertEquals(orgKey, deserializedKey);
- RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, true);
+ CompositeKeySerializationUtils.writeKey(
+ orgKey, IntSerializer.INSTANCE, outputView, true);
inputView.setBuffer(outputView.getCopyOfBuffer());
deserializedKey =
- RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
+ CompositeKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
Assert.assertEquals(orgKey, deserializedKey);
}
}
@@ -95,19 +97,19 @@ public class RocksDBKeySerializationUtilsTest {
for (int orgNamespace = 0; orgNamespace < 100; ++orgNamespace) {
outputView.clear();
- RocksDBKeySerializationUtils.writeNameSpace(
+ CompositeKeySerializationUtils.writeNameSpace(
orgNamespace, IntSerializer.INSTANCE, outputView, false);
inputView.setBuffer(outputView.getCopyOfBuffer());
int deserializedNamepsace =
- RocksDBKeySerializationUtils.readNamespace(
+ CompositeKeySerializationUtils.readNamespace(
IntSerializer.INSTANCE, inputView, false);
Assert.assertEquals(orgNamespace, deserializedNamepsace);
- RocksDBKeySerializationUtils.writeNameSpace(
+ CompositeKeySerializationUtils.writeNameSpace(
orgNamespace, IntSerializer.INSTANCE, outputView, true);
inputView.setBuffer(outputView.getCopyOfBuffer());
deserializedNamepsace =
- RocksDBKeySerializationUtils.readNamespace(
+ CompositeKeySerializationUtils.readNamespace(
IntSerializer.INSTANCE, inputView, true);
Assert.assertEquals(orgNamespace, deserializedNamepsace);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilderTest.java
similarity index 73%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilderTest.java
index 9e4a208..8c7ac96 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilderTest.java
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-package org.apache.flink.contrib.streaming.state;
+package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.junit.Assert;
import org.junit.Before;
@@ -33,8 +32,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
-/** Test for @{@link RocksDBSerializedCompositeKeyBuilder}. */
-public class RocksDBSerializedCompositeKeyBuilderTest {
+/** Test for {@link SerializedCompositeKeyBuilder}. */
+public class SerializedCompositeKeyBuilderTest {
private final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
@@ -57,36 +56,60 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
@Test
public void testSetKeyNamespace() throws IOException {
+ testSetKeyNamespaceInternal(BuildKeyAndNamespaceType.BUILD);
+ }
+
+ @Test
+ public void testSetKeyNamespaceWithSet() throws IOException {
+ testSetKeyNamespaceInternal(BuildKeyAndNamespaceType.SET_AND_BUILD);
+ }
+
+ private void testSetKeyNamespaceInternal(BuildKeyAndNamespaceType buildKeyAndNamespaceType)
+ throws IOException {
for (int parallelism : TEST_PARALLELISMS) {
testSetKeyNamespaceInternal(
IntSerializer.INSTANCE,
IntSerializer.INSTANCE,
TEST_INTS,
TEST_INTS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceInternal(
IntSerializer.INSTANCE,
StringSerializer.INSTANCE,
TEST_INTS,
TEST_STRINGS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceInternal(
StringSerializer.INSTANCE,
IntSerializer.INSTANCE,
TEST_STRINGS,
TEST_INTS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceInternal(
StringSerializer.INSTANCE,
StringSerializer.INSTANCE,
TEST_STRINGS,
TEST_STRINGS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
}
}
@Test
public void testSetKeyNamespaceUserKey() throws IOException {
+ testSetKeyNamespaceUserKeyInternal(BuildKeyAndNamespaceType.BUILD);
+ }
+
+ @Test
+ public void testSetKeyNamespaceUserKeyWithSet() throws IOException {
+ testSetKeyNamespaceUserKeyInternal(BuildKeyAndNamespaceType.SET_AND_BUILD);
+ }
+
+ private void testSetKeyNamespaceUserKeyInternal(
+ BuildKeyAndNamespaceType buildKeyAndNamespaceType) throws IOException {
for (int parallelism : TEST_PARALLELISMS) {
testSetKeyNamespaceUserKeyInternal(
IntSerializer.INSTANCE,
@@ -95,7 +118,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
TEST_INTS,
TEST_INTS,
TEST_INTS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceUserKeyInternal(
IntSerializer.INSTANCE,
StringSerializer.INSTANCE,
@@ -103,7 +127,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
TEST_INTS,
TEST_STRINGS,
TEST_INTS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceUserKeyInternal(
StringSerializer.INSTANCE,
IntSerializer.INSTANCE,
@@ -111,7 +136,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
TEST_STRINGS,
TEST_INTS,
TEST_INTS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceUserKeyInternal(
StringSerializer.INSTANCE,
StringSerializer.INSTANCE,
@@ -119,7 +145,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
TEST_STRINGS,
TEST_STRINGS,
TEST_INTS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceUserKeyInternal(
IntSerializer.INSTANCE,
IntSerializer.INSTANCE,
@@ -127,7 +154,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
TEST_INTS,
TEST_INTS,
TEST_STRINGS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceUserKeyInternal(
IntSerializer.INSTANCE,
StringSerializer.INSTANCE,
@@ -135,7 +163,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
TEST_INTS,
TEST_STRINGS,
TEST_STRINGS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceUserKeyInternal(
StringSerializer.INSTANCE,
IntSerializer.INSTANCE,
@@ -143,7 +172,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
TEST_STRINGS,
TEST_INTS,
TEST_STRINGS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
testSetKeyNamespaceUserKeyInternal(
StringSerializer.INSTANCE,
StringSerializer.INSTANCE,
@@ -151,7 +181,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
TEST_STRINGS,
TEST_STRINGS,
TEST_STRINGS,
- parallelism);
+ parallelism,
+ buildKeyAndNamespaceType);
}
}
@@ -159,7 +190,7 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
TypeSerializer<K> serializer, Collection<K> testKeys, int maxParallelism)
throws IOException {
final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
- RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ SerializedCompositeKeyBuilder<K> keyBuilder =
createRocksDBSerializedCompositeKeyBuilder(serializer, prefixBytes);
final DataInputDeserializer deserializer = new DataInputDeserializer();
@@ -172,16 +203,22 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
}
}
+ enum BuildKeyAndNamespaceType {
+ BUILD,
+ SET_AND_BUILD
+ }
+
private <K, N> void testSetKeyNamespaceInternal(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Collection<K> testKeys,
Collection<N> testNamespaces,
- int maxParallelism)
+ int maxParallelism,
+ BuildKeyAndNamespaceType buildKeyAndNamespaceType)
throws IOException {
final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
- RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ SerializedCompositeKeyBuilder<K> keyBuilder =
createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
final DataInputDeserializer deserializer = new DataInputDeserializer();
@@ -192,8 +229,15 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
for (K testKey : testKeys) {
int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
for (N testNamespace : testNamespaces) {
- byte[] compositeBytes =
- keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer);
+ final byte[] compositeBytes;
+ if (buildKeyAndNamespaceType == BuildKeyAndNamespaceType.BUILD) {
+ compositeBytes =
+ keyBuilder.buildCompositeKeyNamespace(
+ testNamespace, namespaceSerializer);
+ } else {
+ keyBuilder.setNamespace(testNamespace, namespaceSerializer);
+ compositeBytes = keyBuilder.build();
+ }
deserializer.setBuffer(compositeBytes);
assertKeyGroupKeyNamespaceBytes(
testKey,
@@ -216,11 +260,12 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
Collection<K> testKeys,
Collection<N> testNamespaces,
Collection<U> testUserKeys,
- int maxParallelism)
+ int maxParallelism,
+ BuildKeyAndNamespaceType buildKeyAndNamespaceType)
throws IOException {
final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
- RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ SerializedCompositeKeyBuilder<K> keyBuilder =
createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
final DataInputDeserializer deserializer = new DataInputDeserializer();
@@ -231,13 +276,22 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
for (K testKey : testKeys) {
int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
for (N testNamespace : testNamespaces) {
+ if (buildKeyAndNamespaceType == BuildKeyAndNamespaceType.SET_AND_BUILD) {
+ keyBuilder.setNamespace(testNamespace, namespaceSerializer);
+ }
for (U testUserKey : testUserKeys) {
- byte[] compositeBytes =
- keyBuilder.buildCompositeKeyNamesSpaceUserKey(
- testNamespace,
- namespaceSerializer,
- testUserKey,
- userKeySerializer);
+ final byte[] compositeBytes;
+ if (buildKeyAndNamespaceType == BuildKeyAndNamespaceType.BUILD) {
+ compositeBytes =
+ keyBuilder.buildCompositeKeyNamesSpaceUserKey(
+ testNamespace,
+ namespaceSerializer,
+ testUserKey,
+ userKeySerializer);
+ } else {
+ compositeBytes =
+ keyBuilder.buildCompositeKeyUserKey(testUserKey, userKeySerializer);
+ }
deserializer.setBuffer(compositeBytes);
assertKeyGroupKeyNamespaceUserKeyBytes(
@@ -258,18 +312,16 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
}
}
- private <K> RocksDBSerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder(
+ private <K> SerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder(
TypeSerializer<K> serializer, int prefixBytes) {
final boolean variableSize =
- RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer);
- return new RocksDBSerializedCompositeKeyBuilder<>(
+ CompositeKeySerializationUtils.isSerializerTypeVariableSized(serializer);
+ return new SerializedCompositeKeyBuilder<>(
serializer, dataOutputSerializer, prefixBytes, variableSize, 0);
}
private <K> int setKeyAndReturnKeyGroup(
- RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder,
- K key,
- int maxParallelism) {
+ SerializedCompositeKeyBuilder<K> compositeKeyBuilder, K key, int maxParallelism) {
int keyGroup =
KeyGroupRangeAssignment.assignKeyToParallelOperator(
@@ -288,10 +340,10 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
throws IOException {
Assert.assertEquals(
- keyGroup, RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, deserializer));
+ keyGroup, CompositeKeySerializationUtils.readKeyGroup(prefixBytes, deserializer));
Assert.assertEquals(
key,
- RocksDBKeySerializationUtils.readKey(
+ CompositeKeySerializationUtils.readKey(
typeSerializer, deserializer, ambiguousCompositeKeyPossible));
}
@@ -313,7 +365,7 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
deserializer,
ambiguousCompositeKeyPossible);
N readNamespace =
- RocksDBKeySerializationUtils.readNamespace(
+ CompositeKeySerializationUtils.readNamespace(
namespaceSerializer, deserializer, ambiguousCompositeKeyPossible);
Assert.assertEquals(namespace, readNamespace);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index c9a07cf..4ea9022 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -70,7 +71,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
protected final DataInputDeserializer dataInputView;
- private final RocksDBSerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer;
+ private final SerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer;
/**
* Creates a new RocksDB backed state.
@@ -138,8 +139,8 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
KeyGroupRangeAssignment.assignToKeyGroup(
keyAndNamespace.f0, backend.getNumberOfKeyGroups());
- RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
- new RocksDBSerializedCompositeKeyBuilder<>(
+ SerializedCompositeKeyBuilder<K> keyBuilder =
+ new SerializedCompositeKeyBuilder<>(
safeKeySerializer, backend.getKeyGroupPrefixBytes(), 32);
keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
byte[] key = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index d96f8e7..4ea2e16 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.util.CloseableIterator;
@@ -357,7 +358,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
outputView.clear();
try {
- RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
+ CompositeKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
} catch (IOException e) {
throw new FlinkRuntimeException("Could not write key-group bytes.", e);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 5a76c08..958594f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -17,6 +17,7 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -91,18 +92,18 @@ public class RocksDBIncrementalCheckpointUtils {
final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
- RocksDBKeySerializationUtils.serializeKeyGroup(
+ CompositeKeySerializationUtils.serializeKeyGroup(
currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
- RocksDBKeySerializationUtils.serializeKeyGroup(
+ CompositeKeySerializationUtils.serializeKeyGroup(
targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
deleteRange(
db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize);
}
if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
- RocksDBKeySerializationUtils.serializeKeyGroup(
+ CompositeKeySerializationUtils.serializeKeyGroup(
targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
- RocksDBKeySerializationUtils.serializeKeyGroup(
+ CompositeKeySerializationUtils.serializeKeyGroup(
currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
deleteRange(
db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 72d1a8c..d6c7cff 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -50,6 +51,7 @@ import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
@@ -213,7 +215,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across
* all states.
*/
- private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+ private final SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
/**
* Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState}
@@ -247,7 +249,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RocksDBWriteBatchWrapper writeBatchWrapper,
ColumnFamilyHandle defaultColumnFamilyHandle,
RocksDBNativeMetricMonitor nativeMetricMonitor,
- RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder,
+ SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder,
PriorityQueueSetFactory priorityQueueFactory,
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
InternalKeyContext<K> keyContext,
@@ -306,11 +308,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
boolean ambiguousKeyPossible =
- RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+ CompositeKeySerializationUtils.isAmbiguousKeyPossible(
getKeySerializer(), namespaceSerializer);
final byte[] nameSpaceBytes;
try {
- RocksDBKeySerializationUtils.writeNameSpace(
+ CompositeKeySerializationUtils.writeNameSpace(
namespace, namespaceSerializer, namespaceOutputView, ambiguousKeyPossible);
nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
} catch (IOException ex) {
@@ -352,7 +354,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final TypeSerializer<N> namespaceSerializer =
registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
boolean ambiguousKeyPossible =
- RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+ CompositeKeySerializationUtils.isAmbiguousKeyPossible(
getKeySerializer(), namespaceSerializer);
RocksIteratorWrapper iterator =
@@ -493,7 +495,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return readOptions;
}
- RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
+ SerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
return sharedRocksKeyBuilder;
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index a3349a1..5f6426c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -35,11 +35,13 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
@@ -255,10 +257,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
ResourceGuard rocksDBResourceGuard = new ResourceGuard();
SnapshotStrategy<K> snapshotStrategy;
PriorityQueueSetFactory priorityQueueFactory;
- RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+ SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
// Number of bytes required to prefix the key groups.
int keyGroupPrefixBytes =
- RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
+ CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
numberOfKeyGroups);
try {
// Variables for snapshot strategy when incremental checkpoint is enabled
@@ -301,7 +303,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
// serializer
// only now we can be certain that the key serializer used in the builder is final.
sharedRocksKeyBuilder =
- new RocksDBSerializedCompositeKeyBuilder<>(
+ new SerializedCompositeKeyBuilder<>(
keySerializerProvider.currentSchemaSerializer(),
keyGroupPrefixBytes,
32);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 490b294..da68662 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -328,8 +329,8 @@ class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, U
KeyGroupRangeAssignment.assignToKeyGroup(
keyAndNamespace.f0, backend.getNumberOfKeyGroups());
- RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
- new RocksDBSerializedCompositeKeyBuilder<>(
+ SerializedCompositeKeyBuilder<K> keyBuilder =
+ new SerializedCompositeKeyBuilder<>(
safeKeySerializer, backend.getKeyGroupPrefixBytes(), 32);
keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java
index 582afe3..53f30ae 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java
@@ -20,9 +20,9 @@ package org.apache.flink.contrib.streaming.state.iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import javax.annotation.Nonnull;
@@ -64,7 +64,7 @@ public abstract class AbstractRocksStateKeysIterator<K> implements AutoCloseable
protected K deserializeKey(byte[] keyBytes, DataInputDeserializer readView) throws IOException {
readView.setBuffer(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
- return RocksDBKeySerializationUtils.readKey(
+ return CompositeKeySerializationUtils.readKey(
keySerializer, byteArrayDataInputView, ambiguousKeyPossible);
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
index 96c10f1..386c33b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
@@ -20,8 +20,8 @@ package org.apache.flink.contrib.streaming.state.iterator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.util.FlinkRuntimeException;
import javax.annotation.Nonnull;
@@ -64,7 +64,7 @@ public class RocksStateKeysAndNamespaceIterator<K, N> extends AbstractRocksState
final byte[] keyBytes = iterator.key();
final K currentKey = deserializeKey(keyBytes, byteArrayDataInputView);
final N currentNamespace =
- RocksDBKeySerializationUtils.readNamespace(
+ CompositeKeySerializationUtils.readNamespace(
namespaceSerializer, byteArrayDataInputView, ambiguousKeyPossible);
nextKey = Tuple2.of(currentKey, currentNamespace);
iterator.next();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index a17552d..6b66853 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -19,7 +19,6 @@
package org.apache.flink.contrib.streaming.state.restore;
import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
-import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
@@ -32,6 +31,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
@@ -304,11 +304,11 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
// Transfer remaining key-groups from temporary instance into base DB
byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
- RocksDBKeySerializationUtils.serializeKeyGroup(
+ CompositeKeySerializationUtils.serializeKeyGroup(
keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
- RocksDBKeySerializationUtils.serializeKeyGroup(
+ CompositeKeySerializationUtils.serializeKeyGroup(
keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index c29955e..45da654 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
@@ -58,7 +59,8 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest
DataOutputSerializer outputStreamWithPos = new DataOutputSerializer(128);
DataInputDeserializer inputStreamWithPos = new DataInputDeserializer();
int keyGroupPrefixBytes =
- RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(numKeyGroups);
+ CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
+ numKeyGroups);
TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(32);
return new RocksDBCachingPriorityQueueSet<>(
keyGroupId,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index fc1627c..281aa4b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.util.TestLogger;
@@ -139,8 +140,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
for (int j = 0; j < 100; ++j) {
outputView.clear();
- RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
- RocksDBKeySerializationUtils.writeKey(
+ CompositeKeySerializationUtils.writeKeyGroup(
+ i, keyGroupPrefixBytes, outputView);
+ CompositeKeySerializationUtils.writeKey(
j, IntSerializer.INSTANCE, outputView, false);
rocksDB.put(
columnFamilyHandle,
@@ -152,8 +154,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
for (int j = 0; j < 100; ++j) {
outputView.clear();
- RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
- RocksDBKeySerializationUtils.writeKey(
+ CompositeKeySerializationUtils.writeKeyGroup(
+ i, keyGroupPrefixBytes, outputView);
+ CompositeKeySerializationUtils.writeKey(
j, IntSerializer.INSTANCE, outputView, false);
byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer());
Assert.assertEquals(String.valueOf(j), new String(value));
@@ -171,8 +174,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
for (int j = 0; j < 100; ++j) {
outputView.clear();
- RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
- RocksDBKeySerializationUtils.writeKey(
+ CompositeKeySerializationUtils.writeKeyGroup(
+ i, keyGroupPrefixBytes, outputView);
+ CompositeKeySerializationUtils.writeKey(
j, IntSerializer.INSTANCE, outputView, false);
byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer());
if (targetGroupRange.contains(i)) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java
index 56949ac..b62e689 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysAndNamespaceIterator;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.junit.Assert;
import org.junit.Rule;
@@ -85,9 +86,9 @@ public class RocksDBRocksStateKeysAndNamespacesIteratorTest {
DataOutputSerializer outputStream = new DataOutputSerializer(8);
boolean ambiguousKeyPossible =
- RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+ CompositeKeySerializationUtils.isAmbiguousKeyPossible(
keySerializer, StringSerializer.INSTANCE);
- RocksDBKeySerializationUtils.writeNameSpace(
+ CompositeKeySerializationUtils.writeNameSpace(
namespace, StringSerializer.INSTANCE, outputStream, ambiguousKeyPossible);
// already created with the state, should be closed with the backend
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index fdd53a3..6b9e7a4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.junit.Assert;
import org.junit.Rule;
@@ -82,9 +83,9 @@ public class RocksDBRocksStateKeysIteratorTest {
DataOutputSerializer outputStream = new DataOutputSerializer(8);
boolean ambiguousKeyPossible =
- RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+ CompositeKeySerializationUtils.isAmbiguousKeyPossible(
keySerializer, StringSerializer.INSTANCE);
- RocksDBKeySerializationUtils.writeNameSpace(
+ CompositeKeySerializationUtils.writeNameSpace(
namespace, StringSerializer.INSTANCE, outputStream, ambiguousKeyPossible);
byte[] nameSpaceBytes = outputStream.getCopyOfBuffer();