You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:18 UTC

[flink] 02/09: [refactor] Move RocksDBCompositeKeyBuilder to a common package

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a3436cb67866fddec45c9ac1bd760c24732ca32b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jan 28 14:41:53 2021 +0100

    [refactor] Move RocksDBCompositeKeyBuilder to a common package
---
 .../state/CompositeKeySerializationUtils.java      |   7 +-
 .../state/SerializedCompositeKeyBuilder.java       |  79 ++++++++++---
 .../state/CompositeKeySerializationUtilsTest.java  |  32 +++---
 .../state/SerializedCompositeKeyBuilderTest.java   | 128 +++++++++++++++------
 .../streaming/state/AbstractRocksDBState.java      |   7 +-
 .../state/RocksDBCachingPriorityQueueSet.java      |   3 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   |   9 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  14 ++-
 .../state/RocksDBKeyedStateBackendBuilder.java     |   8 +-
 .../contrib/streaming/state/RocksDBMapState.java   |   5 +-
 .../iterator/AbstractRocksStateKeysIterator.java   |   4 +-
 .../RocksStateKeysAndNamespaceIterator.java        |   4 +-
 .../RocksDBIncrementalRestoreOperation.java        |   6 +-
 ...rtitionedPriorityQueueWithRocksDBStoreTest.java |   4 +-
 .../RocksDBIncrementalCheckpointUtilsTest.java     |  16 ++-
 ...sDBRocksStateKeysAndNamespacesIteratorTest.java |   5 +-
 .../state/RocksDBRocksStateKeysIteratorTest.java   |   5 +-
 17 files changed, 227 insertions(+), 109 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java
similarity index 96%
rename from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java
index 95c7e0b..8088c19 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeKeySerializationUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.contrib.streaming.state;
+package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -28,9 +28,10 @@ import javax.annotation.Nonnull;
 import java.io.IOException;
 
 /** Utils for RocksDB state serialization and deserialization. */
