You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/19 16:20:51 UTC

[3/4] flink git commit: [FLINK-5521] [runtime] remove unused KvStateRequestSerializer#serializeList

[FLINK-5521] [runtime] remove unused KvStateRequestSerializer#serializeList

Also make sure that the serialization via the state backends' list states
matches the deserialization of the KvStateRequestSerializer#deserializeList
method.
So far, the two were used together this way, but no test verified that they match.

This closes #3135


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8fd04a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8fd04a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8fd04a7

Branch: refs/heads/master
Commit: d8fd04a7e72ecee4fe5b06f42ee9530aa292e644
Parents: ac815d7
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jan 17 11:23:32 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 19 17:18:49 2017 +0100

----------------------------------------------------------------------
 .../netty/message/KvStateRequestSerializer.java |  35 ------
 .../message/KvStateRequestSerializerTest.java   |  62 +++++++++-
 .../KVStateRequestSerializerRocksDBTest.java    | 118 +++++++++++++++++++
 3 files changed, 176 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8fd04a7/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 0ae60f6..6c8b4a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -36,7 +36,6 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -435,40 +434,6 @@ public final class KvStateRequestSerializer {
 	}
 
 	/**
-	 * Serializes all values of the Iterable with the given serializer.
-	 *
-	 * @param values     Values of type T to serialize
-	 * @param serializer Serializer for T
-	 * @param <T>        Type of the values
-	 * @return Serialized values or <code>null</code> if values <code>null</code> or empty
-	 * @throws IOException On failure during serialization
-	 */
-	public static <T> byte[] serializeList(Iterable<T> values, TypeSerializer<T> serializer) throws IOException {
-		if (values != null) {
-			Iterator<T> it = values.iterator();
-
-			if (it.hasNext()) {
-				// Serialize
-				DataOutputSerializer dos = new DataOutputSerializer(32);
-
-				while (it.hasNext()) {
-					serializer.serialize(it.next(), dos);
-
-					// This byte added here in order to have the binary format
-					// prescribed by RocksDB.
-					dos.write(0);
-				}
-
-				return dos.getCopyOfBuffer();
-			} else {
-				return null;
-			}
-		} else {
-			return null;
-		}
-	}
-
-	/**
 	 * Deserializes all values with the given serializer.
 	 *
 	 * @param serializedValue Serialized value of type List<T>

http://git-wip-us.apache.org/repos/asf/flink/blob/d8fd04a7/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index a68c84b..a9aa416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -21,11 +21,19 @@ package org.apache.flink.runtime.query.netty.message;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -34,6 +42,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 
 public class KvStateRequestSerializerTest {
 
@@ -228,17 +237,62 @@ public class KvStateRequestSerializerTest {
 	 */
 	@Test
 	public void testListSerialization() throws Exception {
+		final long key = 0l;
+
+		// objects for heap state list serialisation
+		final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
+			new HeapKeyedStateBackend<>(
+				mock(TaskKvStateRegistry.class),
+				LongSerializer.INSTANCE,
+				ClassLoader.getSystemClassLoader(),
+				1, new KeyGroupRange(0, 0)
+			);
+		longHeapKeyedStateBackend.setCurrentKey(key);
+
+		final ListState<Long> listState = longHeapKeyedStateBackend
+			.createListState(VoidNamespaceSerializer.INSTANCE,
+				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+		testListSerialization(key, listState);
+	}
+
+	/**
+	 * Verifies that the serialization of a list using the given list state
+	 * matches the deserialization with {@link KvStateRequestSerializer#deserializeList}.
+	 *
+	 * @param key
+	 * 		key of the list state
+	 * @param listState
+	 * 		list state using the {@link VoidNamespace}, must also be a {@link
+	 * 		KvState} instance
+	 *
+	 * @throws Exception
+	 */
+	public static void testListSerialization(final long key,
+		final ListState<Long> listState) throws Exception {
+
 		TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
 
+		final KvState<VoidNamespace> listKvState =
+			(KvState<VoidNamespace>) listState;
+		listKvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
 		// List
-		int numElements = 10;
+		final int numElements = 10;
 
-		List<Long> expectedValues = new ArrayList<>();
+		final List<Long> expectedValues = new ArrayList<>();
 		for (int i = 0; i < numElements; i++) {
-			expectedValues.add(ThreadLocalRandom.current().nextLong());
+			final long value = ThreadLocalRandom.current().nextLong();
+			expectedValues.add(value);
+			listState.add(value);
 		}
 
-		byte[] serializedValues = KvStateRequestSerializer.serializeList(expectedValues, valueSerializer);
+		final byte[] serializedKey =
+			KvStateRequestSerializer.serializeKeyAndNamespace(
+				key, LongSerializer.INSTANCE,
+				VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
+		final byte[] serializedValues =
+			listKvState.getSerializedValue(serializedKey);
+
 		List<Long> actualValues = KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer);
 		assertEquals(expectedValues, actualValues);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d8fd04a7/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
new file mode 100644
index 0000000..0e1aca0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.contrib.streaming.state.PredefinedOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.File;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for the serialization and deserialization of {@link
+ * KvStateRequestSerializer} with a RocksDB state back-end.
+ */
+public final class KVStateRequestSerializerRocksDBTest {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Extension of {@link RocksDBKeyedStateBackend} to make {@link
+	 * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
+	 * the tests.
+	 *
+	 * @param <K> key type
+	 */
+	final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
+
+		RocksDBKeyedStateBackend2(final JobID jobId,
+			final String operatorIdentifier,
+			final ClassLoader userCodeClassLoader,
+			final File instanceBasePath, final DBOptions dbOptions,
+			final ColumnFamilyOptions columnFamilyOptions,
+			final TaskKvStateRegistry kvStateRegistry,
+			final TypeSerializer<K> keySerializer, final int numberOfKeyGroups,
+			final KeyGroupRange keyGroupRange) throws Exception {
+			super(jobId, operatorIdentifier, userCodeClassLoader,
+				instanceBasePath,
+				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
+				numberOfKeyGroups, keyGroupRange);
+		}
+
+		@Override
+		public <N, T> ListState<T> createListState(
+			final TypeSerializer<N> namespaceSerializer,
+			final ListStateDescriptor<T> stateDesc) throws Exception {
+			return super.createListState(namespaceSerializer, stateDesc);
+		}
+	}
+
+	/**
+	 * Tests list serialization and deserialization match.
+	 *
+	 * @see KvStateRequestSerializerTest#testListSerialization()
+	 * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
+	 * test
+	 */
+	@Test
+	public void testListSerialization() throws Exception {
+		final long key = 0l;
+		TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+
+		// objects for RocksDB state list serialisation
+		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+		dbOptions.setCreateIfMissing(true);
+		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+		final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
+			new RocksDBKeyedStateBackend2<>(
+				new JobID(), "no-op",
+				ClassLoader.getSystemClassLoader(),
+				temporaryFolder.getRoot(),
+				dbOptions,
+				columnFamilyOptions,
+				mock(TaskKvStateRegistry.class),
+				LongSerializer.INSTANCE,
+				1, new KeyGroupRange(0, 0)
+			);
+		longHeapKeyedStateBackend.setCurrentKey(key);
+
+		final ListState<Long> listState = longHeapKeyedStateBackend
+			.createListState(VoidNamespaceSerializer.INSTANCE,
+				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+		KvStateRequestSerializerTest.testListSerialization(key, listState);
+	}
+}