You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/09 14:47:41 UTC
[07/10] flink git commit: [FLINK-3779] [runtime] Add KvState network
client and server
http://git-wip-us.apache.org/repos/asf/flink/blob/af07eed8/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
new file mode 100644
index 0000000..a68c84b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty.message;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.query.KvStateID;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Serialization round-trip tests for {@link KvStateRequestSerializer} and the KvState messages.
+ */
+public class KvStateRequestSerializerTest {
+    private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+
+    /**
+     * Tests that a KvState request is unchanged after a serialize/deserialize round trip.
+     */
+    @Test
+    public void testKvStateRequestSerialization() throws Exception {
+        long requestId = Integer.MAX_VALUE + 1337L; // larger than any int
+        KvStateID kvStateId = new KvStateID();
+        byte[] serializedKeyAndNamespace = randomByteArray(1024);
+
+        ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest(
+                alloc,
+                requestId,
+                kvStateId,
+                serializedKeyAndNamespace);
+
+        int frameLength = buf.readInt();
+        assertEquals(KvStateRequestType.REQUEST, KvStateRequestSerializer.deserializeHeader(buf));
+        KvStateRequest request = KvStateRequestSerializer.deserializeKvStateRequest(buf);
+        assertEquals(frameLength + 4, buf.readerIndex()); // length field (4 bytes) + frame
+
+        assertEquals(requestId, request.getRequestId());
+        assertEquals(kvStateId, request.getKvStateId());
+        assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
+    }
+
+    /**
+     * Tests the KvState request round trip with a zero-length serialized key and namespace.
+     */
+    @Test
+    public void testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
+        byte[] serializedKeyAndNamespace = new byte[0];
+
+        ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest(
+                alloc,
+                1823,
+                new KvStateID(),
+                serializedKeyAndNamespace);
+
+        int frameLength = buf.readInt();
+        assertEquals(KvStateRequestType.REQUEST, KvStateRequestSerializer.deserializeHeader(buf));
+        KvStateRequest request = KvStateRequestSerializer.deserializeKvStateRequest(buf);
+        assertEquals(frameLength + 4, buf.readerIndex()); // length field (4 bytes) + frame
+
+        assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
+    }
+
+    /**
+     * Tests that we don't try to be smart about a {@code null} key and namespace: creating the
+     * request with {@code null} must fail with a {@link NullPointerException}.
+     */
+    @Test(expected = NullPointerException.class)
+    public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
+        new KvStateRequest(0, new KvStateID(), null);
+    }
+
+    /**
+     * Tests that a KvState request result is unchanged after a serialize/deserialize round trip.
+     */
+    @Test
+    public void testKvStateRequestResultSerialization() throws Exception {
+        long requestId = Integer.MAX_VALUE + 72727278L; // larger than any int
+        byte[] serializedResult = randomByteArray(1024);
+
+        ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestResult(
+                alloc,
+                requestId,
+                serializedResult);
+
+        int frameLength = buf.readInt();
+        assertEquals(KvStateRequestType.REQUEST_RESULT, KvStateRequestSerializer.deserializeHeader(buf));
+        KvStateRequestResult result = KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
+        assertEquals(frameLength + 4, buf.readerIndex()); // length field (4 bytes) + frame
+
+        assertEquals(requestId, result.getRequestId());
+        assertArrayEquals(serializedResult, result.getSerializedResult());
+    }
+
+    /**
+     * Tests the KvState request result round trip with a zero-length serialized result.
+     */
+    @Test
+    public void testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws Exception {
+        byte[] serializedResult = new byte[0];
+
+        ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestResult(
+                alloc,
+                72727278,
+                serializedResult);
+
+        int frameLength = buf.readInt();
+        assertEquals(KvStateRequestType.REQUEST_RESULT, KvStateRequestSerializer.deserializeHeader(buf));
+        KvStateRequestResult result = KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
+        assertEquals(frameLength + 4, buf.readerIndex()); // length field (4 bytes) + frame
+
+        assertArrayEquals(serializedResult, result.getSerializedResult());
+    }
+
+    /**
+     * Tests that we don't try to be smart about a {@code null} result: creating the request
+     * result with {@code null} must fail with a {@link NullPointerException}.
+     */
+    @Test(expected = NullPointerException.class)
+    public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+        new KvStateRequestResult(0, null);
+    }
+
+    /**
+     * Tests that a KvState request failure keeps its request ID and the class and message of its cause.
+     */
+    @Test
+    public void testKvStateRequestFailureSerialization() throws Exception {
+        long requestId = Integer.MAX_VALUE + 1111222L; // larger than any int
+        IllegalStateException cause = new IllegalStateException("Expected test");
+
+        ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestFailure(
+                alloc,
+                requestId,
+                cause);
+
+        int frameLength = buf.readInt();
+        assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
+        KvStateRequestFailure failure = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
+        assertEquals(frameLength + 4, buf.readerIndex()); // length field (4 bytes) + frame
+
+        assertEquals(requestId, failure.getRequestId());
+        assertEquals(cause.getClass(), failure.getCause().getClass());
+        assertEquals(cause.getMessage(), failure.getCause().getMessage());
+    }
+
+    /**
+     * Tests that a server failure keeps the class and message of its cause across a round trip.
+     */
+    @Test
+    public void testServerFailureSerialization() throws Exception {
+        IllegalStateException cause = new IllegalStateException("Expected test");
+
+        ByteBuf buf = KvStateRequestSerializer.serializeServerFailure(alloc, cause);
+
+        int frameLength = buf.readInt();
+        assertEquals(KvStateRequestType.SERVER_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
+        Throwable failure = KvStateRequestSerializer.deserializeServerFailure(buf);
+        assertEquals(frameLength + 4, buf.readerIndex()); // length field (4 bytes) + frame
+
+        assertEquals(cause.getClass(), failure.getClass());
+        assertEquals(cause.getMessage(), failure.getMessage());
+    }
+
+    /**
+     * Tests the key and namespace serialization utils with a Long key and a String namespace.
+     */
+    @Test
+    public void testKeyAndNamespaceSerialization() throws Exception {
+        TypeSerializer<Long> keySerializer = LongSerializer.INSTANCE;
+        TypeSerializer<String> namespaceSerializer = StringSerializer.INSTANCE;
+
+        long expectedKey = Integer.MAX_VALUE + 12323L;
+        String expectedNamespace = "knilf";
+
+        byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
+                expectedKey, keySerializer, expectedNamespace, namespaceSerializer);
+
+        Tuple2<Long, String> actual = KvStateRequestSerializer.deserializeKeyAndNamespace(
+                serializedKeyAndNamespace, keySerializer, namespaceSerializer);
+
+        assertEquals(expectedKey, actual.f0.longValue());
+        assertEquals(expectedNamespace, actual.f1);
+    }
+
+    /**
+     * Tests the value serialization utils with a single Long value.
+     */
+    @Test
+    public void testValueSerialization() throws Exception {
+        TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+        long expectedValue = Long.MAX_VALUE - 1292929292L;
+
+        byte[] serializedValue = KvStateRequestSerializer.serializeValue(expectedValue, valueSerializer);
+        long actualValue = KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
+
+        assertEquals(expectedValue, actualValue);
+    }
+
+    /**
+     * Tests the list serialization utils, for a multi-element list and for a single serialized value.
+     */
+    @Test
+    public void testListSerialization() throws Exception {
+        TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+
+        // List
+        int numElements = 10;
+        List<Long> expectedValues = new ArrayList<>(numElements);
+        for (int i = 0; i < numElements; i++) {
+            expectedValues.add(ThreadLocalRandom.current().nextLong());
+        }
+
+        byte[] serializedValues = KvStateRequestSerializer.serializeList(expectedValues, valueSerializer);
+        List<Long> actualValues = KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer);
+        assertEquals(expectedValues, actualValues);
+
+        // Single value: bytes written by serializeValue must deserialize as a one-element list
+        long expectedValue = ThreadLocalRandom.current().nextLong();
+        byte[] serializedValue = KvStateRequestSerializer.serializeValue(expectedValue, valueSerializer);
+        List<Long> actualValue = KvStateRequestSerializer.deserializeList(serializedValue, valueSerializer);
+        assertEquals(1, actualValue.size());
+        assertEquals(expectedValue, actualValue.get(0).longValue());
+    }
+
+    /** Returns a new array of the given length filled with random bytes. */
+    private static byte[] randomByteArray(int capacity) {
+        byte[] bytes = new byte[capacity];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        return bytes;
+    }
+}