You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/11 15:46:11 UTC

[11/14] flink git commit: [FLINK-7770][QS] Hide the queryable state behind a proxy.

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
index a7f65f3..7ff4ec6 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -34,8 +35,12 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -53,25 +58,27 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.OnSuccess;
-import akka.dispatch.Recover;
-import akka.pattern.Patterns;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.function.Supplier;
 
 import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
@@ -84,10 +91,12 @@ import static org.junit.Assert.assertTrue;
  */
 public abstract class AbstractQueryableStateITCase extends TestLogger {
 
-	protected static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000, TimeUnit.SECONDS);
-	private static final FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+	private static final int NO_OF_RETRIES = 100;
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
+	private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
 
-	protected static ActorSystem testActorSystem;
+	private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+	private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
 
 	/**
 	 * State backend to use.
@@ -136,7 +145,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 		final int numKeys = 256;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
 		JobID jobId = null;
 
@@ -150,7 +161,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
 			DataStream<Tuple2<Integer, Long>> source = env
 					.addSource(new TestKeyRangeSource(numKeys));
@@ -163,15 +174,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			final String queryName = "hakuna-matata";
 
-			final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = 7143749578983540352L;
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 7143749578983540352L;
 
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState(queryName, reducingState);
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).asQueryableState(queryName, reducingState);
 
 			// Submit the job graph
 			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -188,19 +198,19 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			while (!allNonZero && deadline.hasTimeLeft()) {
 				allNonZero = true;
 
-				final List<Future<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys);
+				final List<CompletableFuture<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys);
 
 				for (int i = 0; i < numKeys; i++) {
 					final int key = i;
 
-					if (counts.get(key) > 0) {
+					if (counts.get(key) > 0L) {
 						// Skip this one
 						continue;
 					} else {
 						allNonZero = false;
 					}
 
-					Future<Tuple2<Integer, Long>> result = getKvStateWithRetries(
+					CompletableFuture<Tuple2<Integer, Long>> result = getKvStateWithRetries(
 							client,
 							jobId,
 							queryName,
@@ -208,24 +218,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 							BasicTypeInfo.INT_TYPE_INFO,
 							reducingState,
 							QUERY_RETRY_DELAY,
-							false);
+							false,
+							executor);
 
-					result.onSuccess(new OnSuccess<Tuple2<Integer, Long>>() {
-						@Override
-						public void onSuccess(Tuple2<Integer, Long> result) throws Throwable {
-							counts.set(key, result.f1);
-							assertEquals("Key mismatch", key, result.f0.intValue());
-						}
-					}, testActorSystem.dispatcher());
+					result.thenAccept(res -> {
+						counts.set(key, res.f1);
+						assertEquals("Key mismatch", key, res.f0.intValue());
+					});
 
 					futures.add(result);
 				}
 
-				Future<Iterable<Tuple2<Integer, Long>>> futureSequence = Futures.sequence(
-						futures,
-						testActorSystem.dispatcher());
-
-				Await.ready(futureSequence, deadline.timeLeft());
+				// wait for all the futures to complete
+				CompletableFuture
+						.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
+						.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
 
 			assertTrue("Not all keys are non-zero", allNonZero);
@@ -238,15 +245,15 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
 						.getLeaderGateway(deadline.timeLeft())
 						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-				Await.ready(cancellation, deadline.timeLeft());
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
 
-			client.shutDown();
+			client.shutdown();
 		}
 	}
 
@@ -274,7 +281,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
 			DataStream<Tuple2<Integer, Long>> source = env
 					.addSource(new TestKeyRangeSource(numKeys));
@@ -311,22 +318,23 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 			jobId = jobGraph.getJobID();
 
-			Future<TestingJobManagerMessages.JobStatusIs> failedFuture = cluster
-					.getLeaderGateway(deadline.timeLeft())
-					.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
-					.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class));
+			CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava(
+					cluster.getLeaderGateway(deadline.timeLeft())
+							.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft())
+							.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
 
 			cluster.submitJobDetached(jobGraph);
 
-			TestingJobManagerMessages.JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft());
+			TestingJobManagerMessages.JobStatusIs jobStatus =
+					failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			assertEquals(JobStatus.FAILED, jobStatus.state());
 
 			// Get the job and check the cause
-			JobManagerMessages.JobFound jobFound = Await.result(
+			JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
 					cluster.getLeaderGateway(deadline.timeLeft())
 							.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-							.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)),
-					deadline.timeLeft());
+							.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+					.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
 
@@ -338,10 +346,10 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
+				scala.concurrent.Future<CancellationSuccess> cancellation = cluster
 						.getLeaderGateway(deadline.timeLeft())
 						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
 
 				Await.ready(cancellation, deadline.timeLeft());
 			}
@@ -359,9 +367,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-		final int numElements = 1024;
+		final long numElements = 1024L;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
 		JobID jobId = null;
 		try {
@@ -371,7 +381,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
 			DataStream<Tuple2<Integer, Long>> source = env
 					.addSource(new TestAscendingValueSource(numElements));
@@ -381,15 +391,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 					"any",
 					source.getType());
 
-			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = 7662520075515707428L;
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 7662520075515707428L;
 
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState("hakuna", valueState);
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).asQueryableState("hakuna", valueState);
 
 			// Submit the job graph
 			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -397,22 +406,19 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			cluster.submitJobDetached(jobGraph);
 
-			// Now query
-			long expected = numElements;
-
-			executeQuery(deadline, client, jobId, "hakuna", valueState, expected);
+			executeQuery(deadline, client, jobId, "hakuna", valueState, numElements);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
 						.getLeaderGateway(deadline.timeLeft())
 						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-				Await.ready(cancellation, deadline.timeLeft());
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
 
-			client.shutDown();
+			client.shutdown();
 		}
 	}
 
@@ -425,9 +431,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-		final int numElements = 1024;
+		final long numElements = 1024L;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
 		JobID jobId = null;
 		try {
@@ -437,7 +445,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
 			DataStream<Tuple2<Integer, Long>> source = env
 				.addSource(new TestAscendingValueSource(numElements));
@@ -481,15 +489,15 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
-					.getLeaderGateway(deadline.timeLeft())
-					.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-					.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-				Await.ready(cancellation, deadline.timeLeft());
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
 
-			client.shutDown();
+			client.shutdown();
 		}
 	}
 
@@ -508,23 +516,25 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		for (int key = 0; key < maxParallelism; key++) {
 			boolean success = false;
 			while (deadline.hasTimeLeft() && !success) {
-				Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
-					jobId,
-					queryableStateName,
-					key,
-					BasicTypeInfo.INT_TYPE_INFO,
-					stateDescriptor,
-					QUERY_RETRY_DELAY,
-					false);
+				CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries(
+						client,
+						jobId,
+						queryableStateName,
+						key,
+						BasicTypeInfo.INT_TYPE_INFO,
+						stateDescriptor,
+						QUERY_RETRY_DELAY,
+						false,
+						executor);
 
-				Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft());
+				Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 				assertEquals("Key mismatch", key, value.f0.intValue());
 				if (expected == value.f1) {
 					success = true;
 				} else {
 					// Retry
-					Thread.sleep(50);
+					Thread.sleep(50L);
 				}
 			}
 
@@ -554,16 +564,17 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 						BasicTypeInfo.INT_TYPE_INFO,
 						valueSerializer,
 						QUERY_RETRY_DELAY,
-						false);
+						false,
+						executor);
 
-				Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft());
+				Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 				assertEquals("Key mismatch", key, value.f0.intValue());
 				if (expected == value.f1) {
 					success = true;
 				} else {
 					// Retry
-					Thread.sleep(50);
+					Thread.sleep(50L);
 				}
 			}
 
@@ -575,20 +586,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	 * Tests simple value state queryable state instance with a default value
 	 * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
 	 * tuples, the key is mapped to 1 but key 0 is queried which should throw
-	 * a {@link UnknownKeyOrNamespace} exception.
+	 * a {@link UnknownKeyOrNamespaceException}.
 	 *
-	 * @throws UnknownKeyOrNamespace thrown due querying a non-existent key
+	 * @throws UnknownKeyOrNamespaceException thrown due to querying a non-existent key
 	 */
