You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:17 UTC

[flink] 01/09: [refactor] Extract common logic for serializing delimited list

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aae75bfe3cc16c56aa3a35b21b3698cbf7d7d209
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jan 28 14:04:06 2021 +0100

    [refactor] Extract common logic for serializing delimited list
---
 .../runtime/state/ListDelimitedSerializer.java     | 93 ++++++++++++++++++++++
 .../streaming/state/AbstractRocksDBState.java      | 22 -----
 .../contrib/streaming/state/RocksDBListState.java  | 81 +++++--------------
 3 files changed, 111 insertions(+), 85 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ListDelimitedSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ListDelimitedSerializer.java
new file mode 100644
index 0000000..ddfb916
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ListDelimitedSerializer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Encapsulates the logic for serializing and deserializing a list whose elements are separated by
+ * a one-byte delimiter. Used in the savepoint format. Instances reuse internal buffers across calls.
+ */
+public final class ListDelimitedSerializer {
+
+    private static final byte DELIMITER = ','; // separator written between serialized elements
+
+    private final DataInputDeserializer dataInputView = new DataInputDeserializer();
+    private final DataOutputSerializer dataOutputView = new DataOutputSerializer(128);
+
+    public <T> List<T> deserializeList(byte[] valueBytes, TypeSerializer<T> elementSerializer) {
+        if (valueBytes == null) {
+            return null; // a missing value stays null rather than becoming an empty list
+        }
+
+        dataInputView.setBuffer(valueBytes);
+
+        List<T> result = new ArrayList<>();
+        T next;
+        while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) {
+            result.add(next);
+        }
+        return result;
+    }
+
+    public <T> byte[] serializeList(List<T> valueList, TypeSerializer<T> elementSerializer)
+            throws IOException {
+
+        dataOutputView.clear();
+        boolean first = true;
+
+        for (T value : valueList) {
+            Preconditions.checkNotNull(value, "You cannot add null to a value list.");
+
+            if (first) {
+                first = false;
+            } else {
+                dataOutputView.write(DELIMITER); // only between elements, never trailing
+            }
+            elementSerializer.serialize(value, dataOutputView);
+        }
+
+        return dataOutputView.getCopyOfBuffer();
+    }
+
+    /** Reads the next element and skips the separator after it; returns null when exhausted. */
+    public static <T> T deserializeNextElement(
+            DataInputDeserializer in, TypeSerializer<T> elementSerializer) {
+        try {
+            if (in.available() > 0) {
+                T element = elementSerializer.deserialize(in);
+                if (in.available() > 0) {
+                    in.readByte(); // discard the separator; its value is not checked
+                }
+                return element;
+            }
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Unexpected list element deserialization failure", e);
+        }
+        return null; // no bytes left: end of the list
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 621e93a..c9a07cf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -34,7 +34,6 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * Base class for {@link State} implementations that store state in a RocksDB database.
@@ -180,27 +179,6 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
         return serializeValueInternal(value, serializer);
     }
 
-    <T> byte[] serializeValueList(
-            List<T> valueList, TypeSerializer<T> elementSerializer, byte delimiter)
-            throws IOException {
-
-        dataOutputView.clear();
-        boolean first = true;
-
-        for (T value : valueList) {
-            Preconditions.checkNotNull(value, "You cannot add null to a value list.");
-
-            if (first) {
-                first = false;
-            } else {
-                dataOutputView.write(delimiter);
-            }
-            elementSerializer.serialize(value, dataOutputView);
-        }
-
-        return dataOutputView.getCopyOfBuffer();
-    }
-
     public void migrateSerializedValue(
             DataInputDeserializer serializedOldValueInput,
             DataOutputSerializer serializedMigratedValueOutput,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 7e8353a..8bcecc7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.ListDelimitedSerializer;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -63,6 +64,8 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
     /** Serializer for the values. */
     private final TypeSerializer<V> elementSerializer;
 
+    private final ListDelimitedSerializer listSerializer;
+
     /** Separator of StringAppendTestOperator in RocksDB. */
     private static final byte DELIMITER = ',';
 
@@ -86,6 +89,7 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
 
         ListSerializer<V> castedListSerializer = (ListSerializer<V>) valueSerializer;
         this.elementSerializer = castedListSerializer.getElementSerializer();
+        this.listSerializer = new ListDelimitedSerializer();
     }
 
     @Override
@@ -113,43 +117,12 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
         try {
             byte[] key = serializeCurrentKeyWithGroupAndNamespace();
             byte[] valueBytes = backend.db.get(columnFamily, key);
-            return deserializeList(valueBytes);
+            return listSerializer.deserializeList(valueBytes, elementSerializer);
         } catch (RocksDBException e) {
             throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
         }
     }
 
