You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/11/02 00:05:08 UTC

reef git commit: [REEF-1654] Implement graceful shutdown of Wake executor services

Repository: reef
Updated Branches:
  refs/heads/master 8e628faeb -> 31fbd0e99


[REEF-1654] Implement graceful shutdown of Wake executor services

This is work towards the "REEF as a library" project
[REEF-1561](https://issues.apache.org/jira/browse/REEF-1561)

Summary of changes:
  * Catch the `InterruptedException` in `ThreadPoolStage` and make sure its
    `.close()` method never throws
  * Gracefully shut down threads in `NettyMessagingTransport.close()`
  * Catch errors when closing the acceptor channel in `NettyMessagingTransport`
  * Make sure `NettyMessagingTransport.close()` never throws
  * Improve logging in `.close()` methods of `NettyMessagingTransport` and
    `ThreadPoolStage`
  * Minor refactoring for readability

JIRA:
  [REEF-1654](https://issues.apache.org/jira/browse/REEF-1654)

Pull Request:
  This closes #1174


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/31fbd0e9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/31fbd0e9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/31fbd0e9

Branch: refs/heads/master
Commit: 31fbd0e997534f14ec826470e838deda3ea6012f
Parents: 8e628fa
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Mon Oct 31 22:13:54 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Nov 1 17:03:36 2016 -0700

----------------------------------------------------------------------
 .../apache/reef/wake/impl/ThreadPoolStage.java  | 29 +++++++++++----
 .../netty/NettyMessagingTransport.java          | 38 ++++++++++++--------
 2 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/31fbd0e9/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 3b39c8a..7b6107f 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -40,13 +40,15 @@ import java.util.logging.Logger;
  * @param <T> type
  */
 public final class ThreadPoolStage<T> extends AbstractEStage<T> {
+
   private static final Logger LOG = Logger.getLogger(ThreadPoolStage.class.getName());
 
+  private static final long SHUTDOWN_TIMEOUT = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT;
+
   private final EventHandler<T> handler;
+  private final EventHandler<Throwable> errorHandler;
   private final ExecutorService executor;
   private final int numThreads;
-  private final long shutdownTimeout = WakeParameters.EXECUTOR_SHUTDOWN_TIMEOUT;
-  private final EventHandler<Throwable> errorHandler;
 
   /**
    * Constructs a thread-pool stage.
@@ -206,14 +208,29 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> {
    * Closes resources.
    */
   @Override
-  public void close() throws Exception {
+  public void close() {
+
     if (closed.compareAndSet(false, true) && numThreads > 0) {
+
+      LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: begin", this.name);
+
       executor.shutdown();
-      if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
-        LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms.");
+
+      boolean isTerminated = false;
+      try {
+        isTerminated = executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+      } catch (final InterruptedException ex) {
+        LOG.log(Level.WARNING, "Interrupted closing ThreadPoolStage " + this.name, ex);
+      }
+
+      if (!isTerminated) {
         final List<Runnable> droppedRunnables = executor.shutdownNow();
-        LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks.");
+        LOG.log(Level.SEVERE,
+            "Closing ThreadPoolStage {0}: Executor did not terminate in {1} ms. Dropping {2} tasks",
+            new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()});
       }
+
+      LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/31fbd0e9/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index c37e556..c3a910b 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -61,9 +61,15 @@ import java.util.logging.Logger;
 /**
  * Messaging transport implementation with Netty.
  */
-public class NettyMessagingTransport implements Transport {
+public final class NettyMessagingTransport implements Transport {
+
+  /**
+   * Indicates a hostname that isn't set or known.
+   */
+  public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
+
+  private static final String CLASS_NAME = NettyMessagingTransport.class.getSimpleName();
 
-  private static final String CLASS_NAME = NettyMessagingTransport.class.getName();
   private static final Logger LOG = Logger.getLogger(CLASS_NAME);
 
   private static final int SERVER_BOSS_NUM_THREADS = 3;
@@ -91,10 +97,6 @@ public class NettyMessagingTransport implements Transport {
 
   private final int numberOfTries;
   private final int retryTimeout;
-  /**
-   * Indicates a hostname that isn't set or known.
-   */
-  public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
 
   /**
    * Constructs a messaging transport.
@@ -108,7 +110,7 @@ public class NettyMessagingTransport implements Transport {
    * @param tcpPortProvider  gives an iterator that produces random tcp ports in a range
    */
   @Inject
-  NettyMessagingTransport(
+  private NettyMessagingTransport(
       @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
       @Parameter(RemoteConfiguration.Port.class) final int port,
       @Parameter(RemoteConfiguration.RemoteClientStage.class) final EStage<TransportEvent> clientStage,
@@ -131,11 +133,11 @@ public class NettyMessagingTransport implements Transport {
     this.serverEventListener = new NettyServerEventListener(this.addrToLinkRefMap, serverStage);
 
     this.serverBossGroup = new NioEventLoopGroup(SERVER_BOSS_NUM_THREADS,
-        new DefaultThreadFactory(CLASS_NAME + "ServerBoss"));
+        new DefaultThreadFactory(CLASS_NAME + ":ServerBoss"));
     this.serverWorkerGroup = new NioEventLoopGroup(SERVER_WORKER_NUM_THREADS,
-        new DefaultThreadFactory(CLASS_NAME + "ServerWorker"));
+        new DefaultThreadFactory(CLASS_NAME + ":ServerWorker"));
     this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS,
-        new DefaultThreadFactory(CLASS_NAME + "ClientWorker"));
+        new DefaultThreadFactory(CLASS_NAME + ":ClientWorker"));
 
     this.clientBootstrap = new Bootstrap();
     this.clientBootstrap.group(this.clientWorkerGroup)
@@ -211,16 +213,22 @@ public class NettyMessagingTransport implements Transport {
    * Closes all channels and releases all resources.
    */
   @Override
-  public void close() throws Exception {
+  public void close() {
 
     LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress);
 
     this.clientChannelGroup.close().awaitUninterruptibly();
     this.serverChannelGroup.close().awaitUninterruptibly();
-    this.acceptor.close().sync();
-    this.clientWorkerGroup.shutdownGracefully();
-    this.serverBossGroup.shutdownGracefully();
-    this.serverWorkerGroup.shutdownGracefully();
+
+    try {
+      this.acceptor.close().sync();
+    } catch (final Exception ex) {
+      LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex);
+    }
+
+    this.clientWorkerGroup.shutdownGracefully().awaitUninterruptibly();
+    this.serverBossGroup.shutdownGracefully().awaitUninterruptibly();
+    this.serverWorkerGroup.shutdownGracefully().awaitUninterruptibly();
 
     LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress);
   }