-	@Test(expected = UnknownKeyOrNamespace.class)
-	public void testValueStateDefault() throws
-		Exception, UnknownKeyOrNamespace {
+	@Test(expected = UnknownKeyOrNamespaceException.class)
+	public void testValueStateDefault() throws Throwable {
 
 		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-		final int numElements = 1024;
+		final long numElements = 1024L;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
 		JobID jobId = null;
 		try {
@@ -600,7 +612,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// don't explicitly check that all slots are available before
 			// submitting.
 			env.setRestartStrategy(RestartStrategies
-				.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+				.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
 			DataStream<Tuple2<Integer, Long>> source = env
 				.addSource(new TestAscendingValueSource(numElements));
@@ -635,30 +647,37 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			// Now query
 			int key = 0;
-			Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
-				jobId,
-				queryableState.getQueryableStateName(),
-				key,
-				BasicTypeInfo.INT_TYPE_INFO,
-				valueState,
-				QUERY_RETRY_DELAY,
-				true);
-
-			Await.result(future, deadline.timeLeft());
+			CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries(
+					client,
+					jobId,
+					queryableState.getQueryableStateName(),
+					key,
+					BasicTypeInfo.INT_TYPE_INFO,
+					valueState,
+					QUERY_RETRY_DELAY,
+					true,
+					executor);
+
+			try {
+				future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			} catch (ExecutionException | CompletionException e) {
+				// get() on an exceptionally completed future wraps the
+				// failure in an ExecutionException; rethrow the cause.
+				throw e.getCause();
+			}
 		} finally {
+
 			// Free cluster resources
 			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
-					.getLeaderGateway(deadline.timeLeft())
-					.ask(new JobManagerMessages.CancelJob(jobId),
-						deadline.timeLeft())
-					.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(
-						CancellationSuccess.class));
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-				Await.ready(cancellation, deadline.timeLeft());
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
 
-			client.shutDown();
+			client.shutdown();
 		}
 	}
 
@@ -675,9 +694,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-		final int numElements = 1024;
+		final long numElements = 1024L;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
 		JobID jobId = null;
 		try {
@@ -687,7 +708,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Very important, because cluster is shared between tests and we
 			// don't explicitly check that all slots are available before
 			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
 			DataStream<Tuple2<Integer, Long>> source = env
 					.addSource(new TestAscendingValueSource(numElements));
@@ -709,23 +730,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			cluster.submitJobDetached(jobGraph);
 
-			// Now query
-			long expected = numElements;
-
 			executeQuery(deadline, client, jobId, "matata",
-					queryableState.getValueSerializer(), expected);
+					queryableState.getValueSerializer(), numElements);
 		} finally {
+
 			// Free cluster resources
 			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(
+						cluster.getLeaderGateway(deadline.timeLeft())
+								.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+								.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-				Await.ready(cancellation, deadline.timeLeft());
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
 
-			client.shutDown();
+			client.shutdown();
 		}
 	}
 
@@ -743,7 +762,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 		final int numElements = 1024;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
 		JobID jobId = null;
 		try {
@@ -788,21 +809,23 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					Future<String> future = getKvStateWithRetries(client,
+					CompletableFuture<String> future = getKvStateWithRetries(
+							client,
 							jobId,
 							queryableState.getQueryableStateName(),
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							foldingState,
 							QUERY_RETRY_DELAY,
-							false);
+							false,
+							executor);
 
-					String value = Await.result(future, deadline.timeLeft());
+					String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 					if (expected.equals(value)) {
 						success = true;
 					} else {
 						// Retry
-						Thread.sleep(50);
+						Thread.sleep(50L);
 					}
 				}
 
@@ -811,15 +834,15 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
 						.getLeaderGateway(deadline.timeLeft())
 						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-				Await.ready(cancellation, deadline.timeLeft());
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
 
-			client.shutDown();
+			client.shutdown();
 		}
 	}
 
@@ -834,9 +857,11 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-		final int numElements = 1024;
+		final long numElements = 1024L;
 
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
 		JobID jobId = null;
 		try {
@@ -858,15 +883,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 							new SumReduce(),
 							source.getType());
 
-			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						private static final long serialVersionUID = 8470749712274833552L;
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 8470749712274833552L;
 
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState("jungle", reducingState);
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).asQueryableState("jungle", reducingState);
 
 			// Submit the job graph
 			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -877,117 +901,24 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Wait until job is running
 
 			// Now query
-			long expected = numElements * (numElements + 1) / 2;
+			long expected = numElements * (numElements + 1L) / 2L;
 
 			executeQuery(deadline, client, jobId, "jungle", reducingState, expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
 						.getLeaderGateway(deadline.timeLeft())
 						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-				Await.ready(cancellation, deadline.timeLeft());
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			}
 
-			client.shutDown();
+			client.shutdown();
 		}
 	}
 
