You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:17 UTC
[flink] 01/09: [refactor] Extract common logic for serializing delimited list
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit aae75bfe3cc16c56aa3a35b21b3698cbf7d7d209
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jan 28 14:04:06 2021 +0100
[refactor] Extract common logic for serializing delimited list
---
.../runtime/state/ListDelimitedSerializer.java | 93 ++++++++++++++++++++++
.../streaming/state/AbstractRocksDBState.java | 22 -----
.../contrib/streaming/state/RocksDBListState.java | 81 +++++--------------
3 files changed, 111 insertions(+), 85 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ListDelimitedSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ListDelimitedSerializer.java
new file mode 100644
index 0000000..ddfb916
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ListDelimitedSerializer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Encapsulates the logic of serializing a list as its elements separated by a one-byte delimiter,
+ * and of deserializing such bytes back into a list. Used in the savepoint format.
+ */
+public final class ListDelimitedSerializer {
+
+ private static final byte DELIMITER = ','; // written between consecutive elements
+ // Buffers are reused by every call; an instance is therefore not safe for concurrent use.
+ private final DataInputDeserializer dataInputView = new DataInputDeserializer();
+ private final DataOutputSerializer dataOutputView = new DataOutputSerializer(128);
+ /** Reads a list written by {@link #serializeList}; returns {@code null} for {@code null} input. */
+ public <T> List<T> deserializeList(byte[] valueBytes, TypeSerializer<T> elementSerializer) {
+ if (valueBytes == null) {
+ return null; // null input yields null, not an empty list
+ }
+ // Elements are read one by one until the input is exhausted.
+ dataInputView.setBuffer(valueBytes);
+
+ List<T> result = new ArrayList<>();
+ T next;
+ while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) {
+ result.add(next);
+ }
+ return result;
+ }
+ /** Writes the (non-null) elements separated by DELIMITER; returns a copy of the written bytes. */
+ public <T> byte[] serializeList(List<T> valueList, TypeSerializer<T> elementSerializer)
+ throws IOException {
+
+ dataOutputView.clear();
+ boolean first = true; // no delimiter before the first element
+
+ for (T value : valueList) {
+ Preconditions.checkNotNull(value, "You cannot add null to a value list.");
+
+ if (first) {
+ first = false;
+ } else {
+ dataOutputView.write(DELIMITER);
+ }
+ elementSerializer.serialize(value, dataOutputView);
+ }
+
+ return dataOutputView.getCopyOfBuffer();
+ }
+
+ /** Reads the next element plus its trailing delimiter byte; returns {@code null} at end of input. */
+ public static <T> T deserializeNextElement(
+ DataInputDeserializer in, TypeSerializer<T> elementSerializer) {
+ try {
+ if (in.available() > 0) { // NOTE(review): a zero-byte element is indistinguishable from end of input
+ T element = elementSerializer.deserialize(in);
+ if (in.available() > 0) {
+ in.readByte(); // skip the delimiter; its value is not checked against DELIMITER
+ }
+ return element;
+ }
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Unexpected list element deserialization failure", e);
+ }
+ return null; // input exhausted
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 621e93a..c9a07cf 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -34,7 +34,6 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import java.io.IOException;
-import java.util.List;
/**
* Base class for {@link State} implementations that store state in a RocksDB database.
@@ -180,27 +179,6 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
return serializeValueInternal(value, serializer);
}
- <T> byte[] serializeValueList(
- List<T> valueList, TypeSerializer<T> elementSerializer, byte delimiter)
- throws IOException {
-
- dataOutputView.clear();
- boolean first = true;
-
- for (T value : valueList) {
- Preconditions.checkNotNull(value, "You cannot add null to a value list.");
-
- if (first) {
- first = false;
- } else {
- dataOutputView.write(delimiter);
- }
- elementSerializer.serialize(value, dataOutputView);
- }
-
- return dataOutputView.getCopyOfBuffer();
- }
-
public void migrateSerializedValue(
DataInputDeserializer serializedOldValueInput,
DataOutputSerializer serializedMigratedValueOutput,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 7e8353a..8bcecc7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.ListDelimitedSerializer;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -63,6 +64,8 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
/** Serializer for the values. */
private final TypeSerializer<V> elementSerializer;
+ private final ListDelimitedSerializer listSerializer;
+
/** Separator of StringAppendTestOperator in RocksDB. */
private static final byte DELIMITER = ',';
@@ -86,6 +89,7 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
ListSerializer<V> castedListSerializer = (ListSerializer<V>) valueSerializer;
this.elementSerializer = castedListSerializer.getElementSerializer();
+ this.listSerializer = new ListDelimitedSerializer();
}
@Override
@@ -113,43 +117,12 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
try {
byte[] key = serializeCurrentKeyWithGroupAndNamespace();
byte[] valueBytes = backend.db.get(columnFamily, key);
- return deserializeList(valueBytes);
+ return listSerializer.deserializeList(valueBytes, elementSerializer);
} catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
}
}
- private List<V> deserializeList(byte[] valueBytes) {
- if (valueBytes == null) {
- return null;
- }
-
- dataInputView.setBuffer(valueBytes);
-
- List<V> result = new ArrayList<>();
- V next;
- while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) {
- result.add(next);
- }
- return result;
- }
-
- private static <V> V deserializeNextElement(
- DataInputDeserializer in, TypeSerializer<V> elementSerializer) {
- try {
- if (in.available() > 0) {
- V element = elementSerializer.deserialize(in);
- if (in.available() > 0) {
- in.readByte();
- }
- return element;
- }
- } catch (IOException e) {
- throw new FlinkRuntimeException("Unexpected list element deserialization failure", e);
- }
- return null;
- }
-
@Override
public void add(V value) {
Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
@@ -210,7 +183,7 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
columnFamily,
writeOptions,
serializeCurrentKeyWithGroupAndNamespace(),
- serializeValueList(values, elementSerializer, DELIMITER));
+ listSerializer.serializeList(values, elementSerializer));
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
}
@@ -229,7 +202,7 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
columnFamily,
writeOptions,
serializeCurrentKeyWithGroupAndNamespace(),
- serializeValueList(values, elementSerializer, DELIMITER));
+ listSerializer.serializeList(values, elementSerializer));
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
}
@@ -255,7 +228,9 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
try {
while (serializedOldValueInput.available() > 0) {
- V element = deserializeNextElement(serializedOldValueInput, priorElementSerializer);
+ V element =
+ ListDelimitedSerializer.deserializeNextElement(
+ serializedOldValueInput, priorElementSerializer);
newElementSerializer.serialize(element, serializedMigratedValueOutput);
if (serializedOldValueInput.available() > 0) {
serializedMigratedValueOutput.write(DELIMITER);
@@ -285,17 +260,19 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
static class StateSnapshotTransformerWrapper<T> implements StateSnapshotTransformer<byte[]> {
private final StateSnapshotTransformer<T> elementTransformer;
private final TypeSerializer<T> elementSerializer;
- private final DataOutputSerializer out = new DataOutputSerializer(128);
private final CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;
+ private final ListDelimitedSerializer listSerializer;
+ private final DataInputDeserializer in = new DataInputDeserializer();
StateSnapshotTransformerWrapper(
StateSnapshotTransformer<T> elementTransformer,
TypeSerializer<T> elementSerializer) {
this.elementTransformer = elementTransformer;
this.elementSerializer = elementSerializer;
+ this.listSerializer = new ListDelimitedSerializer();
this.transformStrategy =
elementTransformer instanceof CollectionStateSnapshotTransformer
- ? ((CollectionStateSnapshotTransformer) elementTransformer)
+ ? ((CollectionStateSnapshotTransformer<?>) elementTransformer)
.getFilterStrategy()
: CollectionStateSnapshotTransformer.TransformStrategy.TRANSFORM_ALL;
}
@@ -307,10 +284,11 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
return null;
}
List<T> result = new ArrayList<>();
- DataInputDeserializer in = new DataInputDeserializer(value);
+ in.setBuffer(value);
T next;
int prevPosition = 0;
- while ((next = deserializeNextElement(in, elementSerializer)) != null) {
+ while ((next = ListDelimitedSerializer.deserializeNextElement(in, elementSerializer))
+ != null) {
T transformedElement = elementTransformer.filterOrTransform(next);
if (transformedElement != null) {
if (transformStrategy == STOP_ON_FIRST_INCLUDED) {
@@ -324,33 +302,10 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>>
try {
return result.isEmpty()
? null
- : serializeValueList(result, elementSerializer, DELIMITER);
+ : listSerializer.serializeList(result, elementSerializer);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to serialize transformed list", e);
}
}
-
- byte[] serializeValueList(
- List<T> valueList,
- TypeSerializer<T> elementSerializer,
- @SuppressWarnings("SameParameterValue") byte delimiter)
- throws IOException {
-
- out.clear();
- boolean first = true;
-
- for (T value : valueList) {
- Preconditions.checkNotNull(value, "You cannot add null to a value list.");
-
- if (first) {
- first = false;
- } else {
- out.write(delimiter);
- }
- elementSerializer.serialize(value, out);
- }
-
- return out.getCopyOfBuffer();
- }
}
}