-public class RocksDBKeySerializationUtils {
+public class CompositeKeySerializationUtils {
 
-    static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws IOException {
+    public static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView)
+            throws IOException {
         int keyGroup = 0;
         for (int i = 0; i < keyGroupPrefixBytes; ++i) {
             keyGroup <<= 8;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilder.java
similarity index 72%
rename from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilder.java
index 78397ac..382bf04 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilder.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.contrib.streaming.state;
+package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -38,7 +38,7 @@ import java.io.IOException;
  */
 @NotThreadSafe
 @Internal
-class RocksDBSerializedCompositeKeyBuilder<K> {
+public final class SerializedCompositeKeyBuilder<K> {
 
     /** The serializer for the key. */
     @Nonnull private final TypeSerializer<K> keySerializer;
@@ -55,7 +55,9 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
     /** Mark for the position after the serialized key. */
     @Nonnegative private int afterKeyMark;
 
-    public RocksDBSerializedCompositeKeyBuilder(
+    @Nonnegative private int afterNamespaceMark;
+
+    public SerializedCompositeKeyBuilder(
             @Nonnull TypeSerializer<K> keySerializer,
             @Nonnegative int keyGroupPrefixBytes,
             @Nonnegative int initialSize) {
@@ -63,12 +65,12 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
                 keySerializer,
                 new DataOutputSerializer(initialSize),
                 keyGroupPrefixBytes,
-                RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+                CompositeKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
                 0);
     }
 
     @VisibleForTesting
-    RocksDBSerializedCompositeKeyBuilder(
+    SerializedCompositeKeyBuilder(
             @Nonnull TypeSerializer<K> keySerializer,
             @Nonnull DataOutputSerializer keyOutView,
             @Nonnegative int keyGroupPrefixBytes,
@@ -96,6 +98,15 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
         }
     }
 
+    public <N> void setNamespace(
+            @Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
+        try {
+            serializeNamespace(namespace, namespaceSerializer);
+        } catch (IOException shouldNeverHappen) {
+            throw new FlinkRuntimeException(shouldNeverHappen);
+        }
+    }
+
     /**
      * Returns a serialized composite key, from the key and key-group provided in a previous call to
      * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
@@ -110,9 +121,7 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
             @Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
         try {
             serializeNamespace(namespace, namespaceSerializer);
-            final byte[] result = keyOutView.getCopyOfBuffer();
-            resetToKey();
-            return result;
+            return keyOutView.getCopyOfBuffer();
         } catch (IOException shouldNeverHappen) {
             throw new FlinkRuntimeException(shouldNeverHappen);
         }
@@ -120,7 +129,7 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
 
     /**
      * Returns a serialized composite key, from the key and key-group provided in a previous call to
-     * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, folloed by the given
+     * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, followed by the given
      * user-key.
      *
      * @param namespace the namespace to concatenate for the serialized composite key bytes.
@@ -141,9 +150,35 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
             throws IOException {
         serializeNamespace(namespace, namespaceSerializer);
         userKeySerializer.serialize(userKey, keyOutView);
-        byte[] result = keyOutView.getCopyOfBuffer();
-        resetToKey();
-        return result;
+        return keyOutView.getCopyOfBuffer();
+    }
+
+    /**
+     * Returns a serialized composite key, from the key and key-group provided in a previous call to
+     * {@link #setKeyAndKeyGroup(Object, int)} and the namespace provided in {@link
+     * #setNamespace(Object, TypeSerializer)}, followed by the given user-key.
+     *
+     * @param userKey the user-key to concatenate for the serialized composite key, after the
+     *     namespace.
+     * @param userKeySerializer the serializer to obtain the serialized form of the user-key.
+     * @param <UK> the type of the user-key.
+     * @return the bytes for the serialized composite key of key-group, key, namespace.
+     */
+    @Nonnull
+    public <UK> byte[] buildCompositeKeyUserKey(
+            @Nonnull UK userKey, @Nonnull TypeSerializer<UK> userKeySerializer) throws IOException {
+        // this should only be called when there is already a namespace written.
+        assert isNamespaceWritten();
+        resetToNamespace();
+
+        userKeySerializer.serialize(userKey, keyOutView);
+        return keyOutView.getCopyOfBuffer();
+    }
+
+    /** Returns a serialized composite key, from whatever was set so far. */
+    @Nonnull
+    public byte[] build() throws IOException {
+        return keyOutView.getCopyOfBuffer();
     }
 
     private void serializeKeyGroupAndKey(K key, int keyGroupId) throws IOException {
@@ -152,7 +187,7 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
         resetFully();
 
         // write key-group
-        RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, keyGroupPrefixBytes, keyOutView);
+        CompositeKeySerializationUtils.writeKeyGroup(keyGroupId, keyGroupPrefixBytes, keyOutView);
         // write key
         keySerializer.serialize(key, keyOutView);
         afterKeyMark = keyOutView.length();
@@ -165,33 +200,45 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
         // this should only be called when there is already a key written so that we build the
         // composite.
         assert isKeyWritten();
+        resetToKey();
 
         final boolean ambiguousCompositeKeyPossible =
                 isAmbiguousCompositeKeyPossible(namespaceSerializer);
         if (ambiguousCompositeKeyPossible) {
-            RocksDBKeySerializationUtils.writeVariableIntBytes(
+            CompositeKeySerializationUtils.writeVariableIntBytes(
                     afterKeyMark - keyGroupPrefixBytes, keyOutView);
         }
-        RocksDBKeySerializationUtils.writeNameSpace(
+        CompositeKeySerializationUtils.writeNameSpace(
                 namespace, namespaceSerializer, keyOutView, ambiguousCompositeKeyPossible);
+        afterNamespaceMark = keyOutView.length();
     }
 
     private void resetFully() {
         afterKeyMark = 0;
+        afterNamespaceMark = 0;
         keyOutView.clear();
     }
 
     private void resetToKey() {
+        afterNamespaceMark = 0;
         keyOutView.setPosition(afterKeyMark);
     }
 
+    private void resetToNamespace() {
+        keyOutView.setPosition(afterNamespaceMark);
+    }
+
     private boolean isKeyWritten() {
         return afterKeyMark > 0;
     }
 
+    private boolean isNamespaceWritten() {
+        return afterNamespaceMark > 0;
+    }
+
     @VisibleForTesting
     boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> namespaceSerializer) {
         return keySerializerTypeVariableSized
-                & RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
+                & CompositeKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CompositeKeySerializationUtilsTest.java
similarity index 80%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/CompositeKeySerializationUtilsTest.java
index 3bbb20b..65ba8ea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CompositeKeySerializationUtilsTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.contrib.streaming.state;
+package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -30,17 +30,17 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.Assert;
 import org.junit.Test;
 
-/** Tests for guarding {@link RocksDBKeySerializationUtils}. */
-public class RocksDBKeySerializationUtilsTest {
+/** Tests for guarding {@link CompositeKeySerializationUtils}. */
+public class CompositeKeySerializationUtilsTest {
 
     @Test
     public void testIsAmbiguousKeyPossible() {
         Assert.assertFalse(
-                RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+                CompositeKeySerializationUtils.isAmbiguousKeyPossible(
                         IntSerializer.INSTANCE, StringSerializer.INSTANCE));
 
         Assert.assertTrue(
-                RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+                CompositeKeySerializationUtils.isAmbiguousKeyPossible(
                         StringSerializer.INSTANCE, StringSerializer.INSTANCE));
     }
 
@@ -52,10 +52,10 @@ public class RocksDBKeySerializationUtilsTest {
         for (int keyGroupPrefixBytes = 1; keyGroupPrefixBytes <= 2; ++keyGroupPrefixBytes) {
             for (int orgKeyGroup = 0; orgKeyGroup < 128; ++orgKeyGroup) {
                 outputStream.reset();
-                RocksDBKeySerializationUtils.writeKeyGroup(
+                CompositeKeySerializationUtils.writeKeyGroup(
                         orgKeyGroup, keyGroupPrefixBytes, outputView);
                 int deserializedKeyGroup =
-                        RocksDBKeySerializationUtils.readKeyGroup(
+                        CompositeKeySerializationUtils.readKeyGroup(
                                 keyGroupPrefixBytes,
                                 new DataInputViewStreamWrapper(
                                         new ByteArrayInputStreamWithPos(
@@ -73,17 +73,19 @@ public class RocksDBKeySerializationUtilsTest {
         // test for key
         for (int orgKey = 0; orgKey < 100; ++orgKey) {
             outputView.clear();
-            RocksDBKeySerializationUtils.writeKey(
+            CompositeKeySerializationUtils.writeKey(
                     orgKey, IntSerializer.INSTANCE, outputView, false);
             inputView.setBuffer(outputView.getCopyOfBuffer());
             int deserializedKey =
-                    RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, false);
+                    CompositeKeySerializationUtils.readKey(
+                            IntSerializer.INSTANCE, inputView, false);
             Assert.assertEquals(orgKey, deserializedKey);
 
-            RocksDBKeySerializationUtils.writeKey(orgKey, IntSerializer.INSTANCE, outputView, true);
+            CompositeKeySerializationUtils.writeKey(
+                    orgKey, IntSerializer.INSTANCE, outputView, true);
             inputView.setBuffer(outputView.getCopyOfBuffer());
             deserializedKey =
-                    RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
+                    CompositeKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
             Assert.assertEquals(orgKey, deserializedKey);
         }
     }
@@ -95,19 +97,19 @@ public class RocksDBKeySerializationUtilsTest {
 
         for (int orgNamespace = 0; orgNamespace < 100; ++orgNamespace) {
             outputView.clear();
-            RocksDBKeySerializationUtils.writeNameSpace(
+            CompositeKeySerializationUtils.writeNameSpace(
                     orgNamespace, IntSerializer.INSTANCE, outputView, false);
             inputView.setBuffer(outputView.getCopyOfBuffer());
             int deserializedNamepsace =
-                    RocksDBKeySerializationUtils.readNamespace(
+                    CompositeKeySerializationUtils.readNamespace(
                             IntSerializer.INSTANCE, inputView, false);
             Assert.assertEquals(orgNamespace, deserializedNamepsace);
 
-            RocksDBKeySerializationUtils.writeNameSpace(
+            CompositeKeySerializationUtils.writeNameSpace(
                     orgNamespace, IntSerializer.INSTANCE, outputView, true);
             inputView.setBuffer(outputView.getCopyOfBuffer());
             deserializedNamepsace =
-                    RocksDBKeySerializationUtils.readNamespace(
+                    CompositeKeySerializationUtils.readNamespace(
                             IntSerializer.INSTANCE, inputView, true);
             Assert.assertEquals(orgNamespace, deserializedNamepsace);
         }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilderTest.java
similarity index 73%
rename from flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilderTest.java
index 9e4a208..8c7ac96 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializedCompositeKeyBuilderTest.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.contrib.streaming.state;
+package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -33,8 +32,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 
-/** Test for @{@link RocksDBSerializedCompositeKeyBuilder}. */
-public class RocksDBSerializedCompositeKeyBuilderTest {
+/** Test for @{@link SerializedCompositeKeyBuilder}. */
+public class SerializedCompositeKeyBuilderTest {
 
     private final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
 
@@ -57,36 +56,60 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
 
     @Test
     public void testSetKeyNamespace() throws IOException {
+        testSetKeyNamespaceInternal(BuildKeyAndNamespaceType.BUILD);
+    }
+
+    @Test
+    public void testSetKeyNamespaceWithSet() throws IOException {
+        testSetKeyNamespaceInternal(BuildKeyAndNamespaceType.SET_AND_BUILD);
+    }
+
+    private void testSetKeyNamespaceInternal(BuildKeyAndNamespaceType buildKeyAndNamespaceType)
+            throws IOException {
         for (int parallelism : TEST_PARALLELISMS) {
             testSetKeyNamespaceInternal(
                     IntSerializer.INSTANCE,
                     IntSerializer.INSTANCE,
                     TEST_INTS,
                     TEST_INTS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceInternal(
                     IntSerializer.INSTANCE,
                     StringSerializer.INSTANCE,
                     TEST_INTS,
                     TEST_STRINGS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceInternal(
                     StringSerializer.INSTANCE,
                     IntSerializer.INSTANCE,
                     TEST_STRINGS,
                     TEST_INTS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceInternal(
                     StringSerializer.INSTANCE,
                     StringSerializer.INSTANCE,
                     TEST_STRINGS,
                     TEST_STRINGS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
         }
     }
 
     @Test
     public void testSetKeyNamespaceUserKey() throws IOException {
+        testSetKeyNamespaceUserKeyInternal(BuildKeyAndNamespaceType.BUILD);
+    }
+
+    @Test
+    public void testSetKeyNamespaceUserKeyWithSet() throws IOException {
+        testSetKeyNamespaceUserKeyInternal(BuildKeyAndNamespaceType.SET_AND_BUILD);
+    }
+
+    private void testSetKeyNamespaceUserKeyInternal(
+            BuildKeyAndNamespaceType buildKeyAndNamespaceType) throws IOException {
         for (int parallelism : TEST_PARALLELISMS) {
             testSetKeyNamespaceUserKeyInternal(
                     IntSerializer.INSTANCE,
@@ -95,7 +118,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
                     TEST_INTS,
                     TEST_INTS,
                     TEST_INTS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceUserKeyInternal(
                     IntSerializer.INSTANCE,
                     StringSerializer.INSTANCE,
@@ -103,7 +127,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
                     TEST_INTS,
                     TEST_STRINGS,
                     TEST_INTS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceUserKeyInternal(
                     StringSerializer.INSTANCE,
                     IntSerializer.INSTANCE,
@@ -111,7 +136,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
                     TEST_STRINGS,
                     TEST_INTS,
                     TEST_INTS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceUserKeyInternal(
                     StringSerializer.INSTANCE,
                     StringSerializer.INSTANCE,
@@ -119,7 +145,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
                     TEST_STRINGS,
                     TEST_STRINGS,
                     TEST_INTS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceUserKeyInternal(
                     IntSerializer.INSTANCE,
                     IntSerializer.INSTANCE,
@@ -127,7 +154,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
                     TEST_INTS,
                     TEST_INTS,
                     TEST_STRINGS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceUserKeyInternal(
                     IntSerializer.INSTANCE,
                     StringSerializer.INSTANCE,
@@ -135,7 +163,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
                     TEST_INTS,
                     TEST_STRINGS,
                     TEST_STRINGS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceUserKeyInternal(
                     StringSerializer.INSTANCE,
                     IntSerializer.INSTANCE,
@@ -143,7 +172,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
                     TEST_STRINGS,
                     TEST_INTS,
                     TEST_STRINGS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
             testSetKeyNamespaceUserKeyInternal(
                     StringSerializer.INSTANCE,
                     StringSerializer.INSTANCE,
@@ -151,7 +181,8 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
                     TEST_STRINGS,
                     TEST_STRINGS,
                     TEST_STRINGS,
-                    parallelism);
+                    parallelism,
+                    buildKeyAndNamespaceType);
         }
     }
 
@@ -159,7 +190,7 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
             TypeSerializer<K> serializer, Collection<K> testKeys, int maxParallelism)
             throws IOException {
         final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
-        RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+        SerializedCompositeKeyBuilder<K> keyBuilder =
                 createRocksDBSerializedCompositeKeyBuilder(serializer, prefixBytes);
 
         final DataInputDeserializer deserializer = new DataInputDeserializer();
@@ -172,16 +203,22 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
         }
     }
 
+    enum BuildKeyAndNamespaceType {
+        BUILD,
+        SET_AND_BUILD
+    }
+
     private <K, N> void testSetKeyNamespaceInternal(
             TypeSerializer<K> keySerializer,
             TypeSerializer<N> namespaceSerializer,
             Collection<K> testKeys,
             Collection<N> testNamespaces,
-            int maxParallelism)
+            int maxParallelism,
+            BuildKeyAndNamespaceType buildKeyAndNamespaceType)
             throws IOException {
         final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
 
-        RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+        SerializedCompositeKeyBuilder<K> keyBuilder =
                 createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
 
         final DataInputDeserializer deserializer = new DataInputDeserializer();
@@ -192,8 +229,15 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
         for (K testKey : testKeys) {
             int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
             for (N testNamespace : testNamespaces) {
-                byte[] compositeBytes =
-                        keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer);
+                final byte[] compositeBytes;
+                if (buildKeyAndNamespaceType == BuildKeyAndNamespaceType.BUILD) {
+                    compositeBytes =
+                            keyBuilder.buildCompositeKeyNamespace(
+                                    testNamespace, namespaceSerializer);
+                } else {
+                    keyBuilder.setNamespace(testNamespace, namespaceSerializer);
+                    compositeBytes = keyBuilder.build();
+                }
                 deserializer.setBuffer(compositeBytes);
                 assertKeyGroupKeyNamespaceBytes(
                         testKey,
@@ -216,11 +260,12 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
             Collection<K> testKeys,
             Collection<N> testNamespaces,
             Collection<U> testUserKeys,
-            int maxParallelism)
+            int maxParallelism,
+            BuildKeyAndNamespaceType buildKeyAndNamespaceType)
             throws IOException {
         final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
 
-        RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+        SerializedCompositeKeyBuilder<K> keyBuilder =
                 createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
 
         final DataInputDeserializer deserializer = new DataInputDeserializer();
@@ -231,13 +276,22 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
         for (K testKey : testKeys) {
             int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
             for (N testNamespace : testNamespaces) {
+                if (buildKeyAndNamespaceType == BuildKeyAndNamespaceType.SET_AND_BUILD) {
+                    keyBuilder.setNamespace(testNamespace, namespaceSerializer);
+                }
                 for (U testUserKey : testUserKeys) {
-                    byte[] compositeBytes =
-                            keyBuilder.buildCompositeKeyNamesSpaceUserKey(
-                                    testNamespace,
-                                    namespaceSerializer,
-                                    testUserKey,
-                                    userKeySerializer);
+                    final byte[] compositeBytes;
+                    if (buildKeyAndNamespaceType == BuildKeyAndNamespaceType.BUILD) {
+                        compositeBytes =
+                                keyBuilder.buildCompositeKeyNamesSpaceUserKey(
+                                        testNamespace,
+                                        namespaceSerializer,
+                                        testUserKey,
+                                        userKeySerializer);
+                    } else {
+                        compositeBytes =
+                                keyBuilder.buildCompositeKeyUserKey(testUserKey, userKeySerializer);
+                    }
 
                     deserializer.setBuffer(compositeBytes);
                     assertKeyGroupKeyNamespaceUserKeyBytes(
@@ -258,18 +312,16 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
         }
     }
 
-    private <K> RocksDBSerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder(
+    private <K> SerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder(
             TypeSerializer<K> serializer, int prefixBytes) {
         final boolean variableSize =
-                RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer);
-        return new RocksDBSerializedCompositeKeyBuilder<>(
+                CompositeKeySerializationUtils.isSerializerTypeVariableSized(serializer);
+        return new SerializedCompositeKeyBuilder<>(
                 serializer, dataOutputSerializer, prefixBytes, variableSize, 0);
     }
 
     private <K> int setKeyAndReturnKeyGroup(
-            RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder,
-            K key,
-            int maxParallelism) {
+            SerializedCompositeKeyBuilder<K> compositeKeyBuilder, K key, int maxParallelism) {
 
         int keyGroup =
                 KeyGroupRangeAssignment.assignKeyToParallelOperator(
@@ -288,10 +340,10 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
             throws IOException {
 
         Assert.assertEquals(
-                keyGroup, RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, deserializer));
+                keyGroup, CompositeKeySerializationUtils.readKeyGroup(prefixBytes, deserializer));
         Assert.assertEquals(
                 key,
-                RocksDBKeySerializationUtils.readKey(
+                CompositeKeySerializationUtils.readKey(
                         typeSerializer, deserializer, ambiguousCompositeKeyPossible));
     }
 
@@ -313,7 +365,7 @@ public class RocksDBSerializedCompositeKeyBuilderTest {
                 deserializer,
                 ambiguousCompositeKeyPossible);
         N readNamespace =
-                RocksDBKeySerializationUtils.readNamespace(
+                CompositeKeySerializationUtils.readNamespace(
                         namespaceSerializer, deserializer, ambiguousCompositeKeyPossible);
         Assert.assertEquals(namespace, readNamespace);
     }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index c9a07cf..4ea9022 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -70,7 +71,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 
     protected final DataInputDeserializer dataInputView;
 
-    private final RocksDBSerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer;
+    private final SerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer;
 
     /**
      * Creates a new RocksDB backed state.
@@ -138,8 +139,8 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
                 KeyGroupRangeAssignment.assignToKeyGroup(
                         keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
-        RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
-                new RocksDBSerializedCompositeKeyBuilder<>(
+        SerializedCompositeKeyBuilder<K> keyBuilder =
+                new SerializedCompositeKeyBuilder<>(
                         safeKeySerializer, backend.getKeyGroupPrefixBytes(), 32);
         keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
         byte[] key = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index d96f8e7..4ea2e16 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.util.CloseableIterator;
@@ -357,7 +358,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
         outputView.clear();
 
         try {
-            RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
+            CompositeKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
         } catch (IOException e) {
             throw new FlinkRuntimeException("Could not write key-group bytes.", e);
         }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 5a76c08..958594f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 
@@ -91,18 +92,18 @@ public class RocksDBIncrementalCheckpointUtils {
         final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
 
         if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
-            RocksDBKeySerializationUtils.serializeKeyGroup(
+            CompositeKeySerializationUtils.serializeKeyGroup(
                     currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
-            RocksDBKeySerializationUtils.serializeKeyGroup(
+            CompositeKeySerializationUtils.serializeKeyGroup(
                     targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
             deleteRange(
                     db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize);
         }
 
         if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
-            RocksDBKeySerializationUtils.serializeKeyGroup(
+            CompositeKeySerializationUtils.serializeKeyGroup(
                     targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
-            RocksDBKeySerializationUtils.serializeKeyGroup(
+            CompositeKeySerializationUtils.serializeKeyGroup(
                     currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
             deleteRange(
                     db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 72d1a8c..d6c7cff 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -50,6 +51,7 @@ import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategyRunner;
 import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
@@ -213,7 +215,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
      * Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across
      * all states.
      */
-    private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+    private final SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
 
     /**
      * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState}
@@ -247,7 +249,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             RocksDBWriteBatchWrapper writeBatchWrapper,
             ColumnFamilyHandle defaultColumnFamilyHandle,
             RocksDBNativeMetricMonitor nativeMetricMonitor,
-            RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder,
+            SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder,
             PriorityQueueSetFactory priorityQueueFactory,
             RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             InternalKeyContext<K> keyContext,
@@ -306,11 +308,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                 registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
         final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
         boolean ambiguousKeyPossible =
-                RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+                CompositeKeySerializationUtils.isAmbiguousKeyPossible(
                         getKeySerializer(), namespaceSerializer);
         final byte[] nameSpaceBytes;
         try {
-            RocksDBKeySerializationUtils.writeNameSpace(
+            CompositeKeySerializationUtils.writeNameSpace(
                     namespace, namespaceSerializer, namespaceOutputView, ambiguousKeyPossible);
             nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
         } catch (IOException ex) {
@@ -352,7 +354,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         final TypeSerializer<N> namespaceSerializer =
                 registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
         boolean ambiguousKeyPossible =
-                RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+                CompositeKeySerializationUtils.isAmbiguousKeyPossible(
                         getKeySerializer(), namespaceSerializer);
 
         RocksIteratorWrapper iterator =
@@ -493,7 +495,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         return readOptions;
     }
 
-    RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
+    SerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
         return sharedRocksKeyBuilder;
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index a3349a1..5f6426c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -35,11 +35,13 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
 import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
@@ -255,10 +257,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         ResourceGuard rocksDBResourceGuard = new ResourceGuard();
         SnapshotStrategy<K> snapshotStrategy;
         PriorityQueueSetFactory priorityQueueFactory;
-        RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+        SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
         // Number of bytes required to prefix the key groups.
         int keyGroupPrefixBytes =
-                RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
+                CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
                         numberOfKeyGroups);
         try {
             // Variables for snapshot strategy when incremental checkpoint is enabled
@@ -301,7 +303,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             // serializer
             // only now we can be certain that the key serializer used in the builder is final.
             sharedRocksKeyBuilder =
-                    new RocksDBSerializedCompositeKeyBuilder<>(
+                    new SerializedCompositeKeyBuilder<>(
                             keySerializerProvider.currentSchemaSerializer(),
                             keyGroupPrefixBytes,
                             32);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 490b294..da68662 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -328,8 +329,8 @@ class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, U
                 KeyGroupRangeAssignment.assignToKeyGroup(
                         keyAndNamespace.f0, backend.getNumberOfKeyGroups());
 
-        RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
-                new RocksDBSerializedCompositeKeyBuilder<>(
+        SerializedCompositeKeyBuilder<K> keyBuilder =
+                new SerializedCompositeKeyBuilder<>(
                         safeKeySerializer, backend.getKeyGroupPrefixBytes(), 32);
 
         keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java
index 582afe3..53f30ae 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/AbstractRocksStateKeysIterator.java
@@ -20,9 +20,9 @@ package org.apache.flink.contrib.streaming.state.iterator;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 
 import javax.annotation.Nonnull;
 
@@ -64,7 +64,7 @@ public abstract class AbstractRocksStateKeysIterator<K> implements AutoCloseable
 
     protected K deserializeKey(byte[] keyBytes, DataInputDeserializer readView) throws IOException {
         readView.setBuffer(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
-        return RocksDBKeySerializationUtils.readKey(
+        return CompositeKeySerializationUtils.readKey(
                 keySerializer, byteArrayDataInputView, ambiguousKeyPossible);
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
index 96c10f1..386c33b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
@@ -20,8 +20,8 @@ package org.apache.flink.contrib.streaming.state.iterator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nonnull;
@@ -64,7 +64,7 @@ public class RocksStateKeysAndNamespaceIterator<K, N> extends AbstractRocksState
                 final byte[] keyBytes = iterator.key();
                 final K currentKey = deserializeKey(keyBytes, byteArrayDataInputView);
                 final N currentNamespace =
-                        RocksDBKeySerializationUtils.readNamespace(
+                        CompositeKeySerializationUtils.readNamespace(
                                 namespaceSerializer, byteArrayDataInputView, ambiguousKeyPossible);
                 nextKey = Tuple2.of(currentKey, currentNamespace);
                 iterator.next();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index a17552d..6b66853 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state.restore;
 
 import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
-import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
@@ -32,6 +31,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.DirectoryStateHandle;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
@@ -304,11 +304,11 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 
         // Transfer remaining key-groups from temporary instance into base DB
         byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-        RocksDBKeySerializationUtils.serializeKeyGroup(
+        CompositeKeySerializationUtils.serializeKeyGroup(
                 keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
 
         byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-        RocksDBKeySerializationUtils.serializeKeyGroup(
+        CompositeKeySerializationUtils.serializeKeyGroup(
                 keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
 
         for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index c29955e..45da654 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
@@ -58,7 +59,8 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest
             DataOutputSerializer outputStreamWithPos = new DataOutputSerializer(128);
             DataInputDeserializer inputStreamWithPos = new DataInputDeserializer();
             int keyGroupPrefixBytes =
-                    RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(numKeyGroups);
+                    CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
+                            numKeyGroups);
             TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(32);
             return new RocksDBCachingPriorityQueueSet<>(
                     keyGroupId,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index fc1627c..281aa4b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.util.TestLogger;
@@ -139,8 +140,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
             for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
                 for (int j = 0; j < 100; ++j) {
                     outputView.clear();
-                    RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
-                    RocksDBKeySerializationUtils.writeKey(
+                    CompositeKeySerializationUtils.writeKeyGroup(
+                            i, keyGroupPrefixBytes, outputView);
+                    CompositeKeySerializationUtils.writeKey(
                             j, IntSerializer.INSTANCE, outputView, false);
                     rocksDB.put(
                             columnFamilyHandle,
@@ -152,8 +154,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
             for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
                 for (int j = 0; j < 100; ++j) {
                     outputView.clear();
-                    RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
-                    RocksDBKeySerializationUtils.writeKey(
+                    CompositeKeySerializationUtils.writeKeyGroup(
+                            i, keyGroupPrefixBytes, outputView);
+                    CompositeKeySerializationUtils.writeKey(
                             j, IntSerializer.INSTANCE, outputView, false);
                     byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer());
                     Assert.assertEquals(String.valueOf(j), new String(value));
@@ -171,8 +174,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
             for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
                 for (int j = 0; j < 100; ++j) {
                     outputView.clear();
-                    RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
-                    RocksDBKeySerializationUtils.writeKey(
+                    CompositeKeySerializationUtils.writeKeyGroup(
+                            i, keyGroupPrefixBytes, outputView);
+                    CompositeKeySerializationUtils.writeKey(
                             j, IntSerializer.INSTANCE, outputView, false);
                     byte[] value = rocksDB.get(columnFamilyHandle, outputView.getCopyOfBuffer());
                     if (targetGroupRange.contains(i)) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java
index 56949ac..b62e689 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysAndNamespaceIterator;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -85,9 +86,9 @@ public class RocksDBRocksStateKeysAndNamespacesIteratorTest {
 
             DataOutputSerializer outputStream = new DataOutputSerializer(8);
             boolean ambiguousKeyPossible =
-                    RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+                    CompositeKeySerializationUtils.isAmbiguousKeyPossible(
                             keySerializer, StringSerializer.INSTANCE);
-            RocksDBKeySerializationUtils.writeNameSpace(
+            CompositeKeySerializationUtils.writeNameSpace(
                     namespace, StringSerializer.INSTANCE, outputStream, ambiguousKeyPossible);
 
             // already created with the state, should be closed with the backend
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index fdd53a3..6b9e7a4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -82,9 +83,9 @@ public class RocksDBRocksStateKeysIteratorTest {
 
             DataOutputSerializer outputStream = new DataOutputSerializer(8);
             boolean ambiguousKeyPossible =
-                    RocksDBKeySerializationUtils.isAmbiguousKeyPossible(
+                    CompositeKeySerializationUtils.isAmbiguousKeyPossible(
                             keySerializer, StringSerializer.INSTANCE);
-            RocksDBKeySerializationUtils.writeNameSpace(
+            CompositeKeySerializationUtils.writeNameSpace(
                     namespace, StringSerializer.INSTANCE, outputStream, ambiguousKeyPossible);
 
             byte[] nameSpaceBytes = outputStream.getCopyOfBuffer();