-	private static <K, V> Future<V> getKvStateWithRetries(
-			final QueryableStateClient client,
-			final JobID jobId,
-			final String queryName,
-			final K key,
-			final TypeInformation<K> keyTypeInfo,
-			final TypeSerializer<V> valueTypeSerializer,
-			final FiniteDuration retryDelay,
-			final boolean failForUnknownKeyOrNamespace) {
-
-		return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer)
-				.recoverWith(new Recover<Future<V>>() {
-					@Override
-					public Future<V> recover(Throwable failure) throws Throwable {
-						if (failure instanceof AssertionError) {
-							return Futures.failed(failure);
-						} else if (failForUnknownKeyOrNamespace &&
-								(failure instanceof UnknownKeyOrNamespace)) {
-							return Futures.failed(failure);
-						} else {
-							// At startup some failures are expected
-							// due to races. Make sure that they don't
-							// fail this test.
-							return Patterns.after(
-									retryDelay,
-									testActorSystem.scheduler(),
-									testActorSystem.dispatcher(),
-									new Callable<Future<V>>() {
-										@Override
-										public Future<V> call() throws Exception {
-											return getKvStateWithRetries(
-													client,
-													jobId,
-													queryName,
-													key,
-													keyTypeInfo,
-													valueTypeSerializer,
-													retryDelay,
-													failForUnknownKeyOrNamespace);
-										}
-									});
-						}
-					}
-				}, testActorSystem.dispatcher());
-
-	}
-
-	private static <K, V> Future<V> getKvStateWithRetries(
-			final QueryableStateClient client,
-			final JobID jobId,
-			final String queryName,
-			final K key,
-			final TypeInformation<K> keyTypeInfo,
-			final StateDescriptor<?, V> stateDescriptor,
-			final FiniteDuration retryDelay,
-			final boolean failForUnknownKeyOrNamespace) {
-
-		return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor)
-				.recoverWith(new Recover<Future<V>>() {
-					@Override
-					public Future<V> recover(Throwable failure) throws Throwable {
-						if (failure instanceof AssertionError) {
-							return Futures.failed(failure);
-						} else if (failForUnknownKeyOrNamespace &&
-								(failure instanceof UnknownKeyOrNamespace)) {
-							return Futures.failed(failure);
-						} else {
-							// At startup some failures are expected
-							// due to races. Make sure that they don't
-							// fail this test.
-							return Patterns.after(
-									retryDelay,
-									testActorSystem.scheduler(),
-									testActorSystem.dispatcher(),
-									new Callable<Future<V>>() {
-										@Override
-										public Future<V> call() throws Exception {
-											return getKvStateWithRetries(
-													client,
-													jobId,
-													queryName,
-													key,
-													keyTypeInfo,
-													stateDescriptor,
-													retryDelay,
-													failForUnknownKeyOrNamespace);
-										}
-									});
-						}
-					}
-				}, testActorSystem.dispatcher());
-	}
-
 	/**
 	 * Test source producing (key, 0)..(key, maxValue) with key being the sub
 	 * task index.
@@ -1030,7 +961,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			while (isRunning) {
 				synchronized (this) {
-					this.wait();
+					wait();
 				}
 			}
 		}
@@ -1040,7 +971,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			isRunning = false;
 
 			synchronized (this) {
-				this.notifyAll();
+				notifyAll();
 			}
 		}
 
@@ -1125,4 +1056,175 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		}
 	}
 
+	/////				General Utility Methods				//////
+
+	/**
+	 * Queries the state named {@code queryName} for {@code key} in the
+	 * {@link VoidNamespace}, handing {@code valueTypeSerializer} to the client, and
+	 * retries a failed query up to {@code NO_OF_RETRIES} times.
+	 *
+	 * @param client client used to issue the query
+	 * @param jobId id of the queried job
+	 * @param queryName name the state is queried by
+	 * @param key key whose state is requested
+	 * @param keyTypeInfo type information of the key
+	 * @param valueTypeSerializer serializer handed to the client for the state value
+	 * @param retryDelay delay before a failed query is repeated
+	 * @param failForUnknownKeyOrNamespace whether an {@link UnknownKeyOrNamespaceException}
+	 *        ends the query instead of triggering a retry
+	 * @param executor executor that handles the results and schedules the retries
+	 * @return future completed with the query result or with the final failure
+	 */
+	private static <K, V> CompletableFuture<V> getKvStateWithRetries(
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryName,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final TypeSerializer<V> valueTypeSerializer,
+			final Time retryDelay,
+			final boolean failForUnknownKeyOrNamespace,
+			final ScheduledExecutor executor) {
+
+		return retryWithDelay(
+				() -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer),
+				NO_OF_RETRIES,
+				retryDelay,
+				executor,
+				failForUnknownKeyOrNamespace);
+	}
+
+	/**
+	 * Queries the state named {@code queryName} for {@code key} in the
+	 * {@link VoidNamespace}, handing {@code stateDescriptor} to the client, and
+	 * retries a failed query up to {@code NO_OF_RETRIES} times.
+	 *
+	 * @param client client used to issue the query
+	 * @param jobId id of the queried job
+	 * @param queryName name the state is queried by
+	 * @param key key whose state is requested
+	 * @param keyTypeInfo type information of the key
+	 * @param stateDescriptor descriptor handed to the client for the queried state
+	 * @param retryDelay delay before a failed query is repeated
+	 * @param failForUnknownKeyOrNamespace whether an {@link UnknownKeyOrNamespaceException}
+	 *        ends the query instead of triggering a retry
+	 * @param executor executor that handles the results and schedules the retries
+	 * @return future completed with the query result or with the final failure
+	 */
+	private static <K, V> CompletableFuture<V> getKvStateWithRetries(
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryName,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final StateDescriptor<?, V> stateDescriptor,
+			final Time retryDelay,
+			final boolean failForUnknownKeyOrNamespace,
+			final ScheduledExecutor executor) {
+
+		return retryWithDelay(
+				() -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
+				NO_OF_RETRIES,
+				retryDelay,
+				executor,
+				failForUnknownKeyOrNamespace);
+	}
+
+	/**
+	 * Runs {@code operation} and, when an attempt fails, repeats it after {@code retryDelay}
+	 * for at most {@code retries} further attempts.
+	 *
+	 * @param operation supplier starting one attempt
+	 * @param retries number of retries allowed after the first attempt
+	 * @param retryDelay delay between a failed attempt and the next one
+	 * @param scheduledExecutor executor that handles the results and schedules the retries
+	 * @param failIfUnknownKeyOrNamespace whether an {@link UnknownKeyOrNamespaceException}
+	 *        ends the operation instead of triggering a retry
+	 * @return future completed with the first successful result or with the final failure
+	 */
+	private static <T> CompletableFuture<T> retryWithDelay(
+			final Supplier<CompletableFuture<T>> operation,
+			final int retries,
+			final Time retryDelay,
+			final ScheduledExecutor scheduledExecutor,
+			final boolean failIfUnknownKeyOrNamespace) {
+
+		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+		retryWithDelay(
+				resultFuture,
+				operation,
+				retries,
+				retryDelay,
+				scheduledExecutor,
+				failIfUnknownKeyOrNamespace);
+
+		return resultFuture;
+	}
+
+	/**
+	 * Runs {@code operation} and forwards its outcome to {@code resultFuture}, repeating a
+	 * failed attempt after {@code retryDelay} while {@code retries} is positive. Nothing is
+	 * started if {@code resultFuture} is already done.
+	 *
+	 * <p>A cancelled attempt, an {@link AssertionError} and, if requested, an
+	 * {@link UnknownKeyOrNamespaceException} end the operation without a retry.
+	 *
+	 * @param resultFuture future to complete with the final outcome
+	 * @param operation supplier starting one attempt
+	 * @param retries number of retries still allowed after this attempt
+	 * @param retryDelay delay between a failed attempt and the next one
+	 * @param scheduledExecutor executor that handles the results and schedules the retries
+	 * @param failIfUnknownKeyOrNamespace whether an {@link UnknownKeyOrNamespaceException}
+	 *        ends the operation instead of triggering a retry
+	 */
+	public static <T> void retryWithDelay(
+			final CompletableFuture<T> resultFuture,
+			final Supplier<CompletableFuture<T>> operation,
+			final int retries,
+			final Time retryDelay,
+			final ScheduledExecutor scheduledExecutor,
+			final boolean failIfUnknownKeyOrNamespace) {
+
+		if (!resultFuture.isDone()) {
+			final CompletableFuture<T> operationResultFuture = operation.get();
+			operationResultFuture.whenCompleteAsync(
+					(t, throwable) -> {
+						if (throwable != null) {
+							// Dependent stages report a wrapped failure, while a future that was
+							// failed or cancelled directly reports the exception itself. Classify
+							// on the cause when there is one and on the failure otherwise.
+							final Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
+
+							if (cause instanceof CancellationException) {
+								resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", cause));
+							} else if (cause instanceof AssertionError ||
+									(failIfUnknownKeyOrNamespace && cause instanceof UnknownKeyOrNamespaceException)) {
+								resultFuture.completeExceptionally(cause);
+							} else {
+								if (retries > 0) {
+									final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+											() -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace),
+											retryDelay.toMilliseconds(),
+											TimeUnit.MILLISECONDS);
+
+									// Drop the pending retry once the result is completed by someone else.
+									resultFuture.whenComplete(
+											(innerT, innerThrowable) -> scheduledFuture.cancel(false));
+								} else {
+									resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " +
+											"has been exhausted.", throwable));
+								}
+							}
+						} else {
+							resultFuture.complete(t);
+						}
+					},
+					scheduledExecutor);
+
+			// Stop the running attempt as soon as the result no longer needs it.
+			resultFuture.whenComplete(
+					(t, throwable) -> operationResultFuture.cancel(false));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
index 15a5ff6..a2a9678 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 
@@ -40,7 +39,7 @@ import static org.junit.Assert.fail;
 public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
 
 	private static final int NUM_JMS = 2;
