You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 11:03:19 UTC

[1/6] flink git commit: [FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.

Repository: flink
Updated Branches:
  refs/heads/release-1.4 42e24413b -> 3753ae251


[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0324e34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0324e34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0324e34

Branch: refs/heads/release-1.4
Commit: d0324e34a06e7374179d1627a4a3653d07f1c614
Parents: 42e2441
Author: kkloudas <kk...@gmail.com>
Authored: Tue Nov 14 15:05:45 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:37:18 2017 +0100

----------------------------------------------------------------------
 .../network/AbstractServerHandler.java          |   2 +-
 .../client/proxy/KvStateClientProxyHandler.java |  11 +-
 .../itcases/AbstractQueryableStateTestBase.java | 230 ++++++++++++-------
 3 files changed, 150 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 9e02291..7e71a11 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -262,7 +262,7 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
 					try {
 						stats.reportFailedRequest();
 
-						final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+						final String errMsg = "Failed request " + requestId + "." + System.lineSeparator() + " Caused by: " + ExceptionUtils.stringifyException(t);
 						final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
 						ctx.writeAndFlush(err);
 					} catch (IOException io) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 73ef7f3..af33701 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
 			operationFuture.whenCompleteAsync(
 					(t, throwable) -> {
 						if (throwable != null) {
-							if (throwable instanceof CancellationException) {
-								result.completeExceptionally(throwable);
-							} else if (throwable.getCause() instanceof UnknownKvStateIdException ||
+							if (
+									throwable.getCause() instanceof UnknownKvStateIdException ||
 									throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
-									throwable.getCause() instanceof UnknownKvStateLocation ||
-									throwable.getCause() instanceof ConnectException) {
+									throwable.getCause() instanceof ConnectException
+								) {
 
 								// These failures are likely to be caused by out-of-sync
 								// KvStateLocation. Therefore we retry this query and

http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index b4bae9c..c1cbb61 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -89,12 +88,10 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.function.Supplier;
 
 import scala.concurrent.Await;
 import scala.concurrent.duration.Deadline;
@@ -103,15 +100,14 @@ import scala.reflect.ClassTag$;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Base class for queryable state integration tests with a configurable state backend.
  */
 public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
-	private static final int NO_OF_RETRIES = 100;
 	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
-	private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
 
 	private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
 	private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
@@ -229,14 +225,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 						allNonZero = false;
 					}
 
-					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries(
+					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvState(
+							deadline,
 							client,
 							jobId,
 							queryName,
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							reducingState,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -284,7 +280,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	 *
 	 * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
 	 * in the HA mode we use the actual JM code which does not recognize the
-	 * {@code NotifyWhenJobStatus} message.	 *
+	 * {@code NotifyWhenJobStatus} message.
 	 */
 	@Test
 	public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -439,6 +435,92 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 	}
 
 	/**
+	 * Tests that the correct exception is thrown if the query
+	 * contains a wrong queryable state name.
+	 */
+	@Test
+	public void testWrongQueryableStateName() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+					new ValueStateDescriptor<>("any", source.getType());
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 7662520075515707428L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava(
+					cluster.getLeaderGateway(deadline.timeLeft())
+							.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
+							.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+			cluster.submitJobDetached(jobGraph);
+
+			// expect for the job to be running
+			TestingJobManagerMessages.JobStatusIs jobStatus =
+					runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertEquals(JobStatus.RUNNING, jobStatus.state());
+
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+					jobId,
+					"wrong-hankuna", // this is the wrong name.
+					0,
+					VoidNamespace.INSTANCE,
+					BasicTypeInfo.INT_TYPE_INFO,
+					VoidNamespaceTypeInfo.INSTANCE,
+					valueState);
+
+			try {
+				future.get();
+				fail(); // by now the job must have failed.
+			} catch (ExecutionException e) {
+				Assert.assertTrue(e.getCause() instanceof RuntimeException);
+				Assert.assertTrue(e.getCause().getMessage().contains(
+						"UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
+			} catch (Exception ignored) {
+				fail("Unexpected type of exception.");
+			}
+
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
+	/**
 	 * Similar tests as {@link #testValueState()} but before submitting the
 	 * job, we already issue one request which fails.
 	 */
@@ -572,14 +654,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 			// Now query
 			int key = 0;
-			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
+					deadline,
 					client,
 					jobId,
 					queryableState.getQueryableStateName(),
 					key,
 					BasicTypeInfo.INT_TYPE_INFO,
 					valueState,
-					QUERY_RETRY_DELAY,
 					true,
 					executor);
 
@@ -723,14 +805,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+					CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"pumba",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							foldingState,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -814,14 +896,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"jungle",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							reducingState,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -923,14 +1005,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+					CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"timon-queryable",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							mapStateDescriptor,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -1028,14 +1110,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<ListState<Long>> future = getKvStateWithRetries(
+					final CompletableFuture<ListState<Long>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"list-queryable",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							listStateDescriptor,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -1130,14 +1212,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+					CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvState(
+							deadline,
 							client,
 							jobId,
 							"aggr-queryable",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							aggrStateDescriptor,
-							QUERY_RETRY_DELAY,
 							false,
 							executor);
 
@@ -1372,84 +1454,62 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/////				General Utility Methods				//////
 
-	private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
+	private static <K, S extends State, V> CompletableFuture<S> getKvState(
+			final Deadline deadline,
 			final QueryableStateClient client,
 			final JobID jobId,
 			final String queryName,
 			final K key,
 			final TypeInformation<K> keyTypeInfo,
 			final StateDescriptor<S, V> stateDescriptor,
-			final Time retryDelay,
 			final boolean failForUnknownKeyOrNamespace,
-			final ScheduledExecutor executor) {
-		return retryWithDelay(
-				() -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
-				NO_OF_RETRIES,
-				retryDelay,
-				executor,
-				failForUnknownKeyOrNamespace);
-	}
-
-	private static <T> CompletableFuture<T> retryWithDelay(
-			final Supplier<CompletableFuture<T>> operation,
-			final int retries,
-			final Time retryDelay,
-			final ScheduledExecutor scheduledExecutor,
-			final boolean failIfUnknownKeyOrNamespace) {
-
-		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
-
-		retryWithDelay(
-				resultFuture,
-				operation,
-				retries,
-				retryDelay,
-				scheduledExecutor,
-				failIfUnknownKeyOrNamespace);
+			final ScheduledExecutor executor) throws InterruptedException {
 
+		final CompletableFuture<S> resultFuture = new CompletableFuture<>();
+		getKvStateIgnoringCertainExceptions(
+				deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
+				stateDescriptor, failForUnknownKeyOrNamespace, executor);
 		return resultFuture;
 	}
 
-	public static <T> void retryWithDelay(
-			final CompletableFuture<T> resultFuture,
-			final Supplier<CompletableFuture<T>> operation,
-			final int retries,
-			final Time retryDelay,
-			final ScheduledExecutor scheduledExecutor,
-			final boolean failIfUnknownKeyOrNamespace) {
+	private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
+			final Deadline deadline,
+			final CompletableFuture<S> resultFuture,
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryName,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final StateDescriptor<S, V> stateDescriptor,
+			final boolean failForUnknownKeyOrNamespace,
+			final ScheduledExecutor executor) throws InterruptedException {
 
 		if (!resultFuture.isDone()) {
-			final CompletableFuture<T> operationResultFuture = operation.get();
-			operationResultFuture.whenCompleteAsync(
-					(t, throwable) -> {
-						if (throwable != null) {
-							if (throwable.getCause() instanceof CancellationException) {
-								resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause()));
-							} else if (throwable.getCause() instanceof AssertionError ||
-									(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) {
-								resultFuture.completeExceptionally(throwable.getCause());
-							} else {
-								if (retries > 0) {
-									final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
-											() -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace),
-											retryDelay.toMilliseconds(),
-											TimeUnit.MILLISECONDS);
-
-									resultFuture.whenComplete(
-											(innerT, innerThrowable) -> scheduledFuture.cancel(false));
-								} else {
-									resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " +
-											"has been exhausted.", throwable));
-								}
-							}
-						} else {
-							resultFuture.complete(t);
+			Thread.sleep(100L);
+			CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+			expected.whenCompleteAsync((result, throwable) -> {
+				if (throwable != null) {
+					if (
+							throwable.getCause() instanceof CancellationException ||
+							throwable.getCause() instanceof AssertionError ||
+							(failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)
+					) {
+						resultFuture.completeExceptionally(throwable.getCause());
+					} else if (deadline.hasTimeLeft()) {
+						try {
+							getKvStateIgnoringCertainExceptions(
+									deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
+									stateDescriptor, failForUnknownKeyOrNamespace, executor);
+						} catch (InterruptedException e) {
+							e.printStackTrace();
 						}
-					},
-					scheduledExecutor);
+					}
+				} else {
+					resultFuture.complete(result);
+				}
+			}, executor);
 
-			resultFuture.whenComplete(
-					(t, throwable) -> operationResultFuture.cancel(false));
+			resultFuture.whenComplete((result, throwable) -> expected.cancel(false));
 		}
 	}
 
@@ -1468,14 +1528,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 		for (int key = 0; key < maxParallelism; key++) {
 			boolean success = false;
 			while (deadline.hasTimeLeft() && !success) {
-				CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+				CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
+						deadline,
 						client,
 						jobId,
 						queryableStateName,
 						key,
 						BasicTypeInfo.INT_TYPE_INFO,
 						stateDescriptor,
-						QUERY_RETRY_DELAY,
 						false,
 						executor);
 


[6/6] flink git commit: [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().

Posted by kk...@apache.org.
[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3753ae25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3753ae25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3753ae25

Branch: refs/heads/release-1.4
Commit: 3753ae2517fbc940c05ea54e3eb0a960fecdf879
Parents: 1a68d75
Author: kkloudas <kk...@gmail.com>
Authored: Fri Nov 17 09:26:10 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:21:18 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/query/KvStateRegistry.java    | 23 ++++++--------------
 1 file changed, 7 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3753ae25/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index af19d81..ed1f92e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -45,7 +45,7 @@ public class KvStateRegistry {
 			new ConcurrentHashMap<>();
 
 	/** Registry listener to be notified on registration/unregistration. */
-	private final AtomicReference<KvStateRegistryListener> listener = new AtomicReference<>();
+	private final AtomicReference<KvStateRegistryListener> listenerRef = new AtomicReference<>();
 
 	/**
 	 * Registers a listener with the registry.
@@ -54,7 +54,7 @@ public class KvStateRegistry {
 	 * @throws IllegalStateException If there is a registered listener
 	 */
 	public void registerListener(KvStateRegistryListener listener) {
-		if (!this.listener.compareAndSet(null, listener)) {
+		if (!listenerRef.compareAndSet(null, listener)) {
 			throw new IllegalStateException("Listener already registered.");
 		}
 	}
@@ -63,20 +63,10 @@ public class KvStateRegistry {
 	 * Unregisters the listener with the registry.
 	 */
 	public void unregisterListener() {
-		listener.set(null);
+		listenerRef.set(null);
 	}
 
 	/**
-	 * Registers the KvState instance identified by the given 4-tuple of JobID,
-	 * JobVertexID, key group index, and registration name.
-	 *
-	 * @param kvStateId KvStateID to identify the KvState instance
-	 * @param kvState   KvState instance to register
-	 * @throws IllegalStateException If there is a KvState instance registered
-	 *                               with the same ID.
-	 */
-
-	/**
 	 * Registers the KvState instance and returns the assigned ID.
 	 *
 	 * @param jobId            JobId the KvState instance belongs to
@@ -96,7 +86,7 @@ public class KvStateRegistry {
 		KvStateID kvStateId = new KvStateID();
 
 		if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
-			KvStateRegistryListener listener = this.listener.get();
+			final KvStateRegistryListener listener = listenerRef.get();
 			if (listener != null) {
 				listener.notifyKvStateRegistered(
 						jobId,
@@ -108,7 +98,8 @@ public class KvStateRegistry {
 
 			return kvStateId;
 		} else {
-			throw new IllegalStateException(kvStateId + " is already registered.");
+			throw new IllegalStateException(
+					"State \"" + registrationName + " \"(id=" + kvStateId + ") appears registered although it should not.");
 		}
 	}
 
@@ -127,7 +118,7 @@ public class KvStateRegistry {
 			KvStateID kvStateId) {
 
 		if (registeredKvStates.remove(kvStateId) != null) {
-			KvStateRegistryListener listener = this.listener.get();
+			final KvStateRegistryListener listener = listenerRef.get();
 			if (listener != null) {
 				listener.notifyKvStateUnregistered(
 						jobId,


[5/6] flink git commit: [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds.

Posted by kk...@apache.org.
[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a68d752
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a68d752
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a68d752

Branch: refs/heads/release-1.4
Commit: 1a68d7527932b12bd2cb392c7c7781023756bf0c
Parents: 12b0c58
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:45:49 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:20:55 2017 +0100

----------------------------------------------------------------------
 .../itcases/AbstractQueryableStateTestBase.java | 32 +++++++++++++++-----
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +--
 .../runtime/jobmanager/JobManagerTest.java      |  5 +--
 3 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index a789dbd..65e9bb5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/**
 	 * Tests that duplicate query registrations fail the job at the JobManager.
-	 *
-	 * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
-	 * in the HA mode we use the actual JM code which does not recognize the
-	 * {@code NotifyWhenJobStatus} message.
 	 */
 	@Test
 	public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/**
 	 * Tests that the correct exception is thrown if the query
-	 * contains a wrong queryable state name.
+	 * contains a wrong jobId or wrong queryable state name.
 	 */
 	@Test
-	public void testWrongQueryableStateName() throws Exception {
+	public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
 		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
@@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			assertEquals(JobStatus.RUNNING, jobStatus.state());
 
-			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+			final JobID wrongJobId = new JobID();
+
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
+					wrongJobId, 						// this is the wrong job id
+					"hankuna",
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					valueState);
+
+			try {
+				unknownJobFuture.get();
+				fail(); // by now the job must have failed.
+			} catch (ExecutionException e) {
+				Assert.assertTrue(e.getCause() instanceof RuntimeException);
+				Assert.assertTrue(e.getCause().getMessage().contains(
+						"FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
+			} catch (Exception ignored) {
+				fail("Unexpected type of exception.");
+			}
+
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
 					jobId,
 					"wrong-hankuna", // this is the wrong name.
 					0,
@@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					valueState);
 
 			try {
-				future.get();
+				unknownQSName.get();
 				fail(); // by now the job must have failed.
 			} catch (ExecutionException e) {
 				Assert.assertTrue(e.getCause() instanceof RuntimeException);

http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4fb1196..f57637a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
+import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1503,7 +1503,7 @@ class JobManager(
             }
 
           case None =>
-            sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found"))
+            sender() ! Status.Failure(new FlinkJobNotFoundException(msg.getJobId))
         }
 
       // TaskManager KvState registration

http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index a697aae..6a02d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
@@ -672,7 +673,7 @@ public class JobManagerTest extends TestLogger {
 		try {
 			Await.result(lookupFuture, deadline.timeLeft());
 			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
+		} catch (FlinkJobNotFoundException ignored) {
 			// Expected
 		}
 
@@ -735,7 +736,7 @@ public class JobManagerTest extends TestLogger {
 		try {
 			Await.result(lookupFuture, deadline.timeLeft());
 			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
+		} catch (FlinkJobNotFoundException ignored) {
 			// Expected
 		}
 


[2/6] flink git commit: [FLINK-8062][QS] Make getKvState() with namespace private.

Posted by kk...@apache.org.
[FLINK-8062][QS] Make getKvState() with namespace private.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/96b350ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/96b350ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/96b350ad

Branch: refs/heads/release-1.4
Commit: 96b350ad91a1f248d0a3c616d1e59638013892be
Parents: d0324e3
Author: kkloudas <kk...@gmail.com>
Authored: Wed Nov 15 15:32:42 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:18:47 2017 +0100

----------------------------------------------------------------------
 .../flink/queryablestate/client/QueryableStateClient.java     | 3 +--
 .../itcases/AbstractQueryableStateTestBase.java               | 7 +------
 2 files changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/96b350ad/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 304505a..03e02e1 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -186,8 +186,7 @@ public class QueryableStateClient {
 	 * @param stateDescriptor			The {@link StateDescriptor} of the state we want to query.
 	 * @return Future holding the immutable {@link State} object containing the result.
 	 */
-	@PublicEvolving
-	public <K, N, S extends State, V> CompletableFuture<S> getKvState(
+	private <K, N, S extends State, V> CompletableFuture<S> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final K key,

http://git-wip-us.apache.org/repos/asf/flink/blob/96b350ad/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index c1cbb61..a789dbd 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -47,7 +47,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
-import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo;
 import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -491,9 +490,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					jobId,
 					"wrong-hankuna", // this is the wrong name.
 					0,
-					VoidNamespace.INSTANCE,
 					BasicTypeInfo.INT_TYPE_INFO,
-					VoidNamespaceTypeInfo.INSTANCE,
 					valueState);
 
 			try {
@@ -572,9 +569,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					jobId,
 					queryableState.getQueryableStateName(),
 					0,
-					VoidNamespace.INSTANCE,
 					BasicTypeInfo.INT_TYPE_INFO,
-					VoidNamespaceTypeInfo.INSTANCE,
 					valueState);
 
 			cluster.submitJobDetached(jobGraph);
@@ -1486,7 +1481,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 		if (!resultFuture.isDone()) {
 			Thread.sleep(100L);
-			CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+			CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
 			expected.whenCompleteAsync((result, throwable) -> {
 				if (throwable != null) {
 					if (


[3/6] flink git commit: [FLINK-8065][QS] Improve error message when client already shut down.

Posted by kk...@apache.org.
[FLINK-8065][QS] Improve error message when client already shut down.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6314e486
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6314e486
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6314e486

Branch: refs/heads/release-1.4
Commit: 6314e4861df3100e8edd666f00e062c128f6e09f
Parents: 96b350a
Author: kkloudas <kk...@gmail.com>
Authored: Wed Nov 15 15:38:36 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:19:08 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/queryablestate/network/Client.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6314e486/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 13d34fb..e21145b 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -133,7 +133,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 
 	public CompletableFuture<RESP> sendRequest(final InetSocketAddress serverAddress, final REQ request) {
 		if (shutDown.get()) {
-			return FutureUtils.getFailedFuture(new IllegalStateException("Shut down"));
+			return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down."));
 		}
 
 		EstablishedConnection connection = establishedConnections.get(serverAddress);


[4/6] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.

Posted by kk...@apache.org.
[FLINK-8055][QS] Deduplicate logging messages about QS start.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12b0c58f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12b0c58f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12b0c58f

Branch: refs/heads/release-1.4
Commit: 12b0c58f6780376ac2da0f02c6e7eb8a24ab8a13
Parents: 6314e48
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:02:16 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:19:35 2017 +0100

----------------------------------------------------------------------
 .../network/AbstractServerBase.java             | 20 ++++++++++----------
 .../flink/queryablestate/network/Client.java    | 20 ++++++++++++++------
 .../server/KvStateServerImpl.java               |  5 -----
 .../HAAbstractQueryableStateTestBase.java       |  2 +-
 .../network/AbstractServerTest.java             |  2 +-
 .../network/KvStateServerHandlerTest.java       |  2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  2 --
 7 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 07ca26d..82a05f2 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
 @Internal
 public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class);
+	protected final Logger log = LoggerFactory.getLogger(getClass());
 
 	/** AbstractServerBase config: low water mark. */
 	private static final int LOW_WATER_MARK = 8 * 1024;
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 */
 	public void start() throws Throwable {
 		Preconditions.checkState(serverAddress == null,
-				"The " + serverName + " already running @ " + serverAddress + '.');
+				serverName + " is already running @ " + serverAddress + '.');
 
 		Iterator<Integer> portIterator = bindPortRange.iterator();
 		while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
 
 		if (serverAddress != null) {
-			LOG.info("Started the {} @ {}.", serverName, serverAddress);
+			log.info("Started {} @ {}.", serverName, serverAddress);
 		} else {
-			LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName);
-			throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied.");
+			log.info("Unable to start {}. All ports in provided range are occupied.", serverName);
+			throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied.");
 		}
 	}
 
@@ -203,7 +203,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 * @throws Exception If something goes wrong during the bind operation.
 	 */
 	private boolean attemptToBind(final int port) throws Throwable {
-		LOG.debug("Attempting to start server {} on port {}.", serverName, port);
+		log.debug("Attempting to start {} on port {}.", serverName, port);
 
 		this.queryExecutor = createQueryExecutor();
 		this.handler = initializeHandler();
@@ -250,7 +250,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 
 			throw future.cause();
 		} catch (BindException e) {
-			LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage());
+			log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
 			shutdown();
 		}
 		// any other type of exception we let it bubble up.
@@ -261,7 +261,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 * Shuts down the server and all related thread pools.
 	 */
 	public void shutdown() {
-		LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
+		log.info("Shutting down {} @ {}", serverName, serverAddress);
 
 		if (handler != null) {
 			handler.shutdown();
@@ -311,7 +311,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	}
 
 	@VisibleForTesting
-	public boolean isExecutorShutdown() {
-		return queryExecutor.isShutdown();
+	public boolean isEventGroupShutdown() {
+		return bootstrap == null || bootstrap.group().isTerminated();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index e21145b..12286fa 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.queryablestate.FutureUtils;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
@@ -282,12 +283,14 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 					while (!queuedRequests.isEmpty()) {
 						final PendingRequest pending = queuedRequests.poll();
 
-						established.sendRequest(pending.request)
-								.thenAccept(resp -> pending.complete(resp))
-								.exceptionally(throwable -> {
-									pending.completeExceptionally(throwable);
-									return null;
-						});
+						established.sendRequest(pending.request).whenComplete(
+								(response, throwable) -> {
+									if (throwable != null) {
+										pending.completeExceptionally(throwable);
+									} else {
+										pending.complete(response);
+									}
+								});
 					}
 
 					// Publish the channel for the general public
@@ -533,4 +536,9 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 			}
 		}
 	}
+
+	@VisibleForTesting
+	public boolean isEventGroupShutdown() {
+		return bootstrap == null || bootstrap.group().isTerminated();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index fe07687..3a37a3a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -29,9 +29,6 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
@@ -42,8 +39,6 @@ import java.util.Iterator;
 @Internal
 public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
 
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
-
 	/** The {@link KvStateRegistry} to query for state instances. */
 	private final KvStateRegistry kvStateRegistry;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index fc4b2bc..79809b3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -65,7 +65,7 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
 			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 			cluster = new TestingCluster(config, false);
-			cluster.start();
+			cluster.start(true);
 
 			client = new QueryableStateClient("localhost", proxyPortRangeStart);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 2775cd4..3d2ed40 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
 
 		// the expected exception along with the adequate message
 		expectedEx.expect(FlinkRuntimeException.class);
-		expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied.");
+		expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
 
 		TestServer server1 = null;
 		TestServer server2 = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 041544d..7b301ed 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -391,7 +391,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		localTestServer.start();
 		localTestServer.shutdown();
-		assertTrue(localTestServer.isExecutorShutdown());
+		assertTrue(localTestServer.getQueryExecutor().isTerminated());
 
 		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
 				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4fffacd..71d0386 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,6 @@ public class NetworkEnvironment {
 			if (kvStateServer != null) {
 				try {
 					kvStateServer.start();
-					LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress());
 				} catch (Throwable ie) {
 					kvStateServer.shutdown();
 					kvStateServer = null;
@@ -321,7 +320,6 @@ public class NetworkEnvironment {
 			if (kvStateProxy != null) {
 				try {
 					kvStateProxy.start();
-					LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress());
 				} catch (Throwable ie) {
 					kvStateProxy.shutdown();
 					kvStateProxy = null;