You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/11/02 00:05:08 UTC
reef git commit: [REEF-1654] Implement graceful shutdown of Wake
executor services
Repository: reef
Updated Branches:
refs/heads/master 8e628faeb -> 31fbd0e99
[REEF-1654] Implement graceful shutdown of Wake executor services
This is work towards the "REEF as a library" project
[REEF-1561](https://issues.apache.org/jira/browse/REEF-1561)
Summary of changes:
* Catch the `InterruptedException` in `ThreadPoolStage` and make sure its
`.close()` method never throws
* Gracefully shut down threads in `NettyMessagingTransport.close()`
* Catch errors when closing the acceptor channel in `NettyMessagingTransport`
* Make sure `NettyMessagingTransport.close()` never throws
* Improve logging in `.close()` methods of `NettyMessagingTransport` and
`ThreadPoolStage`
* Minor refactoring for readability
JIRA:
[REEF-1654](https://issues.apache.org/jira/browse/REEF-1654)
Pull Request:
This closes #1174
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/31fbd0e9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/31fbd0e9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/31fbd0e9
Branch: refs/heads/master
Commit: 31fbd0e997534f14ec826470e838deda3ea6012f
Parents: 8e628fa
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Mon Oct 31 22:13:54 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Nov 1 17:03:36 2016 -0700
----------------------------------------------------------------------
.../apache/reef/wake/impl/ThreadPoolStage.java | 29 +++++++++++----
.../netty/NettyMessagingTransport.java | 38 ++++++++++++--------
2 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/31fbd0e9/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 3b39c8a..7b6107f 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -40,13 +40,15 @@ import java.util.logging.Logger;
* @param <T> type
*/
public final class ThreadPoolStage<T> extends AbstractEStage<T> {
+
private static final Logger LOG = Logger.getLogger(ThreadPoolStage.class.getName());
+ private static final long SHUTDOWN_TIMEOUT = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT;
+
private final EventHandler<T> handler;
+ private final EventHandler<Throwable> errorHandler;
private final ExecutorService executor;
private final int numThreads;
- private final long shutdownTimeout = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT;
- private final EventHandler<Throwable> errorHandler;
/**
* Constructs a thread-pool stage.
@@ -206,14 +208,29 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> {
* Closes resources.
*/
@Override
- public void close() throws Exception {
+ public void close() {
+
if (closed.compareAndSet(false, true) && numThreads > 0) {
+
+ LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: begin", this.name);
+
executor.shutdown();
- if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
- LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
+
+ boolean isTerminated = false;
+ try {
+ isTerminated = executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.WARNING, "Interrupted closing ThreadPoolStage " + this.name, ex);
+ }
+
+ if (!isTerminated) {
final List<Runnable> droppedRunnables = executor.shutdownNow();
- LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks.");
+ LOG.log(Level.SEVERE,
+ "Closing ThreadPoolStage {0}: Executor did not terminate in {1} ms. Dropping {2} tasks",
+ new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()});
}
+
+ LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/31fbd0e9/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index c37e556..c3a910b 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -61,9 +61,15 @@ import java.util.logging.Logger;
/**
* Messaging transport implementation with Netty.
*/
-public class NettyMessagingTransport implements Transport {
+public final class NettyMessagingTransport implements Transport {
+
+ /**
+ * Indicates a hostname that isn't set or known.
+ */
+ public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
+
+ private static final String CLASS_NAME = NettyMessagingTransport.class.getSimpleName();
- private static final String CLASS_NAME = NettyMessagingTransport.class.getName();
private static final Logger LOG = Logger.getLogger(CLASS_NAME);
private static final int SERVER_BOSS_NUM_THREADS = 3;
@@ -91,10 +97,6 @@ public class NettyMessagingTransport implements Transport {
private final int numberOfTries;
private final int retryTimeout;
- /**
- * Indicates a hostname that isn't set or known.
- */
- public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
/**
* Constructs a messaging transport.
@@ -108,7 +110,7 @@ public class NettyMessagingTransport implements Transport {
* @param tcpPortProvider gives an iterator that produces random tcp ports in a range
*/
@Inject
- NettyMessagingTransport(
+ private NettyMessagingTransport(
@Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
@Parameter(RemoteConfiguration.Port.class) final int port,
@Parameter(RemoteConfiguration.RemoteClientStage.class) final EStage<TransportEvent> clientStage,
@@ -131,11 +133,11 @@ public class NettyMessagingTransport implements Transport {
this.serverEventListener = new NettyServerEventListener(this.addrToLinkRefMap, serverStage);
this.serverBossGroup = new NioEventLoopGroup(SERVER_BOSS_NUM_THREADS,
- new DefaultThreadFactory(CLASS_NAME + "ServerBoss"));
+ new DefaultThreadFactory(CLASS_NAME + ":ServerBoss"));
this.serverWorkerGroup = new NioEventLoopGroup(SERVER_WORKER_NUM_THREADS,
- new DefaultThreadFactory(CLASS_NAME + "ServerWorker"));
+ new DefaultThreadFactory(CLASS_NAME + ":ServerWorker"));
this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS,
- new DefaultThreadFactory(CLASS_NAME + "ClientWorker"));
+ new DefaultThreadFactory(CLASS_NAME + ":ClientWorker"));
this.clientBootstrap = new Bootstrap();
this.clientBootstrap.group(this.clientWorkerGroup)
@@ -211,16 +213,22 @@ public class NettyMessagingTransport implements Transport {
* Closes all channels and releases all resources.
*/
@Override
- public void close() throws Exception {
+ public void close() {
LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress);
this.clientChannelGroup.close().awaitUninterruptibly();
this.serverChannelGroup.close().awaitUninterruptibly();
- this.acceptor.close().sync();
- this.clientWorkerGroup.shutdownGracefully();
- this.serverBossGroup.shutdownGracefully();
- this.serverWorkerGroup.shutdownGracefully();
+
+ try {
+ this.acceptor.close().sync();
+ } catch (final Exception ex) {
+ LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex);
+ }
+
+ this.clientWorkerGroup.shutdownGracefully().awaitUninterruptibly();
+ this.serverBossGroup.shutdownGracefully().awaitUninterruptibly();
+ this.serverWorkerGroup.shutdownGracefully().awaitUninterruptibly();
LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress);
}