You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 10:06:49 UTC
[2/5] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.
[FLINK-8055][QS] Deduplicate logging messages about QS start.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e059e96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e059e96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e059e96
Branch: refs/heads/master
Commit: 5e059e968633c4292734ebed209fa1b3c30529a1
Parents: 75c1454
Author: kkloudas <kk...@gmail.com>
Authored: Thu Nov 16 17:02:16 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 10:46:09 2017 +0100
----------------------------------------------------------------------
.../network/AbstractServerBase.java | 20 ++++++++++----------
.../flink/queryablestate/network/Client.java | 20 ++++++++++++++------
.../server/KvStateServerImpl.java | 5 -----
.../HAAbstractQueryableStateTestBase.java | 2 +-
.../network/AbstractServerTest.java | 2 +-
.../network/KvStateServerHandlerTest.java | 2 +-
.../runtime/io/network/NetworkEnvironment.java | 2 --
7 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 07ca26d..82a05f2 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
@Internal
public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class);
+ protected final Logger log = LoggerFactory.getLogger(getClass());
/** AbstractServerBase config: low water mark. */
private static final int LOW_WATER_MARK = 8 * 1024;
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
*/
public void start() throws Throwable {
Preconditions.checkState(serverAddress == null,
- "The " + serverName + " already running @ " + serverAddress + '.');
+ serverName + " is already running @ " + serverAddress + '.');
Iterator<Integer> portIterator = bindPortRange.iterator();
while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}
if (serverAddress != null) {
- LOG.info("Started the {} @ {}.", serverName, serverAddress);
+ log.info("Started {} @ {}.", serverName, serverAddress);
} else {
- LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName);
- throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied.");
+ log.info("Unable to start {}. All ports in provided range are occupied.", serverName);
+ throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied.");
}
}
@@ -203,7 +203,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
* @throws Exception If something goes wrong during the bind operation.
*/
private boolean attemptToBind(final int port) throws Throwable {
- LOG.debug("Attempting to start server {} on port {}.", serverName, port);
+ log.debug("Attempting to start {} on port {}.", serverName, port);
this.queryExecutor = createQueryExecutor();
this.handler = initializeHandler();
@@ -250,7 +250,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
throw future.cause();
} catch (BindException e) {
- LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage());
+ log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
shutdown();
}
// any other type of exception we let it bubble up.
@@ -261,7 +261,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
* Shuts down the server and all related thread pools.
*/
public void shutdown() {
- LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
+ log.info("Shutting down {} @ {}", serverName, serverAddress);
if (handler != null) {
handler.shutdown();
@@ -311,7 +311,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
}
@VisibleForTesting
- public boolean isExecutorShutdown() {
- return queryExecutor.isShutdown();
+ public boolean isEventGroupShutdown() {
+ return bootstrap == null || bootstrap.group().isTerminated();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index e21145b..12286fa 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -19,6 +19,7 @@
package org.apache.flink.queryablestate.network;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
@@ -282,12 +283,14 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
while (!queuedRequests.isEmpty()) {
final PendingRequest pending = queuedRequests.poll();
- established.sendRequest(pending.request)
- .thenAccept(resp -> pending.complete(resp))
- .exceptionally(throwable -> {
- pending.completeExceptionally(throwable);
- return null;
- });
+ established.sendRequest(pending.request).whenComplete(
+ (response, throwable) -> {
+ if (throwable != null) {
+ pending.completeExceptionally(throwable);
+ } else {
+ pending.complete(response);
+ }
+ });
}
// Publish the channel for the general public
@@ -533,4 +536,9 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
}
}
}
+
+ @VisibleForTesting
+ public boolean isEventGroupShutdown() {
+ return bootstrap == null || bootstrap.group().isTerminated();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index fe07687..3a37a3a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -29,9 +29,6 @@ import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
@@ -42,8 +39,6 @@ import java.util.Iterator;
@Internal
public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
- private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
-
/** The {@link KvStateRegistry} to query for state instances. */
private final KvStateRegistry kvStateRegistry;
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index fc4b2bc..79809b3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -65,7 +65,7 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
cluster = new TestingCluster(config, false);
- cluster.start();
+ cluster.start(true);
client = new QueryableStateClient("localhost", proxyPortRangeStart);
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 2775cd4..3d2ed40 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
// the expected exception along with the adequate message
expectedEx.expect(FlinkRuntimeException.class);
- expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied.");
+ expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
TestServer server1 = null;
TestServer server2 = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 041544d..7b301ed 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -391,7 +391,7 @@ public class KvStateServerHandlerTest extends TestLogger {
localTestServer.start();
localTestServer.shutdown();
- assertTrue(localTestServer.isExecutorShutdown());
+ assertTrue(localTestServer.getQueryExecutor().isTerminated());
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4fffacd..71d0386 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,6 @@ public class NetworkEnvironment {
if (kvStateServer != null) {
try {
kvStateServer.start();
- LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress());
} catch (Throwable ie) {
kvStateServer.shutdown();
kvStateServer = null;
@@ -321,7 +320,6 @@ public class NetworkEnvironment {
if (kvStateProxy != null) {
try {
kvStateProxy.start();
- LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress());
} catch (Throwable ie) {
kvStateProxy.shutdown();
kvStateProxy = null;