You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/19 16:20:51 UTC
[3/4] flink git commit: [FLINK-5521] [runtime] remove unused
KvStateRequestSerializer#serializeList
[FLINK-5521] [runtime] remove unused KvStateRequestSerializer#serializeList
Also make sure that the serialization via the state backends' list states
matches the deserialization of the KvStateRequestSerializer#deserializeList
method.
So far, the code relied on this match, but no test verified it.
This closes #3135
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8fd04a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8fd04a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8fd04a7
Branch: refs/heads/master
Commit: d8fd04a7e72ecee4fe5b06f42ee9530aa292e644
Parents: ac815d7
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jan 17 11:23:32 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 19 17:18:49 2017 +0100
----------------------------------------------------------------------
.../netty/message/KvStateRequestSerializer.java | 35 ------
.../message/KvStateRequestSerializerTest.java | 62 +++++++++-
.../KVStateRequestSerializerRocksDBTest.java | 118 +++++++++++++++++++
3 files changed, 176 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d8fd04a7/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 0ae60f6..6c8b4a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -36,7 +36,6 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
/**
@@ -435,40 +434,6 @@ public final class KvStateRequestSerializer {
}
/**
- * Serializes all values of the Iterable with the given serializer.
- *
- * @param values Values of type T to serialize
- * @param serializer Serializer for T
- * @param <T> Type of the values
- * @return Serialized values or <code>null</code> if values <code>null</code> or empty
- * @throws IOException On failure during serialization
- */
- public static <T> byte[] serializeList(Iterable<T> values, TypeSerializer<T> serializer) throws IOException {
- if (values != null) {
- Iterator<T> it = values.iterator();
-
- if (it.hasNext()) {
- // Serialize
- DataOutputSerializer dos = new DataOutputSerializer(32);
-
- while (it.hasNext()) {
- serializer.serialize(it.next(), dos);
-
- // This byte added here in order to have the binary format
- // prescribed by RocksDB.
- dos.write(0);
- }
-
- return dos.getCopyOfBuffer();
- } else {
- return null;
- }
- } else {
- return null;
- }
- }
-
- /**
* Deserializes all values with the given serializer.
*
* @param serializedValue Serialized value of type List<T>
http://git-wip-us.apache.org/repos/asf/flink/blob/d8fd04a7/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index a68c84b..a9aa416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -21,11 +21,19 @@ package org.apache.flink.runtime.query.netty.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.junit.Test;
import java.util.ArrayList;
@@ -34,6 +42,7 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
public class KvStateRequestSerializerTest {
@@ -228,17 +237,62 @@ public class KvStateRequestSerializerTest {
*/
@Test
public void testListSerialization() throws Exception {
+ final long key = 0l;
+
+ // objects for heap state list serialisation
+ final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
+ new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class),
+ LongSerializer.INSTANCE,
+ ClassLoader.getSystemClassLoader(),
+ 1, new KeyGroupRange(0, 0)
+ );
+ longHeapKeyedStateBackend.setCurrentKey(key);
+
+ final ListState<Long> listState = longHeapKeyedStateBackend
+ .createListState(VoidNamespaceSerializer.INSTANCE,
+ new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+ testListSerialization(key, listState);
+ }
+
+ /**
+ * Verifies that the serialization of a list using the given list state
+ * matches the deserialization with {@link KvStateRequestSerializer#deserializeList}.
+ *
+ * @param key
+ * key of the list state
+ * @param listState
+ * list state using the {@link VoidNamespace}, must also be a {@link
+ * KvState} instance
+ *
+ * @throws Exception
+ */
+ public static void testListSerialization(final long key,
+ final ListState<Long> listState) throws Exception {
+
TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+ final KvState<VoidNamespace> listKvState =
+ (KvState<VoidNamespace>) listState;
+ listKvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
// List
- int numElements = 10;
+ final int numElements = 10;
- List<Long> expectedValues = new ArrayList<>();
+ final List<Long> expectedValues = new ArrayList<>();
for (int i = 0; i < numElements; i++) {
- expectedValues.add(ThreadLocalRandom.current().nextLong());
+ final long value = ThreadLocalRandom.current().nextLong();
+ expectedValues.add(value);
+ listState.add(value);
}
- byte[] serializedValues = KvStateRequestSerializer.serializeList(expectedValues, valueSerializer);
+ final byte[] serializedKey =
+ KvStateRequestSerializer.serializeKeyAndNamespace(
+ key, LongSerializer.INSTANCE,
+ VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
+ final byte[] serializedValues =
+ listKvState.getSerializedValue(serializedKey);
+
List<Long> actualValues = KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer);
assertEquals(expectedValues, actualValues);
http://git-wip-us.apache.org/repos/asf/flink/blob/d8fd04a7/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
new file mode 100644
index 0000000..0e1aca0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.contrib.streaming.state.PredefinedOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.File;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for the serialization and deserialization of {@link
+ * KvStateRequestSerializer} with a RocksDB state back-end.
+ */
+public final class KVStateRequestSerializerRocksDBTest {
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ /**
+ * Extension of {@link RocksDBKeyedStateBackend} to make {@link
+ * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
+ * the tests.
+ *
+ * @param <K> key type
+ */
+ final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
+
+ RocksDBKeyedStateBackend2(final JobID jobId,
+ final String operatorIdentifier,
+ final ClassLoader userCodeClassLoader,
+ final File instanceBasePath, final DBOptions dbOptions,
+ final ColumnFamilyOptions columnFamilyOptions,
+ final TaskKvStateRegistry kvStateRegistry,
+ final TypeSerializer<K> keySerializer, final int numberOfKeyGroups,
+ final KeyGroupRange keyGroupRange) throws Exception {
+ super(jobId, operatorIdentifier, userCodeClassLoader,
+ instanceBasePath,
+ dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
+ numberOfKeyGroups, keyGroupRange);
+ }
+
+ @Override
+ public <N, T> ListState<T> createListState(
+ final TypeSerializer<N> namespaceSerializer,
+ final ListStateDescriptor<T> stateDesc) throws Exception {
+ return super.createListState(namespaceSerializer, stateDesc);
+ }
+ }
+
+ /**
+ * Tests list serialization and deserialization match.
+ *
+ * @see KvStateRequestSerializerTest#testListSerialization()
+ * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
+ * test
+ */
+ @Test
+ public void testListSerialization() throws Exception {
+ final long key = 0l;
+ TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+
+ // objects for RocksDB state list serialisation
+ DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+ dbOptions.setCreateIfMissing(true);
+ ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+ final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
+ new RocksDBKeyedStateBackend2<>(
+ new JobID(), "no-op",
+ ClassLoader.getSystemClassLoader(),
+ temporaryFolder.getRoot(),
+ dbOptions,
+ columnFamilyOptions,
+ mock(TaskKvStateRegistry.class),
+ LongSerializer.INSTANCE,
+ 1, new KeyGroupRange(0, 0)
+ );
+ longHeapKeyedStateBackend.setCurrentKey(key);
+
+ final ListState<Long> listState = longHeapKeyedStateBackend
+ .createListState(VoidNamespaceSerializer.INSTANCE,
+ new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+ KvStateRequestSerializerTest.testListSerialization(key, listState);
+ }
+}