You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/11 15:46:05 UTC
[05/14] flink git commit: [FLINK-7769][QS] Move queryable state
outside the runtime.
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
new file mode 100644
index 0000000..c37c822
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
+import org.apache.flink.queryablestate.UnknownKvStateID;
+import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.ChunkedByteBuf;
+import org.apache.flink.queryablestate.server.KvStateServerHandler;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link KvStateServerHandler}.
+ */
+public class KvStateServerHandlerTest extends TestLogger {
+
+ /** Shared Thread pool for query execution. */
+ private static final ExecutorService TEST_THREAD_POOL = Executors.newSingleThreadExecutor();
+
+ private static final int READ_TIMEOUT_MILLIS = 10000; // upper bound (ms) used when polling for a response and when awaiting the channel close future
+
+    /**
+     * Shuts down the shared query executor after all tests of this class have run.
+     */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        // TEST_THREAD_POOL is a static final field initialised at its declaration,
+        // so it can never be null here and needs no guard.
+        TEST_THREAD_POOL.shutdown();
+    }
+
+    /**
+     * Tests a simple successful query via an EmbeddedChannel.
+     *
+     * <p>Registers a queryable {@link ValueState}, stores a value for one key,
+     * sends a request for that key and verifies the request ID, the returned
+     * value and the request statistics.
+     */
+    @Test
+    public void testSimpleQuery() throws Exception {
+        KvStateRegistry registry = new KvStateRegistry();
+        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+        // Register state
+        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+        desc.setQueryable("vanilla");
+
+        int numKeyGroups = 1;
+        AbstractStateBackend abstractBackend = new MemoryStateBackend();
+        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+        dummyEnv.setKvStateRegistry(registry);
+        AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+            dummyEnv,
+            new JobID(),
+            "test_op",
+            IntSerializer.INSTANCE,
+            numKeyGroups,
+            new KeyGroupRange(0, 0),
+            registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+        // The listener captures the KvStateID that the request below needs
+        final TestRegistryListener registryListener = new TestRegistryListener();
+        registry.registerListener(registryListener);
+
+        // Update the KvState and request it
+        int expectedValue = 712828289;
+
+        int key = 99812822;
+        backend.setCurrentKey(key);
+        ValueState<Integer> state = backend.getPartitionedState(
+            VoidNamespace.INSTANCE,
+            VoidNamespaceSerializer.INSTANCE,
+            desc);
+
+        state.update(expectedValue);
+
+        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+            key,
+            IntSerializer.INSTANCE,
+            VoidNamespace.INSTANCE,
+            VoidNamespaceSerializer.INSTANCE);
+
+        // Larger than Integer.MAX_VALUE, so the ID only survives as a long
+        long requestId = Integer.MAX_VALUE + 182828L;
+
+        // assertEquals reports the actual name on a mismatch and fails with an
+        // assertion error (not a NullPointerException) if no name was recorded.
+        assertEquals("vanilla", registryListener.registrationName);
+
+        ByteBuf request = MessageSerializer.serializeKvStateRequest(
+            channel.alloc(),
+            requestId,
+            registryListener.kvStateId,
+            serializedKeyAndNamespace);
+
+        // Write the request and wait for the response
+        channel.writeInbound(request);
+
+        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+        buf.skipBytes(4); // skip frame length
+
+        // Verify the response
+        assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+        KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf);
+
+        assertEquals(requestId, response.getRequestId());
+
+        int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
+        assertEquals(expectedValue, actualValue);
+
+        assertEquals(stats.toString(), 1, stats.getNumRequests());
+
+        // Wait for async successful request report
+        long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+        while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) {
+            Thread.sleep(10);
+        }
+
+        assertEquals(stats.toString(), 1, stats.getNumSuccessful());
+    }
+
+    /**
+     * Verifies that a query for a {@link KvStateID} that was never registered
+     * is answered with a request failure whose cause is {@link UnknownKvStateID}.
+     */
+    @Test
+    public void testQueryUnknownKvStateID() throws Exception {
+        final KvStateRegistry emptyRegistry = new KvStateRegistry();
+        final AtomicKvStateRequestStats requestStats = new AtomicKvStateRequestStats();
+
+        final KvStateServerHandler serverHandler =
+            new KvStateServerHandler(emptyRegistry, TEST_THREAD_POOL, requestStats);
+        final EmbeddedChannel embedded = new EmbeddedChannel(getFrameDecoder(), serverHandler);
+
+        // Build a request for a freshly created ID; nothing was registered with the registry
+        final long expectedRequestId = Integer.MAX_VALUE + 182828L;
+        final ByteBuf serializedRequest = MessageSerializer.serializeKvStateRequest(
+            embedded.alloc(), expectedRequestId, new KvStateID(), new byte[0]);
+
+        // Hand the request to the handler and block until it has answered
+        embedded.writeInbound(serializedRequest);
+        final ByteBuf reply = (ByteBuf) readInboundBlocking(embedded);
+        reply.skipBytes(4); // skip frame length
+
+        // The reply must be a failure for this request, caused by the unknown ID
+        assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(reply));
+        final KvStateRequestFailure failure = MessageSerializer.deserializeKvStateRequestFailure(reply);
+        assertEquals(expectedRequestId, failure.getRequestId());
+        assertTrue("Did not respond with expected failure cause", failure.getCause() instanceof UnknownKvStateID);
+
+        // Exactly one request was counted, and it was counted as failed
+        assertEquals(1, requestStats.getNumRequests());
+        assertEquals(1, requestStats.getNumFailed());
+    }
+
+    /**
+     * Tests the failure response with {@link UnknownKeyOrNamespace} as cause
+     * on queries for non-existing keys.
+     *
+     * <p>The state is registered but never updated, so the requested key has
+     * no value.
+     */
+    @Test
+    public void testQueryUnknownKey() throws Exception {
+        KvStateRegistry registry = new KvStateRegistry();
+        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+        int numKeyGroups = 1;
+        AbstractStateBackend abstractBackend = new MemoryStateBackend();
+        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+        dummyEnv.setKvStateRegistry(registry);
+        KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+            dummyEnv,
+            new JobID(),
+            "test_op",
+            IntSerializer.INSTANCE,
+            numKeyGroups,
+            new KeyGroupRange(0, 0),
+            registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+        // The listener captures the KvStateID that the request below needs
+        final TestRegistryListener registryListener = new TestRegistryListener();
+        registry.registerListener(registryListener);
+
+        // Register state
+        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+        desc.setQueryable("vanilla");
+
+        backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
+
+        // Serialize a key for which no value was ever stored
+        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+            1238283,
+            IntSerializer.INSTANCE,
+            VoidNamespace.INSTANCE,
+            VoidNamespaceSerializer.INSTANCE);
+
+        long requestId = Integer.MAX_VALUE + 22982L;
+
+        // assertEquals reports the actual name on a mismatch and fails with an
+        // assertion error (not a NullPointerException) if no name was recorded.
+        assertEquals("vanilla", registryListener.registrationName);
+
+        ByteBuf request = MessageSerializer.serializeKvStateRequest(
+            channel.alloc(),
+            requestId,
+            registryListener.kvStateId,
+            serializedKeyAndNamespace);
+
+        // Write the request and wait for the response
+        channel.writeInbound(request);
+
+        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+        buf.skipBytes(4); // skip frame length
+
+        // Verify the response
+        assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+        KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+        assertEquals(requestId, response.getRequestId());
+
+        assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespace);
+
+        assertEquals(1, stats.getNumRequests());
+        assertEquals(1, stats.getNumFailed());
+    }
+
+    /**
+     * Verifies that an exception thrown by
+     * {@link InternalKvState#getSerializedValue(byte[])} is reported back to the
+     * client as a request failure carrying the exception message.
+     */
+    @Test
+    public void testFailureOnGetSerializedValue() throws Exception {
+        final KvStateRegistry stateRegistry = new KvStateRegistry();
+        final AtomicKvStateRequestStats requestStats = new AtomicKvStateRequestStats();
+
+        final KvStateServerHandler serverHandler =
+            new KvStateServerHandler(stateRegistry, TEST_THREAD_POOL, requestStats);
+        final EmbeddedChannel embedded = new EmbeddedChannel(getFrameDecoder(), serverHandler);
+
+        // Mocked KvState whose value lookup always throws
+        final InternalKvState<?> brokenState = mock(InternalKvState.class);
+        when(brokenState.getSerializedValue(any(byte[].class)))
+            .thenThrow(new RuntimeException("Expected test Exception"));
+
+        // Register the mock directly with the registry to obtain a valid ID
+        final KvStateID brokenStateId = stateRegistry.registerKvState(
+            new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "vanilla", brokenState);
+
+        final ByteBuf serializedRequest = MessageSerializer.serializeKvStateRequest(
+            embedded.alloc(), 282872, brokenStateId, new byte[0]);
+
+        // Hand the request to the handler and block until it has answered
+        embedded.writeInbound(serializedRequest);
+        final ByteBuf reply = (ByteBuf) readInboundBlocking(embedded);
+        reply.skipBytes(4); // skip frame length
+
+        // The failure must carry the message of the exception thrown by the mock
+        assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(reply));
+        final KvStateRequestFailure failure = MessageSerializer.deserializeKvStateRequestFailure(reply);
+        final String causeMessage = failure.getCause().getMessage();
+        assertTrue(causeMessage.contains("Expected test Exception"));
+
+        assertEquals(1, requestStats.getNumRequests());
+        assertEquals(1, requestStats.getNumFailed());
+    }
+
+    /**
+     * Verifies that an exception reaching the channel handler produces a
+     * server failure message and that the channel is no longer active
+     * afterwards.
+     */
+    @Test
+    public void testCloseChannelOnExceptionCaught() throws Exception {
+        final KvStateRegistry stateRegistry = new KvStateRegistry();
+        final AtomicKvStateRequestStats requestStats = new AtomicKvStateRequestStats();
+
+        // No frame decoder here: the handler is the only element of the pipeline
+        final KvStateServerHandler serverHandler =
+            new KvStateServerHandler(stateRegistry, TEST_THREAD_POOL, requestStats);
+        final EmbeddedChannel embedded = new EmbeddedChannel(serverHandler);
+
+        // Inject the failure directly into the pipeline
+        final RuntimeException injected = new RuntimeException("Expected test Exception");
+        embedded.pipeline().fireExceptionCaught(injected);
+
+        final ByteBuf reply = (ByteBuf) readInboundBlocking(embedded);
+        reply.skipBytes(4); // skip frame length
+
+        // The handler must report the injected exception as a server failure
+        assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(reply));
+        final Throwable reported = MessageSerializer.deserializeServerFailure(reply);
+        assertTrue(reported.getMessage().contains("Expected test Exception"));
+
+        // Give the close a bounded amount of time, then require an inactive channel
+        embedded.closeFuture().await(READ_TIMEOUT_MILLIS);
+        assertFalse(embedded.isActive());
+    }
+
+    /**
+     * Tests the failure response on a rejected execution, because the query
+     * executor has been closed.
+     *
+     * <p>The handler is given an executor that is already shut down, and the
+     * failure cause is expected to mention {@code RejectedExecutionException}.
+     */
+    @Test
+    public void testQueryExecutorShutDown() throws Exception {
+        KvStateRegistry registry = new KvStateRegistry();
+        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+        // Executor that is shut down before the handler ever uses it
+        ExecutorService closedExecutor = Executors.newSingleThreadExecutor();
+        closedExecutor.shutdown();
+        assertTrue(closedExecutor.isShutdown());
+
+        KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats);
+        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+        int numKeyGroups = 1;
+        AbstractStateBackend abstractBackend = new MemoryStateBackend();
+        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+        dummyEnv.setKvStateRegistry(registry);
+        KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+            dummyEnv,
+            new JobID(),
+            "test_op",
+            IntSerializer.INSTANCE,
+            numKeyGroups,
+            new KeyGroupRange(0, 0),
+            registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+        // The listener captures the KvStateID that the request below needs
+        final TestRegistryListener registryListener = new TestRegistryListener();
+        registry.registerListener(registryListener);
+
+        // Register state
+        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+        desc.setQueryable("vanilla");
+
+        backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
+
+        // assertEquals reports the actual name on a mismatch and fails with an
+        // assertion error (not a NullPointerException) if no name was recorded.
+        assertEquals("vanilla", registryListener.registrationName);
+
+        ByteBuf request = MessageSerializer.serializeKvStateRequest(
+            channel.alloc(),
+            282872,
+            registryListener.kvStateId,
+            new byte[0]);
+
+        // Write the request and wait for the response
+        channel.writeInbound(request);
+
+        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+        buf.skipBytes(4); // skip frame length
+
+        // Verify the response
+        assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+        KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+        assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
+
+        assertEquals(1, stats.getNumRequests());
+        assertEquals(1, stats.getNumFailed());
+    }
+
+    /**
+     * Verifies that messages the server does not expect are answered with a
+     * server failure and are not counted in the request statistics.
+     *
+     * <p>Two messages are sent: a frame whose payload is a single arbitrary
+     * int, and a serialized request <em>result</em>.
+     */
+    @Test
+    public void testUnexpectedMessage() throws Exception {
+        final KvStateRegistry stateRegistry = new KvStateRegistry();
+        final AtomicKvStateRequestStats requestStats = new AtomicKvStateRequestStats();
+
+        final KvStateServerHandler serverHandler =
+            new KvStateServerHandler(stateRegistry, TEST_THREAD_POOL, requestStats);
+        final EmbeddedChannel embedded = new EmbeddedChannel(getFrameDecoder(), serverHandler);
+
+        // First message: frame length 4 followed by one arbitrary int
+        final ByteBuf arbitraryFrame = Unpooled.buffer(8);
+        arbitraryFrame.writeInt(4);
+        arbitraryFrame.writeInt(123238213);
+
+        embedded.writeInbound(arbitraryFrame);
+
+        final ByteBuf firstReply = (ByteBuf) readInboundBlocking(embedded);
+        firstReply.skipBytes(4); // skip frame length
+
+        // Must be a server failure; the cause is deserialized but not inspected
+        assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(firstReply));
+        MessageSerializer.deserializeServerFailure(firstReply);
+
+        assertEquals(0, requestStats.getNumRequests());
+        assertEquals(0, requestStats.getNumFailed());
+
+        // Second message: a serialized request result sent to the server
+        final ByteBuf misdirectedResult = MessageSerializer.serializeKvStateRequestResult(
+            embedded.alloc(), 192, new byte[0]);
+
+        embedded.writeInbound(misdirectedResult);
+
+        final ByteBuf secondReply = (ByteBuf) readInboundBlocking(embedded);
+        secondReply.skipBytes(4); // skip frame length
+
+        // Must be a server failure caused by an IllegalArgumentException
+        assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(secondReply));
+        final Throwable secondCause = MessageSerializer.deserializeServerFailure(secondReply);
+
+        assertTrue("Unexpected failure cause " + secondCause.getClass().getName(), secondCause instanceof IllegalArgumentException);
+
+        assertEquals(0, requestStats.getNumRequests());
+        assertEquals(0, requestStats.getNumFailed());
+    }
+
+    /**
+     * Verifies that the reference count of an incoming buffer drops to zero
+     * once it has been written to the channel, both for a regular request and
+     * for an unexpected message.
+     */
+    @Test
+    public void testIncomingBufferIsRecycled() throws Exception {
+        final KvStateRegistry stateRegistry = new KvStateRegistry();
+        final AtomicKvStateRequestStats requestStats = new AtomicKvStateRequestStats();
+
+        final KvStateServerHandler serverHandler =
+            new KvStateServerHandler(stateRegistry, TEST_THREAD_POOL, requestStats);
+        final EmbeddedChannel embedded = new EmbeddedChannel(getFrameDecoder(), serverHandler);
+
+        // Case 1: a regular request
+        final ByteBuf regularRequest = MessageSerializer.serializeKvStateRequest(
+            embedded.alloc(), 282872, new KvStateID(), new byte[0]);
+        assertEquals(1, regularRequest.refCnt());
+
+        embedded.writeInbound(regularRequest);
+        assertEquals("Buffer not recycled", 0, regularRequest.refCnt());
+
+        // Case 2: an unexpected message (frame length 4, then the int 4)
+        final ByteBuf unexpectedFrame = embedded.alloc().buffer(8);
+        unexpectedFrame.writeInt(4);
+        unexpectedFrame.writeInt(4);
+        assertEquals(1, unexpectedFrame.refCnt());
+
+        embedded.writeInbound(unexpectedFrame);
+        assertEquals("Buffer not recycled", 0, unexpectedFrame.refCnt());
+    }
+
+    /**
+     * Tests the failure response if the serializers don't match.
+     *
+     * <p>Two requests are sent against Integer-keyed state: one whose key and
+     * namespace are both serialized as Strings, and one where only the
+     * namespace is. Both are expected to fail with a cause that mentions
+     * {@code IOException}.
+     */
+    @Test
+    public void testSerializerMismatch() throws Exception {
+        KvStateRegistry registry = new KvStateRegistry();
+        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+        int numKeyGroups = 1;
+        AbstractStateBackend abstractBackend = new MemoryStateBackend();
+        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+        dummyEnv.setKvStateRegistry(registry);
+        AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+            dummyEnv,
+            new JobID(),
+            "test_op",
+            IntSerializer.INSTANCE,
+            numKeyGroups,
+            new KeyGroupRange(0, 0),
+            registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+        // The listener captures the KvStateID that the requests below need
+        final TestRegistryListener registryListener = new TestRegistryListener();
+        registry.registerListener(registryListener);
+
+        // Register state
+        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+        desc.setQueryable("vanilla");
+
+        ValueState<Integer> state = backend.getPartitionedState(
+            VoidNamespace.INSTANCE,
+            VoidNamespaceSerializer.INSTANCE,
+            desc);
+
+        int key = 99812822;
+
+        // Update the KvState
+        backend.setCurrentKey(key);
+        state.update(712828289);
+
+        // Key and namespace both serialized with the wrong (String) serializer
+        byte[] wrongKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+            "wrong-key-type",
+            StringSerializer.INSTANCE,
+            "wrong-namespace-type",
+            StringSerializer.INSTANCE);
+
+        // Correct key, but the namespace serialized with the wrong (String) serializer
+        byte[] wrongNamespace = KvStateSerializer.serializeKeyAndNamespace(
+            key,
+            IntSerializer.INSTANCE,
+            "wrong-namespace-type",
+            StringSerializer.INSTANCE);
+
+        // assertEquals reports the actual name on a mismatch and fails with an
+        // assertion error (not a NullPointerException) if no name was recorded.
+        assertEquals("vanilla", registryListener.registrationName);
+        ByteBuf request = MessageSerializer.serializeKvStateRequest(
+            channel.alloc(),
+            182828,
+            registryListener.kvStateId,
+            wrongKeyAndNamespace);
+
+        // Write the request and wait for the response
+        channel.writeInbound(request);
+
+        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+        buf.skipBytes(4); // skip frame length
+
+        // Verify the response
+        assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+        KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+        assertEquals(182828, response.getRequestId());
+        assertTrue(response.getCause().getMessage().contains("IOException"));
+
+        // Repeat with wrong namespace only
+        request = MessageSerializer.serializeKvStateRequest(
+            channel.alloc(),
+            182829,
+            registryListener.kvStateId,
+            wrongNamespace);
+
+        // Write the request and wait for the response
+        channel.writeInbound(request);
+
+        buf = (ByteBuf) readInboundBlocking(channel);
+        buf.skipBytes(4); // skip frame length
+
+        // Verify the response
+        assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+        response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+        assertEquals(182829, response.getRequestId());
+        assertTrue(response.getCause().getMessage().contains("IOException"));
+
+        // Both requests were counted, and both were counted as failed
+        assertEquals(2, stats.getNumRequests());
+        assertEquals(2, stats.getNumFailed());
+    }
+
+    /**
+     * Tests that large responses are chunked.
+     *
+     * <p>The stored value is twice as large as the channel's write buffer high
+     * water mark, and the outbound message is expected to be a
+     * {@link ChunkedByteBuf}.
+     */
+    @Test
+    public void testChunkedResponse() throws Exception {
+        KvStateRegistry registry = new KvStateRegistry();
+        KvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+        int numKeyGroups = 1;
+        AbstractStateBackend abstractBackend = new MemoryStateBackend();
+        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+        dummyEnv.setKvStateRegistry(registry);
+        AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+            dummyEnv,
+            new JobID(),
+            "test_op",
+            IntSerializer.INSTANCE,
+            numKeyGroups,
+            new KeyGroupRange(0, 0),
+            registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+        // The listener captures the KvStateID that the request below needs
+        final TestRegistryListener registryListener = new TestRegistryListener();
+        registry.registerListener(registryListener);
+
+        // Register state
+        ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE);
+        desc.setQueryable("vanilla");
+
+        ValueState<byte[]> state = backend.getPartitionedState(
+            VoidNamespace.INSTANCE,
+            VoidNamespaceSerializer.INSTANCE,
+            desc);
+
+        // Update KvState with a value twice the size of the write buffer high water mark
+        byte[] bytes = new byte[2 * channel.config().getWriteBufferHighWaterMark()];
+
+        // Fill with a repeating byte pattern (the counter wraps around on overflow)
+        byte current = 0;
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = current++;
+        }
+
+        int key = 99812822;
+        backend.setCurrentKey(key);
+        state.update(bytes);
+
+        // Request
+        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+            key,
+            IntSerializer.INSTANCE,
+            VoidNamespace.INSTANCE,
+            VoidNamespaceSerializer.INSTANCE);
+
+        long requestId = Integer.MAX_VALUE + 182828L;
+
+        // assertEquals reports the actual name on a mismatch and fails with an
+        // assertion error (not a NullPointerException) if no name was recorded.
+        assertEquals("vanilla", registryListener.registrationName);
+
+        ByteBuf request = MessageSerializer.serializeKvStateRequest(
+            channel.alloc(),
+            requestId,
+            registryListener.kvStateId,
+            serializedKeyAndNamespace);
+
+        // Write the request and wait for the response
+        channel.writeInbound(request);
+
+        Object msg = readInboundBlocking(channel);
+        assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
+    }
+
+ // ------------------------------------------------------------------------
+
+    /**
+     * Polls the embedded channel for an outbound message.
+     *
+     * <p>Sleeps 50 ms between attempts and gives up once
+     * {@code READ_TIMEOUT_MILLIS} milliseconds have been slept in total.
+     *
+     * @param channel channel to read the outbound message from
+     * @return the first non-null outbound message
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     * @throws TimeoutException if no message showed up within the timeout
+     */
+    private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException {
+        final int pollIntervalMillis = 50;
+
+        for (int waitedMillis = 0; waitedMillis < READ_TIMEOUT_MILLIS; waitedMillis += pollIntervalMillis) {
+            Object outbound = channel.readOutbound();
+            if (outbound != null) {
+                return outbound;
+            }
+
+            Thread.sleep(pollIntervalMillis);
+        }
+
+        throw new TimeoutException();
+    }
+
+ /**
+ * Frame length decoder (expected by the serialized messages).
+ */
+ private ChannelHandler getFrameDecoder() {
+ return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
+ }
+
+    /**
+     * Registry listener that remembers the details of the most recent KvState
+     * registration so that a test can read them back.
+     *
+     * <p>Unregistration notifications are ignored.
+     */
+    static class TestRegistryListener implements KvStateRegistryListener {
+        /** Job vertex ID passed to the last registration notification. */
+        volatile JobVertexID jobVertexID;
+        /** Key group range passed to the last registration notification. */
+        volatile KeyGroupRange keyGroupIndex;
+        /** Registration name passed to the last registration notification. */
+        volatile String registrationName;
+        /** KvState ID passed to the last registration notification. */
+        volatile KvStateID kvStateId;
+
+        @Override
+        public void notifyKvStateRegistered(
+                JobID jobId,
+                JobVertexID jobVertexId,
+                KeyGroupRange keyGroupRange,
+                String registrationName,
+                KvStateID kvStateId) {
+            // The job ID is not recorded; everything else overwrites the previous values.
+            this.jobVertexID = jobVertexId;
+            this.keyGroupIndex = keyGroupRange;
+            this.registrationName = registrationName;
+            this.kvStateId = kvStateId;
+        }
+
+        @Override
+        public void notifyKvStateUnregistered(
+                JobID jobId,
+                JobVertexID jobVertexId,
+                KeyGroupRange keyGroupRange,
+                String registrationName) {
+            // Intentionally empty: the recorded values are kept.
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
new file mode 100644
index 0000000..9332e68
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link KvStateServer}.
+ */
+public class KvStateServerTest {
+
+ // Thread pool for client bootstrap (shared between tests)
+ private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+
+ private static final int TIMEOUT_MILLIS = 10000; // upper bound (ms) for waiting on the server's response
+
+    /**
+     * Shuts down the shared client event loop group after all tests of this class have run.
+     */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        // NIO_GROUP is a static final field initialised at its declaration,
+        // so it can never be null here and needs no guard.
+        NIO_GROUP.shutdownGracefully();
+    }
+
+ /**
+ * Tests a simple successful query via a SocketChannel.
+ */
+ @Test
+ public void testSimpleRequest() throws Exception {
+ KvStateServer server = null;
+ Bootstrap bootstrap = null;
+ try {
+ KvStateRegistry registry = new KvStateRegistry();
+ KvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+ server = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
+ server.start();
+
+ KvStateServerAddress serverAddress = server.getAddress();
+ int numKeyGroups = 1;
+ AbstractStateBackend abstractBackend = new MemoryStateBackend();
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+ dummyEnv.setKvStateRegistry(registry);
+ AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+ dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ numKeyGroups,
+ new KeyGroupRange(0, 0),
+ registry.createTaskRegistry(new JobID(), new JobVertexID()));
+
+ final KvStateServerHandlerTest.TestRegistryListener registryListener =
+ new KvStateServerHandlerTest.TestRegistryListener();
+
+ registry.registerListener(registryListener);
+
+ ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+ desc.setQueryable("vanilla");
+
+ ValueState<Integer> state = backend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ desc);
+
+ // Update KvState
+ int expectedValue = 712828289;
+
+ int key = 99812822;
+ backend.setCurrentKey(key);
+ state.update(expectedValue);
+
+ // Request
+ byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+ key,
+ IntSerializer.INSTANCE,
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE);
+
+ // Connect to the server
+ final BlockingQueue<ByteBuf> responses = new LinkedBlockingQueue<>();
+ bootstrap = createBootstrap(
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
+ new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ responses.add((ByteBuf) msg);
+ }
+ });
+
+ Channel channel = bootstrap
+ .connect(serverAddress.getHost(), serverAddress.getPort())
+ .sync().channel();
+
+ long requestId = Integer.MAX_VALUE + 182828L;
+
+ assertTrue(registryListener.registrationName.equals("vanilla"));
+ ByteBuf request = MessageSerializer.serializeKvStateRequest(
+ channel.alloc(),
+ requestId,
+ registryListener.kvStateId,
+ serializedKeyAndNamespace);
+
+ channel.writeAndFlush(request);
+
+ ByteBuf buf = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+ KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf);
+
+ assertEquals(requestId, response.getRequestId());
+ int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
+ assertEquals(expectedValue, actualValue);
+ } finally {
+ if (server != null) {
+ server.shutDown();
+ }
+
+ if (bootstrap != null) {
+ EventLoopGroup group = bootstrap.group();
+ if (group != null) {
+ group.shutdownGracefully();
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a client bootstrap.
+ */
+ private Bootstrap createBootstrap(final ChannelHandler... handlers) {
+ return new Bootstrap().group(NIO_GROUP).channel(NioSocketChannel.class)
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(handlers);
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
new file mode 100644
index 0000000..5d4a861
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.UnknownKvStateID;
+import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation;
+import org.apache.flink.queryablestate.client.KvStateClient;
+import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapValueState;
+import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.MathUtils;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link QueryableStateClient}.
+ */
+public class QueryableStateClientTest {
+
+ private static final ActorSystem testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+ private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
+
+ /**
+ * Shuts down the actor system that is shared by all tests of this class.
+ */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (testActorSystem == null) {
+ return;
+ }
+
+ testActorSystem.shutdown();
+ }
+
+ /**
+ * Failures that indicate an out-of-sync KvStateLocation should lead to a
+ * retry with a forced location lookup.
+ *
+ * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, UnknownKvStateLocation,
+ * ConnectException are checked explicitly as these indicate out-of-sync
+ * KvStateLocation. For each of these cases the test expects exactly two
+ * lookups for the queried name (the initial one plus one retry), whereas an
+ * unrelated RuntimeException is expected to cause a single lookup only.
+ */
+ @Test
+ public void testForceLookupOnOutdatedLocation() throws Exception {
+ // Both collaborators are mocks: every failure below is injected via stubbing
+ KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
+ KvStateClient networkClient = mock(KvStateClient.class);
+
+ QueryableStateClient client = new QueryableStateClient(
+ lookupService,
+ networkClient,
+ testActorSystem.dispatcher());
+
+ try {
+ JobID jobId = new JobID();
+ int numKeyGroups = 4;
+
+ //
+ // UnknownKvStateLocation
+ //
+ String query1 = "lucky";
+
+ // The location lookup itself fails
+ Future<KvStateLocation> unknownKvStateLocation = Futures.failed(
+ new UnknownKvStateLocation(query1));
+
+ when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query1)))
+ .thenReturn(unknownKvStateLocation);
+
+ Future<Integer> result = client.getKvState(
+ jobId,
+ query1,
+ 0,
+ BasicTypeInfo.INT_TYPE_INFO,
+ new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+ try {
+ Await.result(result, timeout);
+ fail("Did not throw expected UnknownKvStateLocation exception");
+ } catch (UnknownKvStateLocation ignored) {
+ // Expected
+ }
+
+ verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query1));
+
+ //
+ // UnknownKvStateKeyGroupLocation
+ //
+ String query2 = "unlucky";
+
+ // The lookup succeeds, but no KvState has been registered at the returned location
+ Future<KvStateLocation> unknownKeyGroupLocation = Futures.successful(
+ new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query2));
+
+ when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query2)))
+ .thenReturn(unknownKeyGroupLocation);
+
+ result = client.getKvState(
+ jobId,
+ query2,
+ 0,
+ BasicTypeInfo.INT_TYPE_INFO,
+ new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+ try {
+ Await.result(result, timeout);
+ fail("Did not throw expected UnknownKvStateKeyGroupLocation exception");
+ } catch (UnknownKvStateKeyGroupLocation ignored) {
+ // Expected
+ }
+
+ verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query2));
+
+ //
+ // UnknownKvStateID
+ //
+ String query3 = "water";
+ KvStateID kvStateId = new KvStateID();
+ Future<byte[]> unknownKvStateId = Futures.failed(new UnknownKvStateID(kvStateId));
+
+ // The location is fully registered, but the network request to the server fails
+ KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
+ KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3);
+ for (int i = 0; i < numKeyGroups; i++) {
+ location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
+ }
+
+ when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3)))
+ .thenReturn(Futures.successful(location));
+
+ when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
+ .thenReturn(unknownKvStateId);
+
+ result = client.getKvState(
+ jobId,
+ query3,
+ 0,
+ BasicTypeInfo.INT_TYPE_INFO,
+ new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+ try {
+ Await.result(result, timeout);
+ fail("Did not throw expected UnknownKvStateID exception");
+ } catch (UnknownKvStateID ignored) {
+ // Expected
+ }
+
+ verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query3));
+
+ //
+ // ConnectException
+ //
+ String query4 = "space";
+ Future<byte[]> connectException = Futures.failed(new ConnectException());
+ // Fresh state ID and server address so that the stubbing of the previous case does not match
+ kvStateId = new KvStateID();
+
+ serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
+ location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4);
+ for (int i = 0; i < numKeyGroups; i++) {
+ location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
+ }
+
+ when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4)))
+ .thenReturn(Futures.successful(location));
+
+ when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
+ .thenReturn(connectException);
+
+ result = client.getKvState(
+ jobId,
+ query4,
+ 0,
+ BasicTypeInfo.INT_TYPE_INFO,
+ new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+ try {
+ Await.result(result, timeout);
+ fail("Did not throw expected ConnectException exception");
+ } catch (ConnectException ignored) {
+ // Expected
+ }
+
+ verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query4));
+
+ //
+ // Other exceptions don't lead to a retry
+ //
+ String query5 = "universe";
+ Future<KvStateLocation> exception = Futures.failed(new RuntimeException("Test exception"));
+ when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query5)))
+ .thenReturn(exception);
+
+ client.getKvState(
+ jobId,
+ query5,
+ 0,
+ BasicTypeInfo.INT_TYPE_INFO,
+ new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
+
+ // NOTE(review): unlike the cases above, the returned future is not awaited
+ // before verifying, so a retry that is triggered asynchronously might go
+ // unnoticed here -- confirm against the client implementation.
+ verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId), eq(query5));
+ } finally {
+ client.shutDown();
+ }
+ }
+
+ /**
+ * Tests queries against multiple servers.
+ *
+ * <p>The servers are populated with different keys and the client queries
+ * all available keys from all servers. Every key is stored with the value
+ * {@code 1337 + key}. In addition to the query results, the request
+ * statistics of each server are verified.
+ */
+ @Test
+ public void testIntegrationWithKvStateServer() throws Exception {
+ // Config
+ int numServers = 2;
+ int numKeys = 1024;
+ int numKeyGroups = 1;
+
+ JobID jobId = new JobID();
+ JobVertexID jobVertexId = new JobVertexID();
+
+ KvStateServer[] servers = new KvStateServer[numServers];
+ AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
+
+ QueryableStateClient client = null;
+ KvStateClient networkClient = null;
+ AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats();
+
+ MemoryStateBackend backend = new MemoryStateBackend();
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+
+ // A single keyed state backend backs the state tables of the value states of all servers
+ AbstractKeyedStateBackend<Integer> keyedStateBackend = backend.createKeyedStateBackend(dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ numKeyGroups,
+ new KeyGroupRange(0, 0),
+ new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+
+ try {
+ KvStateRegistry[] registries = new KvStateRegistry[numServers];
+ KvStateID[] kvStateIds = new KvStateID[numServers];
+ List<HeapValueState<Integer, VoidNamespace, Integer>> kvStates = new ArrayList<>();
+
+ // Start the servers
+ for (int i = 0; i < numServers; i++) {
+ // Each server gets its own registry and its own request statistics
+ registries[i] = new KvStateRegistry();
+ serverStats[i] = new AtomicKvStateRequestStats();
+ servers[i] = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]);
+ servers[i].start();
+ ValueStateDescriptor<Integer> descriptor =
+ new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+
+ RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+ descriptor.getType(),
+ descriptor.getName(),
+ VoidNamespaceSerializer.INSTANCE,
+ IntSerializer.INSTANCE);
+
+ // Register state
+ HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
+ descriptor,
+ new NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
+ IntSerializer.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE);
+
+ kvStates.add(kvState);
+
+ // Server i is registered for key group i under the common name "choco"
+ kvStateIds[i] = registries[i].registerKvState(
+ jobId,
+ new JobVertexID(),
+ new KeyGroupRange(i, i),
+ "choco",
+ kvState);
+ }
+
+ // Number of keys stored on each server; later compared with the server request stats
+ int[] expectedRequests = new int[numServers];
+
+ for (int key = 0; key < numKeys; key++) {
+ // Pick the server (one key group per server) that stores this key;
+ // presumably this mirrors the key group assignment done by the client -- confirm
+ int targetKeyGroupIndex = MathUtils.murmurHash(key) % numServers;
+ expectedRequests[targetKeyGroupIndex]++;
+
+ HeapValueState<Integer, VoidNamespace, Integer> kvState = kvStates.get(targetKeyGroupIndex);
+
+ keyedStateBackend.setCurrentKey(key);
+ kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+ kvState.update(1337 + key);
+ }
+
+ // Location lookup service
+ KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco");
+ for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) {
+ location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
+ }
+
+ // The lookup is mocked and always returns the location created above
+ KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
+ when(lookupService.getKvStateLookupInfo(eq(jobId), eq("choco")))
+ .thenReturn(Futures.successful(location));
+
+ // The client
+ networkClient = new KvStateClient(1, networkClientStats);
+
+ client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher());
+
+ // Send all queries
+ List<Future<Integer>> futures = new ArrayList<>(numKeys);
+ for (int key = 0; key < numKeys; key++) {
+ ValueStateDescriptor<Integer> descriptor =
+ new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+ futures.add(client.getKvState(
+ jobId,
+ "choco",
+ key,
+ BasicTypeInfo.INT_TYPE_INFO,
+ descriptor));
+ }
+
+ // Verify results
+ Future<Iterable<Integer>> future = Futures.sequence(futures, testActorSystem.dispatcher());
+ Iterable<Integer> results = Await.result(future, timeout);
+
+ // The check below relies on the results being in the order the queries were sent (key 0, 1, ...)
+ int index = 0;
+ for (int buffer : results) {
+ assertEquals(1337 + index, buffer);
+ index++;
+ }
+
+ // Verify requests
+ for (int i = 0; i < numServers; i++) {
+ // The stats assertions are retried up to numRetries times with a short sleep in
+ // between; presumably the servers may update their stats slightly after the
+ // client has received the responses -- confirm
+ int numRetries = 10;
+ for (int retry = 0; retry < numRetries; retry++) {
+ try {
+ assertEquals("Unexpected number of requests", expectedRequests[i], serverStats[i].getNumRequests());
+ assertEquals("Unexpected success requests", expectedRequests[i], serverStats[i].getNumSuccessful());
+ assertEquals("Unexpected failed requests", 0, serverStats[i].getNumFailed());
+ break;
+ } catch (Throwable t) {
+ // Retry
+ if (retry == numRetries - 1) {
+ // Out of retries: surface the last assertion failure
+ throw t;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ }
+ }
+ } finally {
+ // Shut down whatever has been started, even if the test failed half-way through
+ if (client != null) {
+ client.shutDown();
+ }
+
+ if (networkClient != null) {
+ networkClient.shutDown();
+ }
+
+ for (KvStateServer server : servers) {
+ if (server != null) {
+ server.shutDown();
+ }
+ }
+ }
+ }
+
+ /**
+ * Tests that the QueryableState client correctly caches location lookups
+ * keyed by both job and name. This test mainly exists because of a previous
+ * bug where cache entries were keyed by name only. This is a problem, because
+ * the same client can be used to query multiple jobs.
+ */
+ @Test
+ public void testLookupMultipleJobIds() throws Exception {
+ String name = "unique-per-job";
+
+ // Exact contents don't matter here
+ KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
+ location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
+
+ JobID jobId1 = new JobID();
+ JobID jobId2 = new JobID();
+
+ // The mocked lookup service returns the same location for any job ID and name
+ KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
+
+ when(lookupService.getKvStateLookupInfo(any(JobID.class), anyString()))
+ .thenReturn(Futures.successful(location));
+
+ // The mocked network client answers every request with an empty result
+ KvStateClient networkClient = mock(KvStateClient.class);
+ when(networkClient.getKvState(any(KvStateServerAddress.class), any(KvStateID.class), any(byte[].class)))
+ .thenReturn(Futures.successful(new byte[0]));
+
+ QueryableStateClient client = new QueryableStateClient(
+ lookupService,
+ networkClient,
+ testActorSystem.dispatcher());
+
+ try {
+ ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
+
+ // Queries with the same name, but different job IDs should lead to a
+ // single lookup per query and job ID.
+ client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);
+ client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);
+
+ verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId1), eq(name));
+ verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId2), eq(name));
+ } finally {
+ // Always shut the client down, as the other tests of this class do
+ client.shutDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..10792cd
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=OFF
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/pom.xml b/flink-queryable-state/pom.xml
new file mode 100644
index 0000000..e9e7496
--- /dev/null
+++ b/flink-queryable-state/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>1.4-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-queryable-state</artifactId>
+ <name>flink-queryable-state</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-queryable-state-java</module>
+ <!-- <module>flink-state-client-scala</module>-->
+ </modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 9193859..53503ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManager;
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
deleted file mode 100644
index a37a3ac..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.util.Preconditions;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.Recover;
-import akka.pattern.Patterns;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * Akka-based {@link KvStateLocationLookupService} that retrieves the current
- * JobManager address and uses it for lookups.
- */
-class AkkaKvStateLocationLookupService implements KvStateLocationLookupService, LeaderRetrievalListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(KvStateLocationLookupService.class);
-
- /** Future returned when no JobManager is available. */
- private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager());
-
- /** Leader retrieval service to retrieve the current job manager. */
- private final LeaderRetrievalService leaderRetrievalService;
-
- /** The actor system used to resolve the JobManager address. */
- private final ActorSystem actorSystem;
-
- /** Timeout for JobManager ask-requests. */
- private final FiniteDuration askTimeout;
-
- /** Retry strategy factory on future failures. */
- private final LookupRetryStrategyFactory retryStrategyFactory;
-
- /** Current job manager future. */
- private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
-
- /**
- * Creates the Akka-based {@link KvStateLocationLookupService}.
- *
- * @param leaderRetrievalService Leader retrieval service to use.
- * @param actorSystem Actor system to use.
- * @param askTimeout Timeout for JobManager ask-requests.
- * @param retryStrategyFactory Retry strategy if no JobManager available.
- */
- AkkaKvStateLocationLookupService(
- LeaderRetrievalService leaderRetrievalService,
- ActorSystem actorSystem,
- FiniteDuration askTimeout,
- LookupRetryStrategyFactory retryStrategyFactory) {
-
- this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
- this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system");
- this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask Timeout");
- this.retryStrategyFactory = Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
- }
-
- public void start() {
- try {
- leaderRetrievalService.start(this);
- } catch (Exception e) {
- LOG.error("Failed to start leader retrieval service", e);
- throw new RuntimeException(e);
- }
- }
-
- public void shutDown() {
- try {
- leaderRetrievalService.stop();
- } catch (Exception e) {
- LOG.error("Failed to stop leader retrieval service", e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, final String registrationName) {
- return getKvStateLookupInfo(jobId, registrationName, retryStrategyFactory.createRetryStrategy());
- }
-
- /**
- * Returns a future holding the {@link KvStateLocation} for the given job
- * and KvState registration name.
- *
- * <p>If there is currently no JobManager registered with the service, the
- * request is retried. The retry behaviour is specified by the
- * {@link LookupRetryStrategy} of the lookup service.
- *
- * @param jobId JobID the KvState instance belongs to
- * @param registrationName Name under which the KvState has been registered
- * @param lookupRetryStrategy Retry strategy to use for retries on UnknownJobManager failures.
- * @return Future holding the {@link KvStateLocation}
- */
- @SuppressWarnings("unchecked")
- private Future<KvStateLocation> getKvStateLookupInfo(
- final JobID jobId,
- final String registrationName,
- final LookupRetryStrategy lookupRetryStrategy) {
-
- return jobManagerFuture
- .flatMap(new Mapper<ActorGateway, Future<Object>>() {
- @Override
- public Future<Object> apply(ActorGateway jobManager) {
- // Lookup the KvStateLocation
- Object msg = new KvStateMessage.LookupKvStateLocation(jobId, registrationName);
- return jobManager.ask(msg, askTimeout);
- }
- }, actorSystem.dispatcher())
- .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
- .recoverWith(new Recover<Future<KvStateLocation>>() {
- @Override
- public Future<KvStateLocation> recover(Throwable failure) throws Throwable {
- // If the Future fails with UnknownJobManager, retry
- // the request. Otherwise all Futures will be failed
- // during the start up phase, when the JobManager did
- // not notify this service yet or leadership is lost
- // intermittently.
- if (failure instanceof UnknownJobManager && lookupRetryStrategy.tryRetry()) {
- return Patterns.after(
- lookupRetryStrategy.getRetryDelay(),
- actorSystem.scheduler(),
- actorSystem.dispatcher(),
- new Callable<Future<KvStateLocation>>() {
- @Override
- public Future<KvStateLocation> call() throws Exception {
- return getKvStateLookupInfo(
- jobId,
- registrationName,
- lookupRetryStrategy);
- }
- });
- } else {
- return Futures.failed(failure);
- }
- }
- }, actorSystem.dispatcher());
- }
-
- @Override
- public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID);
- }
-
- if (leaderAddress == null) {
- jobManagerFuture = UNKNOWN_JOB_MANAGER;
- } else {
- jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
- .map(new Mapper<ActorRef, ActorGateway>() {
- @Override
- public ActorGateway apply(ActorRef actorRef) {
- return new AkkaActorGateway(actorRef, leaderSessionID);
- }
- }, actorSystem.dispatcher());
- }
- }
-
- @Override
- public void handleError(Exception exception) {
- jobManagerFuture = Futures.failed(exception);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Retry strategy for failed lookups.
- *
- * <p>Usage:
- * <pre>
- * LookupRetryStrategy retryStrategy = LookupRetryStrategyFactory.create();
- *
- * if (retryStrategy.tryRetry()) {
- * // OK to retry
- * FiniteDuration retryDelay = retryStrategy.getRetryDelay();
- * }
- * </pre>
- */
- interface LookupRetryStrategy {
-
- /**
- * Returns the current retry.
- *
- * @return Current retry delay.
- */
- FiniteDuration getRetryDelay();
-
- /**
- * Tries another retry and returns whether it is allowed or not.
- *
- * @return Whether it is allowed to do another restart or not.
- */
- boolean tryRetry();
-
- }
-
- /**
- * Factory for retry strategies.
- */
- interface LookupRetryStrategyFactory {
-
- /**
- * Creates a new retry strategy.
- *
- * @return The retry strategy.
- */
- LookupRetryStrategy createRetryStrategy();
-
- }
-
- /**
- * Factory for disabled retries.
- */
- static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
-
- private static final DisabledLookupRetryStrategy RETRY_STRATEGY = new DisabledLookupRetryStrategy();
-
- @Override
- public LookupRetryStrategy createRetryStrategy() {
- return RETRY_STRATEGY;
- }
-
- private static class DisabledLookupRetryStrategy implements LookupRetryStrategy {
-
- @Override
- public FiniteDuration getRetryDelay() {
- return FiniteDuration.Zero();
- }
-
- @Override
- public boolean tryRetry() {
- return false;
- }
- }
-
- }
-
- /**
- * Factory for fixed delay retries.
- */
- static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
-
- private final int maxRetries;
- private final FiniteDuration retryDelay;
-
- FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) {
- this.maxRetries = maxRetries;
- this.retryDelay = retryDelay;
- }
-
- @Override
- public LookupRetryStrategy createRetryStrategy() {
- return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay);
- }
-
- private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy {
-
- private final Object retryLock = new Object();
- private final int maxRetries;
- private final FiniteDuration retryDelay;
- private int numRetries;
-
- public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) {
- Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
- this.maxRetries = maxRetries;
- this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
- }
-
- @Override
- public FiniteDuration getRetryDelay() {
- synchronized (retryLock) {
- return retryDelay;
- }
- }
-
- @Override
- public boolean tryRetry() {
- synchronized (retryLock) {
- if (numRetries < maxRetries) {
- numRetries++;
- return true;
- } else {
- return false;
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 86d1838..8a213bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -31,7 +31,7 @@ import java.util.Arrays;
* Location information for all key groups of a {@link InternalKvState} instance.
*
* <p>This is populated by the {@link KvStateLocationRegistry} and used by the
- * {@link QueryableStateClient} to target queries.
+ * Queryable State Client to target queries.
*/
public class KvStateLocation implements Serializable {
@@ -166,7 +166,7 @@ public class KvStateLocation implements Serializable {
* @param kvStateAddress Server address of the KvState instance at the key group index.
* @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
*/
- void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
+ public void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
throw new IndexOutOfBoundsException("Key group index");
@@ -183,6 +183,10 @@ public class KvStateLocation implements Serializable {
}
}
+ public static long getSerialVersionUID() {
+ return serialVersionUID;
+ }
+
/**
* Registers a KvState instance for the given key group index.
*
@@ -190,7 +194,7 @@ public class KvStateLocation implements Serializable {
* @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
* @throws IllegalArgumentException If no location information registered for a key group index in the range.
*/
- void unregisterKvState(KeyGroupRange keyGroupRange) {
+ public void unregisterKvState(KeyGroupRange keyGroupRange) {
if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
throw new IndexOutOfBoundsException("Key group index");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
deleted file mode 100644
index dfd9c14..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-
-import scala.concurrent.Future;
-
-/**
- * {@link KvStateLocation} lookup service.
- */
-public interface KvStateLocationLookupService {
-
- /**
- * Starts the lookup service.
- */
- void start();
-
- /**
- * Shuts down the lookup service.
- */
- void shutDown();
-
- /**
- * Returns a future holding the {@link KvStateLocation} for the given job
- * and KvState registration name.
- *
- * @param jobId JobID the KvState instance belongs to
- * @param registrationName Name under which the KvState has been registered
- * @return Future holding the {@link KvStateLocation}
- */
- Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String registrationName);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 26b700c..90fa5cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskmanager.Task;
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
new file mode 100644
index 0000000..9b14c49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+/**
+ * An interface for the Queryable State Server running on each Task Manager in the cluster.
+ * This server is responsible for serving requests coming from the Queryable State Client and
+ * requesting <b>locally</b> stored state.
+ */
+public interface KvStateServer {
+
+ /**
+ * Returns the address of this server.
+ *
+ * @return Server address
+ */
+ KvStateServerAddress getAddress();
+
+
+ /** Starts the server. */
+ void start() throws InterruptedException;
+
+ /**
+ * Shuts down the server and all related thread pools.
+ */
+ void shutDown();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
index 9ec25bc..2599855 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.query;
-import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
@@ -88,4 +87,9 @@ public class KvStateServerAddress implements Serializable {
result = 31 * result + port;
return result;
}
+
+ @Override
+ public String toString() {
+ return hostAddress.getHostName() + ':' + port;
+ }
}