You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 10:06:48 UTC

[1/5] flink git commit: [FLINK-8062][QS] Make getKvState() with namespace private.

Repository: flink
Updated Branches:
  refs/heads/master a0838de79 -> a4d869759


[FLINK-8062][QS] Make getKvState() with namespace private.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff7e3cf6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff7e3cf6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff7e3cf6

Branch: refs/heads/master
Commit: ff7e3cf6749a6b6bc898fde871c36661c8629c23
Parents: a0838de
Author: kkloudas <kk...@gmail.com>
Authored: Wed Nov 15 15:32:42 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:08 2017 +0100

----------------------------------------------------------------------
 .../flink/queryablestate/client/QueryableStateClient.java     | 3 +--
 .../itcases/AbstractQueryableStateTestBase.java               | 7 +------
 2 files changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff7e3cf6/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 304505a..03e02e1 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -186,8 +186,7 @@ public class QueryableStateClient {
 	 * @param stateDescriptor			The {@link StateDescriptor} of the state we want to query.
 	 * @return Future holding the immutable {@link State} object containing the result.
 	 */
-	@PublicEvolving
-	public <K, N, S extends State, V> CompletableFuture<S> getKvState(
+	private <K, N, S extends State, V> CompletableFuture<S> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final K key,

http://git-wip-us.apache.org/repos/asf/flink/blob/ff7e3cf6/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index c1cbb61..a789dbd 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -47,7 +47,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
-import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo;
 import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -491,9 +490,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					jobId,
 					"wrong-hankuna", // this is the wrong name.
 					0,
-					VoidNamespace.INSTANCE,
 					BasicTypeInfo.INT_TYPE_INFO,
-					VoidNamespaceTypeInfo.INSTANCE,
 					valueState);
 
 			try {
@@ -572,9 +569,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					jobId,
 					queryableState.getQueryableStateName(),
 					0,
-					VoidNamespace.INSTANCE,
 					BasicTypeInfo.INT_TYPE_INFO,
-					VoidNamespaceTypeInfo.INSTANCE,
 					valueState);
 
 			cluster.submitJobDetached(jobGraph);
@@ -1486,7 +1481,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 		if (!resultFuture.isDone()) {
 			Thread.sleep(100L);
-			CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+			CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
 			expected.whenCompleteAsync((result, throwable) -> {
 				if (throwable != null) {
 					if (


[2/5] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.

Posted by kk...@apache.org.
[FLINK-8055][QS] Deduplicate logging messages about QS start.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e059e96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e059e96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e059e96

Branch: refs/heads/master
Commit: 5e059e968633c4292734ebed209fa1b3c30529a1
Parents: 75c1454
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:02:16 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:09 2017 +0100

----------------------------------------------------------------------
 .../network/AbstractServerBase.java             | 20 ++++++++++----------
 .../flink/queryablestate/network/Client.java    | 20 ++++++++++++++------
 .../server/KvStateServerImpl.java               |  5 -----
 .../HAAbstractQueryableStateTestBase.java       |  2 +-
 .../network/AbstractServerTest.java             |  2 +-
 .../network/KvStateServerHandlerTest.java       |  2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  2 --
 7 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 07ca26d..82a05f2 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
 @Internal
 public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class);
+	protected final Logger log = LoggerFactory.getLogger(getClass());
 
 	/** AbstractServerBase config: low water mark. */
 	private static final int LOW_WATER_MARK = 8 * 1024;
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 */
 	public void start() throws Throwable {
 		Preconditions.checkState(serverAddress == null,
-				"The " + serverName + " already running @ " + serverAddress + '.');
+				serverName + " is already running @ " + serverAddress + '.');
 
 		Iterator<Integer> portIterator = bindPortRange.iterator();
 		while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
 
 		if (serverAddress != null) {
-			LOG.info("Started the {} @ {}.", serverName, serverAddress);
+			log.info("Started {} @ {}.", serverName, serverAddress);
 		} else {
-			LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName);
-			throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied.");
+			log.info("Unable to start {}. All ports in provided range are occupied.", serverName);
+			throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied.");
 		}
 	}
 
@@ -203,7 +203,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 * @throws Exception If something goes wrong during the bind operation.
 	 */
 	private boolean attemptToBind(final int port) throws Throwable {
-		LOG.debug("Attempting to start server {} on port {}.", serverName, port);
+		log.debug("Attempting to start {} on port {}.", serverName, port);
 
 		this.queryExecutor = createQueryExecutor();
 		this.handler = initializeHandler();
@@ -250,7 +250,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 
 			throw future.cause();
 		} catch (BindException e) {
-			LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage());
+			log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
 			shutdown();
 		}
 		// any other type of exception we let it bubble up.
@@ -261,7 +261,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 * Shuts down the server and all related thread pools.
 	 */
 	public void shutdown() {
-		LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
+		log.info("Shutting down {} @ {}", serverName, serverAddress);
 
 		if (handler != null) {
 			handler.shutdown();
@@ -311,7 +311,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	}
 
 	@VisibleForTesting
-	public boolean isExecutorShutdown() {
-		return queryExecutor.isShutdown();
+	public boolean isEventGroupShutdown() {
+		return bootstrap == null || bootstrap.group().isTerminated();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index e21145b..12286fa 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.queryablestate.FutureUtils;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
@@ -282,12 +283,14 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 					while (!queuedRequests.isEmpty()) {
 						final PendingRequest pending = queuedRequests.poll();
 
-						established.sendRequest(pending.request)
-								.thenAccept(resp -> pending.complete(resp))
-								.exceptionally(throwable -> {
-									pending.completeExceptionally(throwable);
-									return null;
-						});
+						established.sendRequest(pending.request).whenComplete(
+								(response, throwable) -> {
+									if (throwable != null) {
+										pending.completeExceptionally(throwable);
+									} else {
+										pending.complete(response);
+									}
+								});
 					}
 
 					// Publish the channel for the general public
@@ -533,4 +536,9 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 			}
 		}
 	}
+
+	@VisibleForTesting
+	public boolean isEventGroupShutdown() {
+		return bootstrap == null || bootstrap.group().isTerminated();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index fe07687..3a37a3a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -29,9 +29,6 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
@@ -42,8 +39,6 @@ import java.util.Iterator;
 @Internal
 public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
 
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
-
 	/** The {@link KvStateRegistry} to query for state instances. */
 	private final KvStateRegistry kvStateRegistry;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index fc4b2bc..79809b3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -65,7 +65,7 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
 			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 			cluster = new TestingCluster(config, false);
-			cluster.start();
+			cluster.start(true);
 
 			client = new QueryableStateClient("localhost", proxyPortRangeStart);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 2775cd4..3d2ed40 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
 
 		// the expected exception along with the adequate message
 		expectedEx.expect(FlinkRuntimeException.class);
-		expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied.");
+		expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
 
 		TestServer server1 = null;
 		TestServer server2 = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 041544d..7b301ed 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -391,7 +391,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		localTestServer.start();
 		localTestServer.shutdown();
-		assertTrue(localTestServer.isExecutorShutdown());
+		assertTrue(localTestServer.getQueryExecutor().isTerminated());
 
 		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
 				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4fffacd..71d0386 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,6 @@ public class NetworkEnvironment {
 			if (kvStateServer != null) {
 				try {
 					kvStateServer.start();
-					LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress());
 				} catch (Throwable ie) {
 					kvStateServer.shutdown();
 					kvStateServer = null;
@@ -321,7 +320,6 @@ public class NetworkEnvironment {
 			if (kvStateProxy != null) {
 				try {
 					kvStateProxy.start();
-					LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress());
 				} catch (Throwable ie) {
 					kvStateProxy.shutdown();
 					kvStateProxy = null;


[5/5] flink git commit: [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState()

Posted by kk...@apache.org.
[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4d86975
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4d86975
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4d86975

Branch: refs/heads/master
Commit: a4d86975967942054d1bd466641e9c835fb014ac
Parents: 2fe078f
Author: kkloudas <kk...@gmail.com>
Authored: Fri Nov 17 09:26:10 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:11 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/query/KvStateRegistry.java    | 23 ++++++--------------
 1 file changed, 7 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4d86975/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index af19d81..ed1f92e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -45,7 +45,7 @@ public class KvStateRegistry {
 			new ConcurrentHashMap<>();
 
 	/** Registry listener to be notified on registration/unregistration. */
-	private final AtomicReference<KvStateRegistryListener> listener = new AtomicReference<>();
+	private final AtomicReference<KvStateRegistryListener> listenerRef = new AtomicReference<>();
 
 	/**
 	 * Registers a listener with the registry.
@@ -54,7 +54,7 @@ public class KvStateRegistry {
 	 * @throws IllegalStateException If there is a registered listener
 	 */
 	public void registerListener(KvStateRegistryListener listener) {
-		if (!this.listener.compareAndSet(null, listener)) {
+		if (!listenerRef.compareAndSet(null, listener)) {
 			throw new IllegalStateException("Listener already registered.");
 		}
 	}
@@ -63,20 +63,10 @@ public class KvStateRegistry {
 	 * Unregisters the listener with the registry.
 	 */
 	public void unregisterListener() {
-		listener.set(null);
+		listenerRef.set(null);
 	}
 
 	/**
-	 * Registers the KvState instance identified by the given 4-tuple of JobID,
-	 * JobVertexID, key group index, and registration name.
-	 *
-	 * @param kvStateId KvStateID to identify the KvState instance
-	 * @param kvState   KvState instance to register
-	 * @throws IllegalStateException If there is a KvState instance registered
-	 *                               with the same ID.
-	 */
-
-	/**
 	 * Registers the KvState instance and returns the assigned ID.
 	 *
 	 * @param jobId            JobId the KvState instance belongs to
@@ -96,7 +86,7 @@ public class KvStateRegistry {
 		KvStateID kvStateId = new KvStateID();
 
 		if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
-			KvStateRegistryListener listener = this.listener.get();
+			final KvStateRegistryListener listener = listenerRef.get();
 			if (listener != null) {
 				listener.notifyKvStateRegistered(
 						jobId,
@@ -108,7 +98,8 @@ public class KvStateRegistry {
 
 			return kvStateId;
 		} else {
-			throw new IllegalStateException(kvStateId + " is already registered.");
+			throw new IllegalStateException(
+					"State \"" + registrationName + " \"(id=" + kvStateId + ") appears registered although it should not.");
 		}
 	}
 
@@ -127,7 +118,7 @@ public class KvStateRegistry {
 			KvStateID kvStateId) {
 
 		if (registeredKvStates.remove(kvStateId) != null) {
-			KvStateRegistryListener listener = this.listener.get();
+			final KvStateRegistryListener listener = listenerRef.get();
 			if (listener != null) {
 				listener.notifyKvStateUnregistered(
 						jobId,


[4/5] flink git commit: [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds

Posted by kk...@apache.org.
[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fe078f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fe078f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fe078f3

Branch: refs/heads/master
Commit: 2fe078f3927595cbc3c5de6635a710494e0f34b4
Parents: 5e059e9
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:45:49 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:10 2017 +0100

----------------------------------------------------------------------
 .../itcases/AbstractQueryableStateTestBase.java | 32 +++++++++++++++-----
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +--
 .../runtime/jobmanager/JobManagerTest.java      |  5 +--
 3 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index a789dbd..65e9bb5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/**
 	 * Tests that duplicate query registrations fail the job at the JobManager.
-	 *
-	 * <b>NOTE: </b> This test is only in the non-HA variant of the tests because
-	 * in the HA mode we use the actual JM code which does not recognize the
-	 * {@code NotifyWhenJobStatus} message.
 	 */
 	@Test
 	public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
 	/**
 	 * Tests that the correct exception is thrown if the query
-	 * contains a wrong queryable state name.
+	 * contains a wrong jobId or wrong queryable state name.
 	 */
 	@Test
-	public void testWrongQueryableStateName() throws Exception {
+	public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
 		// Config
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
@@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 			assertEquals(JobStatus.RUNNING, jobStatus.state());
 
-			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+			final JobID wrongJobId = new JobID();
+
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
+					wrongJobId, 						// this is the wrong job id
+					"hankuna",
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					valueState);
+
+			try {
+				unknownJobFuture.get();
+				fail(); // by now the job must have failed.
+			} catch (ExecutionException e) {
+				Assert.assertTrue(e.getCause() instanceof RuntimeException);
+				Assert.assertTrue(e.getCause().getMessage().contains(
+						"FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
+			} catch (Exception ignored) {
+				fail("Unexpected type of exception.");
+			}
+
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
 					jobId,
 					"wrong-hankuna", // this is the wrong name.
 					0,
@@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 					valueState);
 
 			try {
-				future.get();
+				unknownQSName.get();
 				fail(); // by now the job must have failed.
 			} catch (ExecutionException e) {
 				Assert.assertTrue(e.getCause() instanceof RuntimeException);

http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 95a3fd5..c12db23 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
+import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1503,7 +1503,7 @@ class JobManager(
             }
 
           case None =>
-            sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found"))
+            sender() ! Status.Failure(new FlinkJobNotFoundException(msg.getJobId))
         }
 
       // TaskManager KvState registration

http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index a697aae..6a02d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
@@ -672,7 +673,7 @@ public class JobManagerTest extends TestLogger {
 		try {
 			Await.result(lookupFuture, deadline.timeLeft());
 			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
+		} catch (FlinkJobNotFoundException ignored) {
 			// Expected
 		}
 
@@ -735,7 +736,7 @@ public class JobManagerTest extends TestLogger {
 		try {
 			Await.result(lookupFuture, deadline.timeLeft());
 			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
+		} catch (FlinkJobNotFoundException ignored) {
 			// Expected
 		}
 


[3/5] flink git commit: [FLINK-8065][QS] Improve error message when client already shut down.

Posted by kk...@apache.org.
[FLINK-8065][QS] Improve error message when client already shut down.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75c14541
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75c14541
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75c14541

Branch: refs/heads/master
Commit: 75c14541fdc52d5446b179e8e660b8a4fd90310c
Parents: ff7e3cf
Author: kkloudas <kk...@gmail.com>
Authored: Wed Nov 15 15:38:36 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:09 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/queryablestate/network/Client.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/75c14541/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 13d34fb..e21145b 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -133,7 +133,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 
 	public CompletableFuture<RESP> sendRequest(final InetSocketAddress serverAddress, final REQ request) {
 		if (shutDown.get()) {
-			return FutureUtils.getFailedFuture(new IllegalStateException("Shut down"));
+			return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down."));
 		}
 
 		EstablishedConnection connection = establishedConnections.get(serverAddress);