-	private static final int NUM_TMS = 4;
+	private static final int NUM_TMS = 1;
 	private static final int NUM_SLOTS_PER_TM = 4;
 
 	private static TestingServer zkServer;
@@ -67,8 +66,6 @@ public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableSt
 			cluster = new TestingCluster(config, false);
 			cluster.start();
 
-			testActorSystem = AkkaUtils.createDefaultActorSystem();
-
 			// verify that we are in HA mode
 			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER);
 
@@ -85,9 +82,6 @@ public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableSt
 			cluster.awaitTermination();
 		}
 
-		testActorSystem.shutdown();
-		testActorSystem.awaitTermination();
-
 		try {
 			zkServer.stop();
 			zkServer.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
index c52acc8..1173d0d 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 
@@ -37,7 +36,7 @@ import static org.junit.Assert.fail;
  */
 public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
 
-	private static final int NUM_TMS = 2;
+	private static final int NUM_TMS = 1;
 	private static final int NUM_SLOTS_PER_TM = 4;
 
 	@BeforeClass
@@ -48,14 +47,13 @@ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryabl
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
 			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+			config.setInteger(QueryableStateOptions.SERVER_PORT, 9069);
 			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
 			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
 
 			cluster = new TestingCluster(config, false);
 			cluster.start(true);
 
-			testActorSystem = AkkaUtils.createDefaultActorSystem();
-
 			// verify that we are not in HA mode
 			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE);
 
