You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 10:06:49 UTC

[2/5] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.

[FLINK-8055][QS] Deduplicate logging messages about QS start.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e059e96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e059e96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e059e96

Branch: refs/heads/master
Commit: 5e059e968633c4292734ebed209fa1b3c30529a1
Parents: 75c1454
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:02:16 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:09 2017 +0100

----------------------------------------------------------------------
 .../network/AbstractServerBase.java             | 20 ++++++++++----------
 .../flink/queryablestate/network/Client.java    | 20 ++++++++++++++------
 .../server/KvStateServerImpl.java               |  5 -----
 .../HAAbstractQueryableStateTestBase.java       |  2 +-
 .../network/AbstractServerTest.java             |  2 +-
 .../network/KvStateServerHandlerTest.java       |  2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  2 --
 7 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 07ca26d..82a05f2 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
 @Internal
 public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class);
+	protected final Logger log = LoggerFactory.getLogger(getClass());
 
 	/** AbstractServerBase config: low water mark. */
 	private static final int LOW_WATER_MARK = 8 * 1024;
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 */
 	public void start() throws Throwable {
 		Preconditions.checkState(serverAddress == null,
-				"The " + serverName + " already running @ " + serverAddress + '.');
+				serverName + " is already running @ " + serverAddress + '.');
 
 		Iterator<Integer> portIterator = bindPortRange.iterator();
 		while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
 
 		if (serverAddress != null) {
-			LOG.info("Started the {} @ {}.", serverName, serverAddress);
+			log.info("Started {} @ {}.", serverName, serverAddress);
 		} else {
-			LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName);
-			throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied.");
+			log.info("Unable to start {}. All ports in provided range are occupied.", serverName);
+			throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied.");
 		}
 	}
 
@@ -203,7 +203,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 * @throws Exception If something goes wrong during the bind operation.
 	 */
 	private boolean attemptToBind(final int port) throws Throwable {
-		LOG.debug("Attempting to start server {} on port {}.", serverName, port);
+		log.debug("Attempting to start {} on port {}.", serverName, port);
 
 		this.queryExecutor = createQueryExecutor();
 		this.handler = initializeHandler();
@@ -250,7 +250,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 
 			throw future.cause();
 		} catch (BindException e) {
-			LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage());
+			log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
 			shutdown();
 		}
 		// any other type of exception we let it bubble up.
@@ -261,7 +261,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	 * Shuts down the server and all related thread pools.
 	 */
 	public void shutdown() {
-		LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
+		log.info("Shutting down {} @ {}", serverName, serverAddress);
 
 		if (handler != null) {
 			handler.shutdown();
@@ -311,7 +311,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 	}
 
 	@VisibleForTesting
-	public boolean isExecutorShutdown() {
-		return queryExecutor.isShutdown();
+	public boolean isEventGroupShutdown() {
+		return bootstrap == null || bootstrap.group().isTerminated();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index e21145b..12286fa 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.queryablestate.FutureUtils;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
@@ -282,12 +283,14 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 					while (!queuedRequests.isEmpty()) {
 						final PendingRequest pending = queuedRequests.poll();
 
-						established.sendRequest(pending.request)
-								.thenAccept(resp -> pending.complete(resp))
-								.exceptionally(throwable -> {
-									pending.completeExceptionally(throwable);
-									return null;
-						});
+						established.sendRequest(pending.request).whenComplete(
+								(response, throwable) -> {
+									if (throwable != null) {
+										pending.completeExceptionally(throwable);
+									} else {
+										pending.complete(response);
+									}
+								});
 					}
 
 					// Publish the channel for the general public
@@ -533,4 +536,9 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 			}
 		}
 	}
+
+	@VisibleForTesting
+	public boolean isEventGroupShutdown() {
+		return bootstrap == null || bootstrap.group().isTerminated();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index fe07687..3a37a3a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -29,9 +29,6 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
@@ -42,8 +39,6 @@ import java.util.Iterator;
 @Internal
 public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
 
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
-
 	/** The {@link KvStateRegistry} to query for state instances. */
 	private final KvStateRegistry kvStateRegistry;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index fc4b2bc..79809b3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -65,7 +65,7 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
 			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 			cluster = new TestingCluster(config, false);
-			cluster.start();
+			cluster.start(true);
 
 			client = new QueryableStateClient("localhost", proxyPortRangeStart);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 2775cd4..3d2ed40 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
 
 		// the expected exception along with the adequate message
 		expectedEx.expect(FlinkRuntimeException.class);
-		expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied.");
+		expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
 
 		TestServer server1 = null;
 		TestServer server2 = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 041544d..7b301ed 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -391,7 +391,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		localTestServer.start();
 		localTestServer.shutdown();
-		assertTrue(localTestServer.isExecutorShutdown());
+		assertTrue(localTestServer.getQueryExecutor().isTerminated());
 
 		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
 				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4fffacd..71d0386 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,6 @@ public class NetworkEnvironment {
 			if (kvStateServer != null) {
 				try {
 					kvStateServer.start();
-					LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress());
 				} catch (Throwable ie) {
 					kvStateServer.shutdown();
 					kvStateServer = null;
@@ -321,7 +320,6 @@ public class NetworkEnvironment {
 			if (kvStateProxy != null) {
 				try {
 					kvStateProxy.start();
-					LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress());
 				} catch (Throwable ie) {
 					kvStateProxy.shutdown();
 					kvStateProxy = null;