-    private List<V> deserializeList(byte[] valueBytes) {
-        if (valueBytes == null) {
-            return null;
-        }
-
-        dataInputView.setBuffer(valueBytes);
-
-        List<V> result = new ArrayList<>();
-        V next;
-        while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) {
-            result.add(next);
-        }
-        return result;
-    }
-
-    private static <V> V deserializeNextElement(
-            DataInputDeserializer in, TypeSerializer<V> elementSerializer) {
-        try {
-            if (in.available() > 0) {
-                V element = elementSerializer.deserialize(in);
-                if (in.available() > 0) {
-                    in.readByte();
-                }
-                return element;
-            }
-        } catch (IOException e) {
-            throw new FlinkRuntimeException("Unexpected list element deserialization failure", e);
-        }
-        return null;
-    }
-
     @Override
     public void add(V value) {
         Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
@@ -210,7 +183,7 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
                         columnFamily,
                         writeOptions,
                         serializeCurrentKeyWithGroupAndNamespace(),
-                        serializeValueList(values, elementSerializer, DELIMITER));
+                        listSerializer.serializeList(values, elementSerializer));
             } catch (IOException | RocksDBException e) {
                 throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
             }
@@ -229,7 +202,7 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
                         columnFamily,
                         writeOptions,
                         serializeCurrentKeyWithGroupAndNamespace(),
-                        serializeValueList(values, elementSerializer, DELIMITER));
+                        listSerializer.serializeList(values, elementSerializer));
             } catch (IOException | RocksDBException e) {
                 throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
             }
@@ -255,7 +228,9 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
 
         try {
             while (serializedOldValueInput.available() > 0) {
-                V element = deserializeNextElement(serializedOldValueInput, priorElementSerializer);
+                V element =
+                        ListDelimitedSerializer.deserializeNextElement(
+                                serializedOldValueInput, priorElementSerializer);
                 newElementSerializer.serialize(element, serializedMigratedValueOutput);
                 if (serializedOldValueInput.available() > 0) {
                     serializedMigratedValueOutput.write(DELIMITER);
@@ -285,17 +260,19 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
     static class StateSnapshotTransformerWrapper<T> implements StateSnapshotTransformer<byte[]> {
         private final StateSnapshotTransformer<T> elementTransformer;
         private final TypeSerializer<T> elementSerializer;
-        private final DataOutputSerializer out = new DataOutputSerializer(128);
         private final CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;
+        private final ListDelimitedSerializer listSerializer;
+        private final DataInputDeserializer in = new DataInputDeserializer();
 
         StateSnapshotTransformerWrapper(
                 StateSnapshotTransformer<T> elementTransformer,
                 TypeSerializer<T> elementSerializer) {
             this.elementTransformer = elementTransformer;
             this.elementSerializer = elementSerializer;
+            this.listSerializer = new ListDelimitedSerializer();
             this.transformStrategy =
                     elementTransformer instanceof CollectionStateSnapshotTransformer
-                            ? ((CollectionStateSnapshotTransformer) elementTransformer)
+                            ? ((CollectionStateSnapshotTransformer<?>) elementTransformer)
                                     .getFilterStrategy()
                             : CollectionStateSnapshotTransformer.TransformStrategy.TRANSFORM_ALL;
         }
@@ -307,10 +284,11 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
                 return null;
             }
             List<T> result = new ArrayList<>();
-            DataInputDeserializer in = new DataInputDeserializer(value);
+            in.setBuffer(value);
             T next;
             int prevPosition = 0;
-            while ((next = deserializeNextElement(in, elementSerializer)) != null) {
+            while ((next = ListDelimitedSerializer.deserializeNextElement(in, elementSerializer))
+                    != null) {
                 T transformedElement = elementTransformer.filterOrTransform(next);
                 if (transformedElement != null) {
                     if (transformStrategy == STOP_ON_FIRST_INCLUDED) {
@@ -324,33 +302,10 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
             try {
                 return result.isEmpty()
                         ? null
-                        : serializeValueList(result, elementSerializer, DELIMITER);
+                        : listSerializer.serializeList(result, elementSerializer);
             } catch (IOException e) {
                 throw new FlinkRuntimeException("Failed to serialize transformed list", e);
             }
         }
-
-        byte[] serializeValueList(
-                List<T> valueList,
-                TypeSerializer<T> elementSerializer,
-                @SuppressWarnings("SameParameterValue") byte delimiter)
-                throws IOException {
-
-            out.clear();
-            boolean first = true;
-
-            for (T value : valueList) {
-                Preconditions.checkNotNull(value, "You cannot add null to a value list.");
-
-                if (first) {
-                    first = false;
-                } else {
-                    out.write(delimiter);
-                }
-                elementSerializer.serialize(value, out);
-            }
-
-            return out.getCopyOfBuffer();
-        }
     }
 }