You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/11 15:46:10 UTC
[10/14] flink git commit: [FLINK-7770][QS] Hide the queryable state
behind a proxy.
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
deleted file mode 100644
index a2850b3..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.queryablestate.client.KvStateClient;
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link KvStateClient}.
- */
-public class KvStateClientTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(KvStateClientTest.class);
-
- // Thread pool for client bootstrap (shared between tests)
- private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
-
- private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
-
- @AfterClass
- public static void tearDown() throws Exception {
- if (NIO_GROUP != null) {
- NIO_GROUP.shutdownGracefully();
- }
- }
-
- /**
- * Tests simple queries, of which half succeed and half fail.
- */
- @Test
- public void testSimpleRequests() throws Exception {
- Deadline deadline = TEST_TIMEOUT.fromNow();
- AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
-
- KvStateClient client = null;
- Channel serverChannel = null;
-
- try {
- client = new KvStateClient(1, stats);
-
- // Random result
- final byte[] expected = new byte[1024];
- ThreadLocalRandom.current().nextBytes(expected);
-
- final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
- final AtomicReference<Channel> channel = new AtomicReference<>();
-
- serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- channel.set(ctx.channel());
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- received.add((ByteBuf) msg);
- }
- });
-
- KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
-
- List<Future<byte[]>> futures = new ArrayList<>();
-
- int numQueries = 1024;
-
- for (int i = 0; i < numQueries; i++) {
- futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
- }
-
- // Respond to messages
- Exception testException = new RuntimeException("Expected test Exception");
-
- for (int i = 0; i < numQueries; i++) {
- ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- assertNotNull("Receive timed out", buf);
-
- Channel ch = channel.get();
- assertNotNull("Channel not active", ch);
-
- assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
- KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
-
- buf.release();
-
- if (i % 2 == 0) {
- ByteBuf response = MessageSerializer.serializeKvStateRequestResult(
- serverChannel.alloc(),
- request.getRequestId(),
- expected);
-
- ch.writeAndFlush(response);
- } else {
- ByteBuf response = MessageSerializer.serializeKvStateRequestFailure(
- serverChannel.alloc(),
- request.getRequestId(),
- testException);
-
- ch.writeAndFlush(response);
- }
- }
-
- for (int i = 0; i < numQueries; i++) {
- if (i % 2 == 0) {
- byte[] serializedResult = Await.result(futures.get(i), deadline.timeLeft());
- assertArrayEquals(expected, serializedResult);
- } else {
- try {
- Await.result(futures.get(i), deadline.timeLeft());
- fail("Did not throw expected Exception");
- } catch (RuntimeException ignored) {
- // Expected
- }
- }
- }
-
- assertEquals(numQueries, stats.getNumRequests());
- int expectedRequests = numQueries / 2;
-
- // Counts can take some time to propagate
- while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests ||
- stats.getNumFailed() != expectedRequests)) {
- Thread.sleep(100);
- }
-
- assertEquals(expectedRequests, stats.getNumSuccessful());
- assertEquals(expectedRequests, stats.getNumFailed());
- } finally {
- if (client != null) {
- client.shutDown();
- }
-
- if (serverChannel != null) {
- serverChannel.close();
- }
-
- assertEquals("Channel leak", 0, stats.getNumConnections());
- }
- }
-
- /**
- * Tests that a request to an unavailable host is failed with ConnectException.
- */
- @Test
- public void testRequestUnavailableHost() throws Exception {
- Deadline deadline = TEST_TIMEOUT.fromNow();
- AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateClient client = null;
-
- try {
- client = new KvStateClient(1, stats);
-
- int availablePort = NetUtils.getAvailablePort();
-
- KvStateServerAddress serverAddress = new KvStateServerAddress(
- InetAddress.getLocalHost(),
- availablePort);
-
- Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
-
- try {
- Await.result(future, deadline.timeLeft());
- fail("Did not throw expected ConnectException");
- } catch (ConnectException ignored) {
- // Expected
- }
- } finally {
- if (client != null) {
- client.shutDown();
- }
-
- assertEquals("Channel leak", 0, stats.getNumConnections());
- }
- }
-
- /**
- * Multiple threads concurrently fire queries.
- */
- @Test
- public void testConcurrentQueries() throws Exception {
- Deadline deadline = TEST_TIMEOUT.fromNow();
- AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
-
- ExecutorService executor = null;
- KvStateClient client = null;
- Channel serverChannel = null;
-
- final byte[] serializedResult = new byte[1024];
- ThreadLocalRandom.current().nextBytes(serializedResult);
-
- try {
- int numQueryTasks = 4;
- final int numQueriesPerTask = 1024;
-
- executor = Executors.newFixedThreadPool(numQueryTasks);
-
- client = new KvStateClient(1, stats);
-
- serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf buf = (ByteBuf) msg;
- assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
- KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
-
- buf.release();
-
- ByteBuf response = MessageSerializer.serializeKvStateRequestResult(
- ctx.alloc(),
- request.getRequestId(),
- serializedResult);
-
- ctx.channel().writeAndFlush(response);
- }
- });
-
- final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
-
- final KvStateClient finalClient = client;
- Callable<List<Future<byte[]>>> queryTask = new Callable<List<Future<byte[]>>>() {
- @Override
- public List<Future<byte[]>> call() throws Exception {
- List<Future<byte[]>> results = new ArrayList<>(numQueriesPerTask);
-
- for (int i = 0; i < numQueriesPerTask; i++) {
- results.add(finalClient.getKvState(
- serverAddress,
- new KvStateID(),
- new byte[0]));
- }
-
- return results;
- }
- };
-
- // Submit query tasks
- List<java.util.concurrent.Future<List<Future<byte[]>>>> futures = new ArrayList<>();
- for (int i = 0; i < numQueryTasks; i++) {
- futures.add(executor.submit(queryTask));
- }
-
- // Verify results
- for (java.util.concurrent.Future<List<Future<byte[]>>> future : futures) {
- List<Future<byte[]>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- for (Future<byte[]> result : results) {
- byte[] actual = Await.result(result, deadline.timeLeft());
- assertArrayEquals(serializedResult, actual);
- }
- }
-
- int totalQueries = numQueryTasks * numQueriesPerTask;
-
- // Counts can take some time to propagate
- while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) {
- Thread.sleep(100);
- }
-
- assertEquals(totalQueries, stats.getNumRequests());
- assertEquals(totalQueries, stats.getNumSuccessful());
- } finally {
- if (executor != null) {
- executor.shutdown();
- }
-
- if (serverChannel != null) {
- serverChannel.close();
- }
-
- if (client != null) {
- client.shutDown();
- }
-
- assertEquals("Channel leak", 0, stats.getNumConnections());
- }
- }
-
- /**
- * Tests that a server failure closes the connection and removes it from
- * the established connections.
- */
- @Test
- public void testFailureClosesChannel() throws Exception {
- Deadline deadline = TEST_TIMEOUT.fromNow();
- AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
-
- KvStateClient client = null;
- Channel serverChannel = null;
-
- try {
- client = new KvStateClient(1, stats);
-
- final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
- final AtomicReference<Channel> channel = new AtomicReference<>();
-
- serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- channel.set(ctx.channel());
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- received.add((ByteBuf) msg);
- }
- });
-
- KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
-
- // Requests
- List<Future<byte[]>> futures = new ArrayList<>();
- futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
- futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
-
- ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- assertNotNull("Receive timed out", buf);
- buf.release();
-
- buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- assertNotNull("Receive timed out", buf);
- buf.release();
-
- assertEquals(1, stats.getNumConnections());
-
- Channel ch = channel.get();
- assertNotNull("Channel not active", ch);
-
- // Respond with failure
- ch.writeAndFlush(MessageSerializer.serializeServerFailure(
- serverChannel.alloc(),
- new RuntimeException("Expected test server failure")));
-
- try {
- Await.result(futures.remove(0), deadline.timeLeft());
- fail("Did not throw expected server failure");
- } catch (RuntimeException ignored) {
- // Expected
- }
-
- try {
- Await.result(futures.remove(0), deadline.timeLeft());
- fail("Did not throw expected server failure");
- } catch (RuntimeException ignored) {
- // Expected
- }
-
- assertEquals(0, stats.getNumConnections());
-
- // Counts can take some time to propagate
- while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 ||
- stats.getNumFailed() != 2)) {
- Thread.sleep(100);
- }
-
- assertEquals(2, stats.getNumRequests());
- assertEquals(0, stats.getNumSuccessful());
- assertEquals(2, stats.getNumFailed());
- } finally {
- if (client != null) {
- client.shutDown();
- }
-
- if (serverChannel != null) {
- serverChannel.close();
- }
-
- assertEquals("Channel leak", 0, stats.getNumConnections());
- }
- }
-
- /**
- * Tests that a server channel close, closes the connection and removes it
- * from the established connections.
- */
- @Test
- public void testServerClosesChannel() throws Exception {
- Deadline deadline = TEST_TIMEOUT.fromNow();
- AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
-
- KvStateClient client = null;
- Channel serverChannel = null;
-
- try {
- client = new KvStateClient(1, stats);
-
- final AtomicBoolean received = new AtomicBoolean();
- final AtomicReference<Channel> channel = new AtomicReference<>();
-
- serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- channel.set(ctx.channel());
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- received.set(true);
- }
- });
-
- KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
-
- // Requests
- Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
-
- while (!received.get() && deadline.hasTimeLeft()) {
- Thread.sleep(50);
- }
- assertTrue("Receive timed out", received.get());
-
- assertEquals(1, stats.getNumConnections());
-
- channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
- try {
- Await.result(future, deadline.timeLeft());
- fail("Did not throw expected server failure");
- } catch (ClosedChannelException ignored) {
- // Expected
- }
-
- assertEquals(0, stats.getNumConnections());
-
- // Counts can take some time to propagate
- while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 ||
- stats.getNumFailed() != 1)) {
- Thread.sleep(100);
- }
-
- assertEquals(1, stats.getNumRequests());
- assertEquals(0, stats.getNumSuccessful());
- assertEquals(1, stats.getNumFailed());
- } finally {
- if (client != null) {
- client.shutDown();
- }
-
- if (serverChannel != null) {
- serverChannel.close();
- }
-
- assertEquals("Channel leak", 0, stats.getNumConnections());
- }
- }
-
- /**
- * Tests multiple clients querying multiple servers until 100k queries have
- * been processed. At this point, the client is shut down and its verified
- * that all ongoing requests are failed.
- */
- @Test
- public void testClientServerIntegration() throws Exception {
- // Config
- final int numServers = 2;
- final int numServerEventLoopThreads = 2;
- final int numServerQueryThreads = 2;
-
- final int numClientEventLoopThreads = 4;
- final int numClientsTasks = 8;
-
- final int batchSize = 16;
-
- final int numKeyGroups = 1;
-
- AbstractStateBackend abstractBackend = new MemoryStateBackend();
- KvStateRegistry dummyRegistry = new KvStateRegistry();
- DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
- dummyEnv.setKvStateRegistry(dummyRegistry);
-
- AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
- dummyEnv,
- new JobID(),
- "test_op",
- IntSerializer.INSTANCE,
- numKeyGroups,
- new KeyGroupRange(0, 0),
- dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
-
- final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
- AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
-
- KvStateClient client = null;
- ExecutorService clientTaskExecutor = null;
- final KvStateServer[] server = new KvStateServer[numServers];
-
- try {
- client = new KvStateClient(numClientEventLoopThreads, clientStats);
- clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks);
-
- // Create state
- ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
- desc.setQueryable("any");
-
- // Create servers
- KvStateRegistry[] registry = new KvStateRegistry[numServers];
- AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
- final KvStateID[] ids = new KvStateID[numServers];
-
- for (int i = 0; i < numServers; i++) {
- registry[i] = new KvStateRegistry();
- serverStats[i] = new AtomicKvStateRequestStats();
- server[i] = new KvStateServerImpl(
- InetAddress.getLocalHost(),
- 0,
- numServerEventLoopThreads,
- numServerQueryThreads,
- registry[i],
- serverStats[i]);
-
- server[i].start();
-
- backend.setCurrentKey(1010 + i);
-
- // Value per server
- ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE,
- VoidNamespaceSerializer.INSTANCE,
- desc);
-
- state.update(201 + i);
-
- // we know it must be a KvStat but this is not exposed to the user via State
- InternalKvState<?> kvState = (InternalKvState<?>) state;
-
- // Register KvState (one state instance for all server)
- ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
- }
-
- final KvStateClient finalClient = client;
- Callable<Void> queryTask = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- while (true) {
- if (Thread.interrupted()) {
- throw new InterruptedException();
- }
-
- // Random server permutation
- List<Integer> random = new ArrayList<>();
- for (int j = 0; j < batchSize; j++) {
- random.add(j);
- }
- Collections.shuffle(random);
-
- // Dispatch queries
- List<Future<byte[]>> futures = new ArrayList<>(batchSize);
-
- for (int j = 0; j < batchSize; j++) {
- int targetServer = random.get(j) % numServers;
-
- byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
- 1010 + targetServer,
- IntSerializer.INSTANCE,
- VoidNamespace.INSTANCE,
- VoidNamespaceSerializer.INSTANCE);
-
- futures.add(finalClient.getKvState(
- server[targetServer].getAddress(),
- ids[targetServer],
- serializedKeyAndNamespace));
- }
-
- // Verify results
- for (int j = 0; j < batchSize; j++) {
- int targetServer = random.get(j) % numServers;
-
- Future<byte[]> future = futures.get(j);
- byte[] buf = Await.result(future, timeout);
- int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
- assertEquals(201 + targetServer, value);
- }
- }
- }
- };
-
- // Submit tasks
- List<java.util.concurrent.Future<Void>> taskFutures = new ArrayList<>();
- for (int i = 0; i < numClientsTasks; i++) {
- taskFutures.add(clientTaskExecutor.submit(queryTask));
- }
-
- long numRequests;
- while ((numRequests = clientStats.getNumRequests()) < 100_000) {
- Thread.sleep(100);
- LOG.info("Number of requests {}/100_000", numRequests);
- }
-
- // Shut down
- client.shutDown();
-
- for (java.util.concurrent.Future<Void> future : taskFutures) {
- try {
- future.get();
- fail("Did not throw expected Exception after shut down");
- } catch (ExecutionException t) {
- if (t.getCause() instanceof ClosedChannelException ||
- t.getCause() instanceof IllegalStateException) {
- // Expected
- } else {
- t.printStackTrace();
- fail("Failed with unexpected Exception type: " + t.getClass().getName());
- }
- }
- }
-
- assertEquals("Connection leak (client)", 0, clientStats.getNumConnections());
- for (int i = 0; i < numServers; i++) {
- boolean success = false;
- int numRetries = 0;
- while (!success) {
- try {
- assertEquals("Connection leak (server)", 0, serverStats[i].getNumConnections());
- success = true;
- } catch (Throwable t) {
- if (numRetries < 10) {
- LOG.info("Retrying connection leak check (server)");
- Thread.sleep((numRetries + 1) * 50);
- numRetries++;
- } else {
- throw t;
- }
- }
- }
- }
- } finally {
- if (client != null) {
- client.shutDown();
- }
-
- for (int i = 0; i < numServers; i++) {
- if (server[i] != null) {
- server[i].shutDown();
- }
- }
-
- if (clientTaskExecutor != null) {
- clientTaskExecutor.shutdown();
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException {
- ServerBootstrap bootstrap = new ServerBootstrap()
- // Bind address and port
- .localAddress(InetAddress.getLocalHost(), 0)
- // NIO server channels
- .group(NIO_GROUP)
- .channel(NioServerSocketChannel.class)
- // See initializer for pipeline details
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
- .addLast(handlers);
- }
- });
-
- return bootstrap.bind().sync().channel();
- }
-
- private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) {
- InetSocketAddress localAddress = (InetSocketAddress) serverChannel.localAddress();
-
- return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
deleted file mode 100644
index f28ca68..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for {@link KvStateSerializer}.
- */
-@RunWith(Parameterized.class)
-public class KvStateRequestSerializerTest {
-
- private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
-
- @Parameterized.Parameters
- public static Collection<Boolean> parameters() {
- return Arrays.asList(false, true);
- }
-
- @Parameterized.Parameter
- public boolean async;
-
- /**
- * Tests KvState request serialization.
- */
- @Test
- public void testKvStateRequestSerialization() throws Exception {
- long requestId = Integer.MAX_VALUE + 1337L;
- KvStateID kvStateId = new KvStateID();
- byte[] serializedKeyAndNamespace = randomByteArray(1024);
-
- ByteBuf buf = MessageSerializer.serializeKvStateRequest(
- alloc,
- requestId,
- kvStateId,
- serializedKeyAndNamespace);
-
- int frameLength = buf.readInt();
- assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
- KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
-
- assertEquals(requestId, request.getRequestId());
- assertEquals(kvStateId, request.getKvStateId());
- assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
- }
-
- /**
- * Tests KvState request serialization with zero-length serialized key and namespace.
- */
- @Test
- public void testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
- byte[] serializedKeyAndNamespace = new byte[0];
-
- ByteBuf buf = MessageSerializer.serializeKvStateRequest(
- alloc,
- 1823,
- new KvStateID(),
- serializedKeyAndNamespace);
-
- int frameLength = buf.readInt();
- assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
- KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
-
- assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
- }
-
- /**
- * Tests that we don't try to be smart about <code>null</code> key and namespace.
- * They should be treated explicitly.
- */
- @Test(expected = NullPointerException.class)
- public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
- new KvStateRequest(0, new KvStateID(), null);
- }
-
- /**
- * Tests KvState request result serialization.
- */
- @Test
- public void testKvStateRequestResultSerialization() throws Exception {
- long requestId = Integer.MAX_VALUE + 72727278L;
- byte[] serializedResult = randomByteArray(1024);
-
- ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
- alloc,
- requestId,
- serializedResult);
-
- int frameLength = buf.readInt();
- assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
- KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
-
- assertEquals(requestId, request.getRequestId());
-
- assertArrayEquals(serializedResult, request.getSerializedResult());
- }
-
- /**
- * Tests KvState request result serialization with zero-length serialized result.
- */
- @Test
- public void testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws Exception {
- byte[] serializedResult = new byte[0];
-
- ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
- alloc,
- 72727278,
- serializedResult);
-
- int frameLength = buf.readInt();
-
- assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
- KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
-
- assertArrayEquals(serializedResult, request.getSerializedResult());
- }
-
- /**
- * Tests that we don't try to be smart about <code>null</code> results.
- * They should be treated explicitly.
- */
- @Test(expected = NullPointerException.class)
- public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
- new KvStateRequestResult(0, null);
- }
-
- /**
- * Tests KvState request failure serialization.
- */
- @Test
- public void testKvStateRequestFailureSerialization() throws Exception {
- long requestId = Integer.MAX_VALUE + 1111222L;
- IllegalStateException cause = new IllegalStateException("Expected test");
-
- ByteBuf buf = MessageSerializer.serializeKvStateRequestFailure(
- alloc,
- requestId,
- cause);
-
- int frameLength = buf.readInt();
- assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
- KvStateRequestFailure request = MessageSerializer.deserializeKvStateRequestFailure(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
-
- assertEquals(requestId, request.getRequestId());
- assertEquals(cause.getClass(), request.getCause().getClass());
- assertEquals(cause.getMessage(), request.getCause().getMessage());
- }
-
- /**
- * Tests KvState server failure serialization.
- */
- @Test
- public void testServerFailureSerialization() throws Exception {
- IllegalStateException cause = new IllegalStateException("Expected test");
-
- ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
-
- int frameLength = buf.readInt();
- assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
- Throwable request = MessageSerializer.deserializeServerFailure(buf);
- assertEquals(buf.readerIndex(), frameLength + 4);
-
- assertEquals(cause.getClass(), request.getClass());
- assertEquals(cause.getMessage(), request.getMessage());
- }
-
- private byte[] randomByteArray(int capacity) {
- byte[] bytes = new byte[capacity];
- ThreadLocalRandom.current().nextBytes(bytes);
- return bytes;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index c37c822..944349ee 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -24,20 +24,22 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.ChunkedByteBuf;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
import org.apache.flink.queryablestate.server.KvStateServerHandler;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -57,10 +59,11 @@ import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -76,16 +79,28 @@ import static org.mockito.Mockito.when;
*/
public class KvStateServerHandlerTest extends TestLogger {
- /** Shared Thread pool for query execution. */
- private static final ExecutorService TEST_THREAD_POOL = Executors.newSingleThreadExecutor();
-
- private static final int READ_TIMEOUT_MILLIS = 10000;
+ private static KvStateServerImpl testServer;
+
+ private static final long READ_TIMEOUT_MILLIS = 10000L;
+
+ @BeforeClass
+ public static void setup() {
+ try {
+ testServer = new KvStateServerImpl(
+ InetAddress.getLocalHost(),
+ 0,
+ 1,
+ 1,
+ new KvStateRegistry(),
+ new DisabledKvStateRequestStats());
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ }
@AfterClass
public static void tearDown() throws Exception {
- if (TEST_THREAD_POOL != null) {
- TEST_THREAD_POOL.shutdown();
- }
+ testServer.shutdown();
}
/**
@@ -96,7 +111,10 @@ public class KvStateServerHandlerTest extends TestLogger {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
// Register state
@@ -141,40 +159,40 @@ public class KvStateServerHandlerTest extends TestLogger {
assertTrue(registryListener.registrationName.equals("vanilla"));
- ByteBuf request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- requestId,
- registryListener.kvStateId,
- serializedKeyAndNamespace);
+ KvStateInternalRequest request = new KvStateInternalRequest(
+ registryListener.kvStateId, serializedKeyAndNamespace);
+
+ ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
// Write the request and wait for the response
- channel.writeInbound(request);
+ channel.writeInbound(serRequest);
ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
- KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf);
+ long deserRequestId = MessageSerializer.getRequestId(buf);
+ KvStateResponse response = serializer.deserializeResponse(buf);
- assertEquals(requestId, response.getRequestId());
+ assertEquals(requestId, deserRequestId);
- int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
+ int actualValue = KvStateSerializer.deserializeValue(response.getContent(), IntSerializer.INSTANCE);
assertEquals(expectedValue, actualValue);
assertEquals(stats.toString(), 1, stats.getNumRequests());
// Wait for async successful request report
long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
- while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) {
- Thread.sleep(10);
+ while (stats.getNumSuccessful() != 1L && System.nanoTime() <= deadline) {
+ Thread.sleep(10L);
}
- assertEquals(stats.toString(), 1, stats.getNumSuccessful());
+ assertEquals(stats.toString(), 1L, stats.getNumSuccessful());
}
/**
- * Tests the failure response with {@link UnknownKvStateID} as cause on
+ * Tests the failure response with {@link UnknownKvStateIdException} as cause on
* queries for unregistered KvStateIDs.
*/
@Test
@@ -182,36 +200,38 @@ public class KvStateServerHandlerTest extends TestLogger {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
long requestId = Integer.MAX_VALUE + 182828L;
- ByteBuf request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- requestId,
- new KvStateID(),
- new byte[0]);
+
+ KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+
+ ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
// Write the request and wait for the response
- channel.writeInbound(request);
+ channel.writeInbound(serRequest);
ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
- KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+ RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
assertEquals(requestId, response.getRequestId());
- assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateID);
+ assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateIdException);
- assertEquals(1, stats.getNumRequests());
- assertEquals(1, stats.getNumFailed());
+ assertEquals(1L, stats.getNumRequests());
+ assertEquals(1L, stats.getNumFailed());
}
/**
- * Tests the failure response with {@link UnknownKeyOrNamespace} as cause
+ * Tests the failure response with {@link UnknownKeyOrNamespaceException} as cause
* on queries for non-existing keys.
*/
@Test
@@ -219,7 +239,10 @@ public class KvStateServerHandlerTest extends TestLogger {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
int numKeyGroups = 1;
@@ -254,40 +277,39 @@ public class KvStateServerHandlerTest extends TestLogger {
assertTrue(registryListener.registrationName.equals("vanilla"));
- ByteBuf request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- requestId,
- registryListener.kvStateId,
- serializedKeyAndNamespace);
+ KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+ ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
// Write the request and wait for the response
- channel.writeInbound(request);
+ channel.writeInbound(serRequest);
ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
- KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+ RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
assertEquals(requestId, response.getRequestId());
- assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespace);
+ assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespaceException);
- assertEquals(1, stats.getNumRequests());
- assertEquals(1, stats.getNumFailed());
+ assertEquals(1L, stats.getNumRequests());
+ assertEquals(1L, stats.getNumFailed());
}
/**
- * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])}
- * call.
+ * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} call.
*/
@Test
public void testFailureOnGetSerializedValue() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
// Failing KvState
@@ -302,38 +324,37 @@ public class KvStateServerHandlerTest extends TestLogger {
"vanilla",
kvState);
- ByteBuf request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- 282872,
- kvStateId,
- new byte[0]);
+ KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0]);
+ ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
// Write the request and wait for the response
- channel.writeInbound(request);
+ channel.writeInbound(serRequest);
ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
- KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+ RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
assertTrue(response.getCause().getMessage().contains("Expected test Exception"));
- assertEquals(1, stats.getNumRequests());
- assertEquals(1, stats.getNumFailed());
+ assertEquals(1L, stats.getNumRequests());
+ assertEquals(1L, stats.getNumFailed());
}
/**
- * Tests that the channel is closed if an Exception reaches the channel
- * handler.
+ * Tests that the channel is closed if an Exception reaches the channel handler.
*/
@Test
public void testCloseChannelOnExceptionCaught() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(handler);
channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
@@ -352,19 +373,28 @@ public class KvStateServerHandlerTest extends TestLogger {
}
/**
- * Tests the failure response on a rejected execution, because the query
- * executor has been closed.
+ * Tests the failure response on a rejected execution, because the query executor has been closed.
*/
@Test
public void testQueryExecutorShutDown() throws Exception {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- ExecutorService closedExecutor = Executors.newSingleThreadExecutor();
- closedExecutor.shutdown();
- assertTrue(closedExecutor.isShutdown());
+ KvStateServerImpl localTestServer = new KvStateServerImpl(
+ InetAddress.getLocalHost(),
+ 0,
+ 1,
+ 1,
+ new KvStateRegistry(),
+ new DisabledKvStateRequestStats());
+
+ localTestServer.shutdown();
+ assertTrue(localTestServer.isExecutorShutdown());
- KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(localTestServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
int numKeyGroups = 1;
@@ -391,26 +421,25 @@ public class KvStateServerHandlerTest extends TestLogger {
assertTrue(registryListener.registrationName.equals("vanilla"));
- ByteBuf request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- 282872,
- registryListener.kvStateId,
- new byte[0]);
+ KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, new byte[0]);
+ ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
// Write the request and wait for the response
- channel.writeInbound(request);
+ channel.writeInbound(serRequest);
ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
- KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+ RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
- assertEquals(1, stats.getNumRequests());
- assertEquals(1, stats.getNumFailed());
+ assertEquals(1L, stats.getNumRequests());
+ assertEquals(1L, stats.getNumFailed());
+
+ localTestServer.shutdown();
}
/**
@@ -421,7 +450,10 @@ public class KvStateServerHandlerTest extends TestLogger {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
// Write the request and wait for the response
@@ -438,13 +470,11 @@ public class KvStateServerHandlerTest extends TestLogger {
assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
Throwable response = MessageSerializer.deserializeServerFailure(buf);
- assertEquals(0, stats.getNumRequests());
- assertEquals(0, stats.getNumFailed());
+ assertEquals(0L, stats.getNumRequests());
+ assertEquals(0L, stats.getNumFailed());
- unexpectedMessage = MessageSerializer.serializeKvStateRequestResult(
- channel.alloc(),
- 192,
- new byte[0]);
+ KvStateResponse stateResponse = new KvStateResponse(new byte[0]);
+ unexpectedMessage = MessageSerializer.serializeResponse(channel.alloc(), 192L, stateResponse);
channel.writeInbound(unexpectedMessage);
@@ -457,8 +487,8 @@ public class KvStateServerHandlerTest extends TestLogger {
assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException);
- assertEquals(0, stats.getNumRequests());
- assertEquals(0, stats.getNumFailed());
+ assertEquals(0L, stats.getNumRequests());
+ assertEquals(0L, stats.getNumFailed());
}
/**
@@ -469,30 +499,30 @@ public class KvStateServerHandlerTest extends TestLogger {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
- ByteBuf request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- 282872,
- new KvStateID(),
- new byte[0]);
+ KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+ ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
- assertEquals(1, request.refCnt());
+ assertEquals(1L, serRequest.refCnt());
// Write regular request
- channel.writeInbound(request);
- assertEquals("Buffer not recycled", 0, request.refCnt());
+ channel.writeInbound(serRequest);
+ assertEquals("Buffer not recycled", 0L, serRequest.refCnt());
// Write unexpected msg
ByteBuf unexpected = channel.alloc().buffer(8);
unexpected.writeInt(4);
unexpected.writeInt(4);
- assertEquals(1, unexpected.refCnt());
+ assertEquals(1L, unexpected.refCnt());
channel.writeInbound(unexpected);
- assertEquals("Buffer not recycled", 0, unexpected.refCnt());
+ assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
}
/**
@@ -503,7 +533,10 @@ public class KvStateServerHandlerTest extends TestLogger {
KvStateRegistry registry = new KvStateRegistry();
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
int numKeyGroups = 1;
@@ -550,45 +583,40 @@ public class KvStateServerHandlerTest extends TestLogger {
StringSerializer.INSTANCE);
assertTrue(registryListener.registrationName.equals("vanilla"));
- ByteBuf request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- 182828,
- registryListener.kvStateId,
- wrongKeyAndNamespace);
+
+ KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace);
+ ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182828L, request);
// Write the request and wait for the response
- channel.writeInbound(request);
+ channel.writeInbound(serRequest);
ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
- KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
- assertEquals(182828, response.getRequestId());
+ RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+ assertEquals(182828L, response.getRequestId());
assertTrue(response.getCause().getMessage().contains("IOException"));
// Repeat with wrong namespace only
- request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- 182829,
- registryListener.kvStateId,
- wrongNamespace);
+ request = new KvStateInternalRequest(registryListener.kvStateId, wrongNamespace);
+ serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182829L, request);
// Write the request and wait for the response
- channel.writeInbound(request);
+ channel.writeInbound(serRequest);
buf = (ByteBuf) readInboundBlocking(channel);
buf.skipBytes(4); // skip frame length
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
- response = MessageSerializer.deserializeKvStateRequestFailure(buf);
- assertEquals(182829, response.getRequestId());
+ response = MessageSerializer.deserializeRequestFailure(buf);
+ assertEquals(182829L, response.getRequestId());
assertTrue(response.getCause().getMessage().contains("IOException"));
- assertEquals(2, stats.getNumRequests());
- assertEquals(2, stats.getNumFailed());
+ assertEquals(2L, stats.getNumRequests());
+ assertEquals(2L, stats.getNumFailed());
}
/**
@@ -599,7 +627,10 @@ public class KvStateServerHandlerTest extends TestLogger {
KvStateRegistry registry = new KvStateRegistry();
KvStateRequestStats stats = new AtomicKvStateRequestStats();
- KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+ MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
int numKeyGroups = 1;
@@ -650,14 +681,11 @@ public class KvStateServerHandlerTest extends TestLogger {
assertTrue(registryListener.registrationName.equals("vanilla"));
- ByteBuf request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- requestId,
- registryListener.kvStateId,
- serializedKeyAndNamespace);
+ KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+ ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
// Write the request and wait for the response
- channel.writeInbound(request);
+ channel.writeInbound(serRequest);
Object msg = readInboundBlocking(channel);
assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
@@ -669,9 +697,9 @@ public class KvStateServerHandlerTest extends TestLogger {
* Queries the embedded channel for data.
*/
private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException {
- final int sleepMillis = 50;
+ final long sleepMillis = 50L;
- int sleptMillis = 0;
+ long sleptMillis = 0L;
Object msg = null;
while (sleptMillis < READ_TIMEOUT_MILLIS &&
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index 9332e68..b7f489a 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -22,14 +22,14 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
@@ -66,7 +66,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
- * Tests for {@link KvStateServer}.
+ * Tests for {@link KvStateServerImpl}.
*/
public class KvStateServerTest {
@@ -87,7 +87,7 @@ public class KvStateServerTest {
*/
@Test
public void testSimpleRequest() throws Exception {
- KvStateServer server = null;
+ KvStateServerImpl server = null;
Bootstrap bootstrap = null;
try {
KvStateRegistry registry = new KvStateRegistry();
@@ -96,7 +96,7 @@ public class KvStateServerTest {
server = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
server.start();
- KvStateServerAddress serverAddress = server.getAddress();
+ KvStateServerAddress serverAddress = server.getServerAddress();
int numKeyGroups = 1;
AbstractStateBackend abstractBackend = new MemoryStateBackend();
DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
@@ -155,25 +155,29 @@ public class KvStateServerTest {
long requestId = Integer.MAX_VALUE + 182828L;
assertTrue(registryListener.registrationName.equals("vanilla"));
- ByteBuf request = MessageSerializer.serializeKvStateRequest(
- channel.alloc(),
- requestId,
+
+ final KvStateInternalRequest request = new KvStateInternalRequest(
registryListener.kvStateId,
serializedKeyAndNamespace);
- channel.writeAndFlush(request);
+ ByteBuf serializeRequest = MessageSerializer.serializeRequest(
+ channel.alloc(),
+ requestId,
+ request);
+
+ channel.writeAndFlush(serializeRequest);
ByteBuf buf = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
- KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf);
+ assertEquals(requestId, MessageSerializer.getRequestId(buf));
+ KvStateResponse response = server.getSerializer().deserializeResponse(buf);
- assertEquals(requestId, response.getRequestId());
- int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
+ int actualValue = KvStateSerializer.deserializeValue(response.getContent(), IntSerializer.INSTANCE);
assertEquals(expectedValue, actualValue);
} finally {
if (server != null) {
- server.shutDown();
+ server.shutdown();
}
if (bootstrap != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
new file mode 100644
index 0000000..32a0c9b
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
+import org.apache.flink.runtime.query.KvStateID;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MessageSerializer}. NOTE(review): no test here reads {@code async} - confirm it is needed.
+ */
+@RunWith(Parameterized.class)
+public class MessageSerializerTest {
+
+ private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+
+ @Parameterized.Parameters
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(false, true);
+ }
+
+ @Parameterized.Parameter
+ public boolean async;
+
+ /**
+ * Tests that a serialized request deserializes to the same request id, state id and key/namespace bytes.
+ */
+ @Test
+ public void testRequestSerialization() throws Exception {
+ long requestId = Integer.MAX_VALUE + 1337L;
+ KvStateID kvStateId = new KvStateID();
+ byte[] serializedKeyAndNamespace = randomByteArray(1024);
+
+ final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+ final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+ assertEquals(requestId, MessageSerializer.getRequestId(buf));
+ KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf);
+
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertEquals(kvStateId, requestDeser.getKvStateId());
+ assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace());
+ }
+
+ /**
+ * Tests the same request round trip with a zero-length serialized key and namespace.
+ */
+ @Test
+ public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
+
+ long requestId = Integer.MAX_VALUE + 1337L;
+ KvStateID kvStateId = new KvStateID();
+ byte[] serializedKeyAndNamespace = new byte[0];
+
+ final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+ final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+ assertEquals(requestId, MessageSerializer.getRequestId(buf));
+ KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf);
+
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertEquals(kvStateId, requestDeser.getKvStateId());
+ assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace());
+ }
+
+ /**
+ * Tests that constructing a request with a <code>null</code> serialized key and namespace
+ * is expected to throw a {@link NullPointerException}.
+ */
+ @Test(expected = NullPointerException.class)
+ public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
+ new KvStateInternalRequest(new KvStateID(), null);
+ }
+
+ /**
+ * Tests that a serialized response deserializes to the same request id and content bytes.
+ */
+ @Test
+ public void testResponseSerialization() throws Exception {
+ long requestId = Integer.MAX_VALUE + 72727278L;
+ byte[] serializedResult = randomByteArray(1024);
+
+ final KvStateResponse response = new KvStateResponse(serializedResult);
+ final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, response);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+ assertEquals(requestId, MessageSerializer.getRequestId(buf));
+ KvStateResponse responseDeser = serializer.deserializeResponse(buf);
+
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertArrayEquals(serializedResult, responseDeser.getContent());
+ }
+
+ /**
+ * Tests the same response round trip with a zero-length serialized result.
+ */
+ @Test
+ public void testResponseSerializationWithZeroLengthSerializedResult() throws Exception {
+ byte[] serializedResult = new byte[0];
+
+ final KvStateResponse response = new KvStateResponse(serializedResult);
+ final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+ new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+ ByteBuf buf = MessageSerializer.serializeResponse(alloc, 72727278L, response);
+
+ int frameLength = buf.readInt();
+
+ assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+ assertEquals(72727278L, MessageSerializer.getRequestId(buf));
+ KvStateResponse responseDeser = serializer.deserializeResponse(buf);
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertArrayEquals(serializedResult, responseDeser.getContent());
+ }
+
+ /**
+ * Tests that constructing a response with a <code>null</code> serialized result
+ * is expected to throw a {@link NullPointerException}.
+ */
+ @Test(expected = NullPointerException.class)
+ public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+ new KvStateResponse((byte[]) null);
+ }
+
+ /**
+ * Tests that a request failure round trip preserves the request id and the cause's class and message.
+ */
+ @Test
+ public void testKvStateRequestFailureSerialization() throws Exception {
+ long requestId = Integer.MAX_VALUE + 1111222L;
+ IllegalStateException cause = new IllegalStateException("Expected test");
+
+ ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc, requestId, cause);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+ RequestFailure requestFailure = MessageSerializer.deserializeRequestFailure(buf);
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertEquals(requestId, requestFailure.getRequestId());
+ assertEquals(cause.getClass(), requestFailure.getCause().getClass());
+ assertEquals(cause.getMessage(), requestFailure.getCause().getMessage());
+ }
+
+ /**
+ * Tests that a server failure round trip preserves the cause's class and message.
+ */
+ @Test
+ public void testServerFailureSerialization() throws Exception {
+ IllegalStateException cause = new IllegalStateException("Expected test");
+
+ ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+ Throwable request = MessageSerializer.deserializeServerFailure(buf);
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertEquals(cause.getClass(), request.getClass());
+ assertEquals(cause.getMessage(), request.getMessage());
+ }
+
+ private byte[] randomByteArray(int capacity) {
+ byte[] bytes = new byte[capacity];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ return bytes;
+ }
+}