@@ -73,9 +71,5 @@ public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryabl
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-
-		if (testActorSystem != null) {
-			testActorSystem.shutdown();
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
deleted file mode 100644
index d9a41a1..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.queryablestate.UnknownJobManager;
-import org.apache.flink.queryablestate.client.AkkaKvStateLocationLookupService;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link AkkaKvStateLocationLookupService}.
- */
-public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
-
-	/** The default timeout. */
-	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
-
-	/** Test actor system shared between the tests. */
-	private static ActorSystem testActorSystem;
-
-	@BeforeClass
-	public static void setUp() throws Exception {
-		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (testActorSystem != null) {
-			testActorSystem.shutdown();
-		}
-	}
-
-	/**
-	 * Tests responses if no leader notification has been reported or leadership
-	 * has been lost (leaderAddress = <code>null</code>).
-	 */
-	@Test
-	public void testNoJobManagerRegistered() throws Exception {
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
-			null,
-			null);
-		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
-		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
-				leaderRetrievalService,
-				testActorSystem,
-				TIMEOUT,
-				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-		lookupService.start();
-
-		//
-		// No leader registered initially => fail with UnknownJobManager
-		//
-		try {
-			JobID jobId = new JobID();
-			String name = "coffee";
-
-			Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name);
-
-			Await.result(locationFuture, TIMEOUT);
-			fail("Did not throw expected Exception");
-		} catch (UnknownJobManager ignored) {
-			// Expected
-		}
-
-		assertEquals("Received unexpected lookup", 0, received.size());
-
-		//
-		// Leader registration => communicate with new leader
-		//
-		UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID;
-		KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
-
-		ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected);
-
-		String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
-		// Notify the service about a leader
-		leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId);
-
-		JobID jobId = new JobID();
-		String name = "tea";
-
-		// Verify that the leader response is handled
-		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT);
-		assertEquals(expected, location);
-
-		// Verify that the correct message was sent to the leader
-		assertEquals(1, received.size());
-
-		verifyLookupMsg(received.poll(), jobId, name);
-
-		//
-		// Leader loss => fail with UnknownJobManager
-		//
-		leaderRetrievalService.notifyListener(null, null);
-
-		try {
-			Future<KvStateLocation> locationFuture = lookupService
-					.getKvStateLookupInfo(new JobID(), "coffee");
-
-			Await.result(locationFuture, TIMEOUT);
-			fail("Did not throw expected Exception");
-		} catch (UnknownJobManager ignored) {
-			// Expected
-		}
-
-		// No new messages received
-		assertEquals(0, received.size());
-	}
-
-	/**
-	 * Tests that messages are properly decorated with the leader session ID.
-	 */
-	@Test
-	public void testLeaderSessionIdChange() throws Exception {
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
-			"localhost",
-			HighAvailabilityServices.DEFAULT_LEADER_ID);
-		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
-		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
-				leaderRetrievalService,
-				testActorSystem,
-				TIMEOUT,
-				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-		lookupService.start();
-
-		// Create test actors with random leader session IDs
-		KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
-		UUID leaderSessionId1 = UUID.randomUUID();
-		ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1);
-		String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1);
-
-		KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
-		UUID leaderSessionId2 = UUID.randomUUID();
-		ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId1, expected2);
-		String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2);
-
-		JobID jobId = new JobID();
-
-		//
-		// Notify about first leader
-		//
-		leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1);
-
-		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
-		assertEquals(expected1, location);
-
-		assertEquals(1, received.size());
-		verifyLookupMsg(received.poll(), jobId, "rock");
-
-		//
-		// Notify about second leader
-		//
-		leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2);
-
-		location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
-		assertEquals(expected2, location);
-
-		assertEquals(1, received.size());
-		verifyLookupMsg(received.poll(), jobId, "roll");
-	}
-
-	/**
-	 * Tests that lookups are retried when no leader notification is available.
-	 */
-	@Test
-	public void testRetryOnUnknownJobManager() throws Exception {
-		final Queue<AkkaKvStateLocationLookupService.LookupRetryStrategy> retryStrategies = new LinkedBlockingQueue<>();
-
-		AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategy =
-				new AkkaKvStateLocationLookupService.LookupRetryStrategyFactory() {
-					@Override
-					public AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() {
-						return retryStrategies.poll();
-					}
-				};
-
-		final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
-			null,
-			null);
-
-		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
-				leaderRetrievalService,
-				testActorSystem,
-				TIMEOUT,
-				retryStrategy);
-
-		lookupService.start();
-
-		//
-		// Test call to retry
-		//
-		final AtomicBoolean hasRetried = new AtomicBoolean();
-		retryStrategies.add(
-				new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
-					@Override
-					public FiniteDuration getRetryDelay() {
-						return FiniteDuration.Zero();
-					}
-
-					@Override
-					public boolean tryRetry() {
-						if (hasRetried.compareAndSet(false, true)) {
-							return true;
-						}
-						return false;
-					}
-				});
-
-		Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "yessir");
-
-		Await.ready(locationFuture, TIMEOUT);
-		assertTrue("Did not retry ", hasRetried.get());
-
-		//
-		// Test leader notification after retry
-		//
-		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
-		KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
-		ActorRef testActor = LookupResponseActor.create(received, null, expected);
-		final String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
-		retryStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
-			@Override
-			public FiniteDuration getRetryDelay() {
-				return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
-			}
-
-			@Override
-			public boolean tryRetry() {
-				leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
-				return true;
-			}
-		});
-
-		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT);
-		assertEquals(expected, location);
-	}
-
-	@Test
-	public void testUnexpectedResponseType() throws Exception {
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
-			"localhost",
-			HighAvailabilityServices.DEFAULT_LEADER_ID);
-		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
-		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
-				leaderRetrievalService,
-				testActorSystem,
-				TIMEOUT,
-				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-		lookupService.start();
-
-		// Create test actors with random leader session IDs
-		String expected = "unexpected-response-type";
-		ActorRef testActor = LookupResponseActor.create(received, null, expected);
-		String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
-		leaderRetrievalService.notifyListener(testActorAddress, null);
-
-		try {
-			Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), TIMEOUT);
-			fail("Did not throw expected Exception");
-		} catch (Throwable ignored) {
-			// Expected
-		}
-	}
-
-	private static final class LookupResponseActor extends FlinkUntypedActor {
-
-		/** Received lookup messages. */
-		private final Queue<LookupKvStateLocation> receivedLookups;
-
-		/** Responses on KvStateMessage.LookupKvStateLocation messages. */
-		private final Queue<Object> lookupResponses;
-
-		/** The leader session ID. */
-		private UUID leaderSessionId;
-
-		public LookupResponseActor(
-				Queue<LookupKvStateLocation> receivedLookups,
-				UUID leaderSessionId, Object... lookupResponses) {
-
-			this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups");
-			this.leaderSessionId = leaderSessionId;
-			this.lookupResponses = new ArrayDeque<>();
-
-			if (lookupResponses != null) {
-				for (Object resp : lookupResponses) {
-					this.lookupResponses.add(resp);
-				}
-			}
-		}
-
-		@Override
-		public void handleMessage(Object message) throws Exception {
-			if (message instanceof LookupKvStateLocation) {
-				// Add to received lookups queue
-				receivedLookups.add((LookupKvStateLocation) message);
-
-				Object msg = lookupResponses.poll();
-				if (msg != null) {
-					if (msg instanceof Throwable) {
-						sender().tell(new Status.Failure((Throwable) msg), self());
-					} else {
-						sender().tell(new Status.Success(msg), self());
-					}
-				}
-			} else if (message instanceof UUID) {
-				this.leaderSessionId = (UUID) message;
-			} else {
-				LOG.debug("Received unhandled message: {}", message);
-			}
-		}
-
-		@Override
-		protected UUID getLeaderSessionID() {
-			return leaderSessionId;
-		}
-
-		private static ActorRef create(
-				Queue<LookupKvStateLocation> receivedLookups,
-				UUID leaderSessionId,
-				Object... lookupResponses) {
-
-			return testActorSystem.actorOf(Props.create(
-					LookupResponseActor.class,
-					receivedLookups,
-					leaderSessionId,
-					lookupResponses));
-		}
-	}
-
-	private static void verifyLookupMsg(
-			LookupKvStateLocation lookUpMsg,
-			JobID expectedJobId,
-			String expectedName) {
-
-		assertNotNull(lookUpMsg);
-		assertEquals(expectedJobId, lookUpMsg.getJobId());
-		assertEquals(expectedName, lookUpMsg.getRegistrationName());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
new file mode 100644
index 0000000..b6f855e
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -0,0 +1,784 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link Client}.
+ */
+public class ClientTest {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
+
+	// Thread pool for client bootstrap (shared between tests)
+	private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (NIO_GROUP != null) {
+			NIO_GROUP.shutdownGracefully();
+		}
+	}
+
+	/**
+	 * Tests simple queries, of which half succeed and half fail: the stub server answers every other received request with a response and the rest with a failure.
+	 */
+	@Test
+	public void testSimpleRequests() throws Exception {
+		Deadline deadline = TEST_TIMEOUT.fromNow();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		Client<KvStateInternalRequest, KvStateResponse> client = null;
+		Channel serverChannel = null;
+
+		try {
+			client = new Client<>("Test Client", 1, serializer, stats);
+
+			// Random payload that the stub server returns for the successful requests
+			final byte[] expected = new byte[1024];
+			ThreadLocalRandom.current().nextBytes(expected);
+
+			final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
+			final AtomicReference<Channel> channel = new AtomicReference<>();
+
+			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+				@Override
+				public void channelActive(ChannelHandlerContext ctx) throws Exception {
+					channel.set(ctx.channel());
+				}
+
+				@Override
+				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+					received.add((ByteBuf) msg);
+				}
+			});
+
+			KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+			long numQueries = 1024L;
+
+			List<CompletableFuture<KvStateResponse>> futures = new ArrayList<>();
+			for (long i = 0L; i < numQueries; i++) {
+				KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+				futures.add(client.sendRequest(serverAddress, request));
+			}
+
+			// Respond to messages: even iterations get the payload, odd iterations get a failure
+			Exception testException = new RuntimeException("Expected test Exception");
+
+			for (long i = 0L; i < numQueries; i++) {
+				ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				assertNotNull("Receive timed out", buf);
+
+				Channel ch = channel.get();
+				assertNotNull("Channel not active", ch);
+
+				assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+				long requestId = MessageSerializer.getRequestId(buf);
+				KvStateInternalRequest deserRequest = serializer.deserializeRequest(buf);
+
+				buf.release();
+
+				if (i % 2L == 0L) {
+					ByteBuf response = MessageSerializer.serializeResponse(
+							serverChannel.alloc(),
+							requestId,
+							new KvStateResponse(expected));
+
+					ch.writeAndFlush(response);
+				} else {
+					ByteBuf response = MessageSerializer.serializeRequestFailure(
+							serverChannel.alloc(),
+							requestId,
+							testException);
+
+					ch.writeAndFlush(response);
+				}
+			}
+
+			for (long i = 0L; i < numQueries; i++) {
+
+				if (i % 2L == 0L) {
+					KvStateResponse serializedResult = futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+					assertArrayEquals(expected, serializedResult.getContent());
+				} else {
+					try {
+						futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+						fail("Did not throw expected Exception");
+					} catch (ExecutionException e) {
+
+						if (!(e.getCause() instanceof RuntimeException)) {
+							fail("Did not throw expected Exception");
+						}
+						// else expected: the failure sent by the stub server surfaced as the cause
+					}
+				}
+			}
+
+			assertEquals(numQueries, stats.getNumRequests());
+			long expectedRequests = numQueries / 2L;
+
+			// Counts can take some time to propagate, so poll until they match or the deadline expires
+			while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests ||
+					stats.getNumFailed() != expectedRequests)) {
+				Thread.sleep(100L);
+			}
+
+			assertEquals(expectedRequests, stats.getNumSuccessful());
+			assertEquals(expectedRequests, stats.getNumFailed());
+		} finally {
+			if (client != null) {
+				client.shutdown();
+			}
+
+			if (serverChannel != null) {
+				serverChannel.close();
+			}
+
+			assertEquals("Channel leak", 0L, stats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Tests that a request to an unavailable host is failed with ConnectException.
+	 *
+	 * <p>The request is sent to a port obtained from {@code NetUtils.getAvailablePort()},
+	 * for which this test starts no server. The returned future must complete
+	 * exceptionally with a {@link ConnectException} as its cause, and no connection
+	 * may remain registered in the stats afterwards.
+	 */
+	@Test
+	public void testRequestUnavailableHost() throws Exception {
+		final Deadline testDeadline = TEST_TIMEOUT.fromNow();
+		final AtomicKvStateRequestStats requestStats = new AtomicKvStateRequestStats();
+
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer =
+				new MessageSerializer<>(
+						new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+						new KvStateResponse.KvStateResponseDeserializer());
+
+		Client<KvStateInternalRequest, KvStateResponse> queryClient = null;
+
+		try {
+			queryClient = new Client<>("Test Client", 1, messageSerializer, requestStats);
+
+			// Target address: local host plus a port on which this test starts no server
+			final int unusedPort = NetUtils.getAvailablePort();
+			final KvStateServerAddress unreachableAddress =
+					new KvStateServerAddress(InetAddress.getLocalHost(), unusedPort);
+
+			final CompletableFuture<KvStateResponse> responseFuture = queryClient.sendRequest(
+					unreachableAddress,
+					new KvStateInternalRequest(new KvStateID(), new byte[0]));
+
+			try {
+				responseFuture.get(testDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				fail("Did not throw expected ConnectException");
+			} catch (ExecutionException e) {
+				// Only a connect failure is an acceptable cause
+				assertTrue("Did not throw expected ConnectException", e.getCause() instanceof ConnectException);
+			}
+		} finally {
+			if (queryClient != null) {
+				queryClient.shutdown();
+			}
+
+			assertEquals("Channel leak", 0L, requestStats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Multiple threads concurrently fire queries.
+	 *
+	 * <p>Several tasks each send a fixed number of requests through one shared
+	 * client to a test server that answers every request with the same payload.
+	 * Every response is compared against that payload, and the request statistics
+	 * are checked once all responses have been verified.
+	 */
+	@Test
+	public void testConcurrentQueries() throws Exception {
+		final Deadline testDeadline = TEST_TIMEOUT.fromNow();
+		final AtomicKvStateRequestStats requestStats = new AtomicKvStateRequestStats();
+
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer =
+				new MessageSerializer<>(
+						new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+						new KvStateResponse.KvStateResponseDeserializer());
+
+		ExecutorService queryExecutor = null;
+		Client<KvStateInternalRequest, KvStateResponse> queryClient = null;
+		Channel testServer = null;
+
+		// Random payload that the test server returns for every request
+		final byte[] expectedContent = new byte[1024];
+		ThreadLocalRandom.current().nextBytes(expectedContent);
+
+		try {
+			final int taskCount = 4;
+			final int queriesPerTask = 1024;
+
+			queryExecutor = Executors.newFixedThreadPool(taskCount);
+
+			queryClient = new Client<>("Test Client", 1, messageSerializer, requestStats);
+
+			testServer = createServerChannel(new ChannelInboundHandlerAdapter() {
+				@Override
+				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+					final ByteBuf requestBuf = (ByteBuf) msg;
+					assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(requestBuf));
+					final long requestId = MessageSerializer.getRequestId(requestBuf);
+					// The request body is deserialized as well; its content is not used
+					messageSerializer.deserializeRequest(requestBuf);
+
+					requestBuf.release();
+
+					// Answer with the shared payload under the id of the incoming request
+					final ByteBuf responseBuf = MessageSerializer.serializeResponse(
+							ctx.alloc(),
+							requestId,
+							new KvStateResponse(expectedContent));
+
+					ctx.channel().writeAndFlush(responseBuf);
+				}
+			});
+
+			final KvStateServerAddress targetAddress = getKvStateServerAddress(testServer);
+
+			// The lambda needs an effectively final reference to the client
+			final Client<KvStateInternalRequest, KvStateResponse> sharedClient = queryClient;
+			final Callable<List<CompletableFuture<KvStateResponse>>> fireQueries = () -> {
+				final List<CompletableFuture<KvStateResponse>> pending = new ArrayList<>(queriesPerTask);
+
+				int sent = 0;
+				while (sent < queriesPerTask) {
+					pending.add(sharedClient.sendRequest(
+							targetAddress,
+							new KvStateInternalRequest(new KvStateID(), new byte[0])));
+					sent++;
+				}
+
+				return pending;
+			};
+
+			// Submit query tasks
+			final List<Future<List<CompletableFuture<KvStateResponse>>>> submittedTasks = new ArrayList<>();
+			for (int task = 0; task < taskCount; task++) {
+				submittedTasks.add(queryExecutor.submit(fireQueries));
+			}
+
+			// Verify results
+			for (Future<List<CompletableFuture<KvStateResponse>>> submittedTask : submittedTasks) {
+				final List<CompletableFuture<KvStateResponse>> responses =
+						submittedTask.get(testDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+				for (CompletableFuture<KvStateResponse> responseFuture : responses) {
+					final KvStateResponse response =
+							responseFuture.get(testDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+					assertArrayEquals(expectedContent, response.getContent());
+				}
+			}
+
+			final int totalQueries = taskCount * queriesPerTask;
+
+			// Counts can take some time to propagate
+			while (testDeadline.hasTimeLeft() && requestStats.getNumSuccessful() != totalQueries) {
+				Thread.sleep(100L);
+			}
+
+			assertEquals(totalQueries, requestStats.getNumRequests());
+			assertEquals(totalQueries, requestStats.getNumSuccessful());
+		} finally {
+			if (queryExecutor != null) {
+				queryExecutor.shutdown();
+			}
+
+			if (testServer != null) {
+				testServer.close();
+			}
+
+			if (queryClient != null) {
+				queryClient.shutdown();
+			}
+
+			assertEquals("Channel leak", 0L, requestStats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Tests that a server failure closes the connection and removes it from
+	 * the established connections.
+	 *
+	 * <p>Two requests are sent to a test server that only records what it receives.
+	 * The test then writes a single server failure message back and expects both
+	 * pending request futures to fail and the connection count to drop to zero.
+	 */
+	@Test
+	public void testFailureClosesChannel() throws Exception {
+		Deadline deadline = TEST_TIMEOUT.fromNow();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		Client<KvStateInternalRequest, KvStateResponse> client = null;
+		Channel serverChannel = null;
+
+		try {
+			client = new Client<>("Test Client", 1, serializer, stats);
+
+			// Filled by the server handler below: the raw request buffers and the
+			// channel of the connection that became active on the server.
+			final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
+			final AtomicReference<Channel> channel = new AtomicReference<>();
+
+			// The server never answers on its own; responses are written by the test thread
+			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+				@Override
+				public void channelActive(ChannelHandlerContext ctx) throws Exception {
+					channel.set(ctx.channel());
+				}
+
+				@Override
+				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+					received.add((ByteBuf) msg);
+				}
+			});
+
+			KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+			// Requests
+			List<Future<KvStateResponse>> futures = new ArrayList<>();
+			KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+
+			futures.add(client.sendRequest(serverAddress, request));
+			futures.add(client.sendRequest(serverAddress, request));
+
+			// Wait until both requests have arrived at the server. The buffers are
+			// released right away because their content is not inspected.
+			ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertNotNull("Receive timed out", buf);
+			buf.release();
+
+			buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertNotNull("Receive timed out", buf);
+			buf.release();
+
+			// Both requests went to the same address and are expected to share one connection
+			assertEquals(1L, stats.getNumConnections());
+
+			Channel ch = channel.get();
+			assertNotNull("Channel not active", ch);
+
+			// Respond with failure
+			ch.writeAndFlush(MessageSerializer.serializeServerFailure(
+					serverChannel.alloc(),
+					new RuntimeException("Expected test server failure")));
+
+			// The single server failure must fail both pending requests;
+			// futures.remove(0) hands out the next pending future each time.
+			try {
+				futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				fail("Did not throw expected server failure");
+			} catch (ExecutionException e) {
+
+				if (!(e.getCause() instanceof RuntimeException)) {
+					fail("Did not throw expected Exception");
+				}
+				// Expected
+			}
+
+			try {
+				futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				fail("Did not throw expected server failure");
+			} catch (ExecutionException e) {
+
+				if (!(e.getCause() instanceof RuntimeException)) {
+					fail("Did not throw expected Exception");
+				}
+				// Expected
+			}
+
+			// The failed connection must no longer be counted as established
+			assertEquals(0L, stats.getNumConnections());
+
+			// Counts can take some time to propagate
+			while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L)) {
+				Thread.sleep(100L);
+			}
+
+			assertEquals(2L, stats.getNumRequests());
+			assertEquals(0L, stats.getNumSuccessful());
+			assertEquals(2L, stats.getNumFailed());
+		} finally {
+			if (client != null) {
+				client.shutdown();
+			}
+
+			if (serverChannel != null) {
+				serverChannel.close();
+			}
+
+			// Checked on every exit path, including failed assertions above
+			assertEquals("Channel leak", 0L, stats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Tests that a server channel close, closes the connection and removes it
+	 * from the established connections.
+	 *
+	 * <p>One request is sent to a test server that never responds. Once the server
+	 * has seen the request, the test closes the connection from the server side and
+	 * expects the pending request future to fail with a
+	 * {@link ClosedChannelException} as its cause.
+	 */
+	@Test
+	public void testServerClosesChannel() throws Exception {
+		Deadline deadline = TEST_TIMEOUT.fromNow();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		Client<KvStateInternalRequest, KvStateResponse> client = null;
+		Channel serverChannel = null;
+
+		try {
+			client = new Client<>("Test Client", 1, serializer, stats);
+
+			// Set by the server handler below: whether any message was read, and the
+			// channel of the connection that became active on the server.
+			final AtomicBoolean received = new AtomicBoolean();
+			final AtomicReference<Channel> channel = new AtomicReference<>();
+
+			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+				@Override
+				public void channelActive(ChannelHandlerContext ctx) throws Exception {
+					channel.set(ctx.channel());
+				}
+
+				@Override
+				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+					received.set(true);
+				}
+			});
+
+			KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+			// Requests
+			KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+			Future<KvStateResponse> future = client.sendRequest(serverAddress, request);
+
+			// Poll until the server has seen the request or the deadline expires
+			while (!received.get() && deadline.hasTimeLeft()) {
+				Thread.sleep(50L);
+			}
+			assertTrue("Receive timed out", received.get());
+
+			assertEquals(1, stats.getNumConnections());
+
+			// Close the connection from the server side and wait, bounded by the
+			// deadline, for the close to complete. The result of await is not checked.
+			channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+			// The still pending request must fail because its channel was closed
+			try {
+				future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				fail("Did not throw expected server failure");
+			} catch (ExecutionException e) {
+				if (!(e.getCause() instanceof ClosedChannelException)) {
+					fail("Did not throw expected Exception");
+				}
+				// Expected
+			}
+
+			// The closed connection must no longer be counted as established
+			assertEquals(0L, stats.getNumConnections());
+
+			// Counts can take some time to propagate
+			while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) {
+				Thread.sleep(100L);
+			}
+
+			assertEquals(1L, stats.getNumRequests());
+			assertEquals(0L, stats.getNumSuccessful());
+			assertEquals(1L, stats.getNumFailed());
+		} finally {
+			if (client != null) {
+				client.shutdown();
+			}
+
+			if (serverChannel != null) {
+				serverChannel.close();
+			}
+
+			// Checked on every exit path, including failed assertions above
+			assertEquals("Channel leak", 0L, stats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Tests multiple client tasks querying multiple servers until 100k queries have
+	 * been processed. At this point, the client is shut down and it is verified
+	 * that all ongoing requests are failed.
+	 *
+	 * <p>All client tasks share a single {@code Client} instance. Each server holds
+	 * one value (201 + server index) under its own key (1010 + server index), and
+	 * every response is checked against the value of the server it was sent to.
+	 * After the shut down, every task must have failed with a
+	 * {@link ClosedChannelException} or an {@link IllegalStateException} as the cause
+	 * of its failed request, and neither the client nor the servers may report open
+	 * connections.
+	 */
+	@Test
+	public void testClientServerIntegration() throws Exception {
+		// Config
+		final int numServers = 2;
+		final int numServerEventLoopThreads = 2;
+		final int numServerQueryThreads = 2;
+
+		final int numClientEventLoopThreads = 4;
+		final int numClientsTasks = 8;
+
+		final int batchSize = 16;
+
+		final int numKeyGroups = 1;
+
+		AbstractStateBackend abstractBackend = new MemoryStateBackend();
+		KvStateRegistry dummyRegistry = new KvStateRegistry();
+		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+		dummyEnv.setKvStateRegistry(dummyRegistry);
+
+		AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+				dummyEnv,
+				new JobID(),
+				"test_op",
+				IntSerializer.INSTANCE,
+				numKeyGroups,
+				new KeyGroupRange(0, 0),
+				dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
+
+		// Upper bound for a single response inside the query tasks
+		final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+		AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
+
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		Client<KvStateInternalRequest, KvStateResponse> client = null;
+		ExecutorService clientTaskExecutor = null;
+		final KvStateServerImpl[] server = new KvStateServerImpl[numServers];
+
+		try {
+			client = new Client<>("Test Client", numClientEventLoopThreads, serializer, clientStats);
+			clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks);
+
+			// Create state
+			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+			desc.setQueryable("any");
+
+			// Create servers
+			KvStateRegistry[] registry = new KvStateRegistry[numServers];
+			AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
+			final KvStateID[] ids = new KvStateID[numServers];
+
+			for (int i = 0; i < numServers; i++) {
+				registry[i] = new KvStateRegistry();
+				serverStats[i] = new AtomicKvStateRequestStats();
+				server[i] = new KvStateServerImpl(
+						InetAddress.getLocalHost(),
+						0,
+						numServerEventLoopThreads,
+						numServerQueryThreads,
+						registry[i],
+						serverStats[i]);
+
+				server[i].start();
+
+				backend.setCurrentKey(1010 + i);
+
+				// Value per server
+				ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE,
+						VoidNamespaceSerializer.INSTANCE,
+						desc);
+
+				state.update(201 + i);
+
+				// we know it must be a KvState but this is not exposed to the user via State
+				InternalKvState<?> kvState = (InternalKvState<?>) state;
+
+				// Register KvState (one state instance for all servers)
+				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
+			}
+
+			final Client<KvStateInternalRequest, KvStateResponse> finalClient = client;
+			// Runs until a request fails or the thread is interrupted; never returns normally
+			Callable<Void> queryTask = () -> {
+				while (true) {
+					if (Thread.interrupted()) {
+						throw new InterruptedException();
+					}
+
+					// Random server permutation
+					List<Integer> random = new ArrayList<>();
+					for (int j = 0; j < batchSize; j++) {
+						random.add(j);
+					}
+					Collections.shuffle(random);
+
+					// Dispatch queries
+					List<Future<KvStateResponse>> futures = new ArrayList<>(batchSize);
+
+					for (int j = 0; j < batchSize; j++) {
+						int targetServer = random.get(j) % numServers;
+
+						byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+								1010 + targetServer,
+								IntSerializer.INSTANCE,
+								VoidNamespace.INSTANCE,
+								VoidNamespaceSerializer.INSTANCE);
+
+						KvStateInternalRequest request = new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace);
+						futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), request));
+					}
+
+					// Verify results
+					for (int j = 0; j < batchSize; j++) {
+						int targetServer = random.get(j) % numServers;
+
+						Future<KvStateResponse> future = futures.get(j);
+						byte[] buf = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS).getContent();
+						int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
+						assertEquals(201L + targetServer, value);
+					}
+				}
+			};
+
+			// Submit tasks
+			List<Future<Void>> taskFutures = new ArrayList<>();
+			for (int i = 0; i < numClientsTasks; i++) {
+				taskFutures.add(clientTaskExecutor.submit(queryTask));
+			}
+
+			long numRequests;
+			while ((numRequests = clientStats.getNumRequests()) < 100_000L) {
+				Thread.sleep(100L);
+				LOG.info("Number of requests {}/100_000", numRequests);
+			}
+
+			// Shut down
+			client.shutdown();
+
+			for (Future<Void> future : taskFutures) {
+				try {
+					future.get();
+					fail("Did not throw expected Exception after shut down");
+				} catch (ExecutionException t) {
+					// Two levels of wrapping are expected: the executor wraps the task's
+					// failure, which is the ExecutionException of the failed request future.
+					Throwable taskFailure = t.getCause();
+					Throwable requestFailure = taskFailure == null ? null : taskFailure.getCause();
+
+					if (!(requestFailure instanceof ClosedChannelException ||
+							requestFailure instanceof IllegalStateException)) {
+						// Name the innermost available cause; the wrapper type itself
+						// is always ExecutionException and carries no information.
+						Throwable unexpected = requestFailure != null
+								? requestFailure
+								: (taskFailure != null ? taskFailure : t);
+
+						LOG.error("Query task failed with unexpected exception after client shut down", t);
+						fail("Failed with unexpected Exception type: " + unexpected.getClass().getName());
+					}
+					// else expected
+				}
+			}
+
+			assertEquals("Connection leak (client)", 0L, clientStats.getNumConnections());
+			for (int i = 0; i < numServers; i++) {
+				// Retry the server-side check up to 10 times with increasing
+				// pauses before letting the assertion failure propagate
+				boolean success = false;
+				int numRetries = 0;
+				while (!success) {
+					try {
+						assertEquals("Connection leak (server)", 0L, serverStats[i].getNumConnections());
+						success = true;
+					} catch (Throwable t) {
+						if (numRetries < 10) {
+							LOG.info("Retrying connection leak check (server)");
+							Thread.sleep((numRetries + 1) * 50L);
+							numRetries++;
+						} else {
+							throw t;
+						}
+					}
+				}
+			}
+		} finally {
+			if (client != null) {
+				client.shutdown();
+			}
+
+			for (int i = 0; i < numServers; i++) {
+				if (server[i] != null) {
+					server[i].shutdown();
+				}
+			}
+
+			if (clientTaskExecutor != null) {
+				clientTaskExecutor.shutdown();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Binds a test server on the local host and returns its server channel.
+	 *
+	 * <p>The server is bound with port 0; callers read the actual address back from
+	 * the returned channel. Every accepted connection gets a length-field frame
+	 * decoder followed by the given handlers.
+	 *
+	 * @param handlers handlers appended to the pipeline of every accepted connection
+	 * @return the bound server channel
+	 * @throws UnknownHostException if the local host address cannot be resolved
+	 * @throws InterruptedException if interrupted while waiting for the bind to complete
+	 */
+	private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException {
+		// Pipeline for accepted connections: frame decoder first, then the test handlers
+		final ChannelInitializer<SocketChannel> pipelineInitializer = new ChannelInitializer<SocketChannel>() {
+			@Override
+			protected void initChannel(SocketChannel ch) throws Exception {
+				ch.pipeline()
+						.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+						.addLast(handlers);
+			}
+		};
+
+		final ServerBootstrap serverBootstrap = new ServerBootstrap();
+		// Bind address and port
+		serverBootstrap.localAddress(InetAddress.getLocalHost(), 0);
+		// NIO server channels
+		serverBootstrap.group(NIO_GROUP);
+		serverBootstrap.channel(NioServerSocketChannel.class);
+		serverBootstrap.childHandler(pipelineInitializer);
+
+		// Block until the bind has completed
+		return serverBootstrap.bind().sync().channel();
+	}
+
+	/**
+	 * Creates the {@code KvStateServerAddress} for the local address of the given channel.
+	 *
+	 * @param serverChannel channel whose local address is cast to {@link InetSocketAddress}
+	 * @return server address built from the host and port of that local address
+	 */
+	private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) {
+		final InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.localAddress();
+		final InetAddress host = boundAddress.getAddress();
+		final int port = boundAddress.getPort();
+
+		return new KvStateServerAddress(host, port);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
index 0b97bda..cb490aa 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.queryablestate.network;
 
-import org.apache.flink.queryablestate.client.KvStateClientHandler;
-import org.apache.flink.queryablestate.client.KvStateClientHandlerCallback;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -37,7 +37,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 /**
- * Tests for {@link KvStateClientHandler}.
+ * Tests for {@link ClientHandler}.
  */
 public class KvStateClientHandlerTest {
 
@@ -47,28 +47,30 @@ public class KvStateClientHandlerTest {
 	 */
 	@Test
 	public void testReadCallbacksAndBufferRecycling() throws Exception {
-		KvStateClientHandlerCallback callback = mock(KvStateClientHandlerCallback.class);
+		final ClientHandlerCallback<KvStateResponse> callback = mock(ClientHandlerCallback.class);
 
-		EmbeddedChannel channel = new EmbeddedChannel(new KvStateClientHandler(callback));
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+		final EmbeddedChannel channel = new EmbeddedChannel(new ClientHandler<>("Test Client", serializer, callback));
+
+		final byte[] content = new byte[0];
+		final KvStateResponse response = new KvStateResponse(content);
 
 		//
 		// Request success
 		//
-		ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
-				channel.alloc(),
-				1222112277,
-				new byte[0]);
+		ByteBuf buf = MessageSerializer.serializeResponse(channel.alloc(), 1222112277L, response);
 		buf.skipBytes(4); // skip frame length
 
 		// Verify callback
 		channel.writeInbound(buf);
-		verify(callback, times(1)).onRequestResult(eq(1222112277L), any(byte[].class));
+		verify(callback, times(1)).onRequestResult(eq(1222112277L), any(KvStateResponse.class));
 		assertEquals("Buffer not recycled", 0, buf.refCnt());
 
 		//
 		// Request failure
 		//
-		buf = MessageSerializer.serializeKvStateRequestFailure(
+		buf = MessageSerializer.serializeRequestFailure(
 				channel.alloc(),
 				1222112278,
 				new RuntimeException("Expected test Exception"));