You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2018/03/22 05:20:07 UTC

[03/15] twill git commit: (TWILL-248) Speedup shutdown of tracker service

(TWILL-248) Speedup shutdown of tracker service

This closes #63 on Github

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/b7785bde
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/b7785bde
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/b7785bde

Branch: refs/heads/site
Commit: b7785bde4e7e990072f803d89353e37f26ed8af5
Parents: aa70499
Author: Terence Yim <ch...@apache.org>
Authored: Mon Oct 30 15:10:34 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Oct 31 10:52:16 2017 -0700

----------------------------------------------------------------------
 .../internal/appmaster/TrackerService.java      | 26 ++++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/b7785bde/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
index bb8cf57..10de10c 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -31,6 +31,8 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -48,6 +50,8 @@ import io.netty.handler.codec.http.HttpUtil;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.util.CharsetUtil;
 import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ImmediateEventExecutor;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.internal.json.ResourceReportAdapter;
 import org.slf4j.Logger;
@@ -60,6 +64,8 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -82,7 +88,7 @@ public final class TrackerService extends AbstractIdleService {
 
   private String host;
   private ServerBootstrap bootstrap;
-  private Channel serverChannel;
+  private ChannelGroup channelGroup;
   private InetSocketAddress bindAddress;
   private URL url;
 
@@ -119,6 +125,7 @@ public final class TrackerService extends AbstractIdleService {
 
   @Override
   protected void startUp() throws Exception {
+    channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
     EventLoopGroup bossGroup = new NioEventLoopGroup(NUM_BOSS_THREADS,
                                                      new ThreadFactoryBuilder()
                                                        .setDaemon(true).setNameFormat("boss-thread").build());
@@ -132,6 +139,7 @@ public final class TrackerService extends AbstractIdleService {
       .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
+          channelGroup.add(ch);
           ChannelPipeline pipeline = ch.pipeline();
           pipeline.addLast("codec", new HttpServerCodec());
           pipeline.addLast("compressor", new HttpContentCompressor());
@@ -140,7 +148,9 @@ public final class TrackerService extends AbstractIdleService {
         }
       });
 
-    serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel();
+    Channel serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel();
+    channelGroup.add(serverChannel);
+
     bindAddress = (InetSocketAddress) serverChannel.localAddress();
     url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
 
@@ -149,9 +159,15 @@ public final class TrackerService extends AbstractIdleService {
 
   @Override
   protected void shutDown() throws Exception {
-    serverChannel.close().await();
-    bootstrap.config().group().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
-    bootstrap.config().childGroup().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
+    channelGroup.close().awaitUninterruptibly();
+
+    List<Future<?>> futures = new ArrayList<>();
+    futures.add(bootstrap.config().group().shutdownGracefully(0, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS));
+    futures.add(bootstrap.config().childGroup().shutdownGracefully(0, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS));
+
+    for (Future<?> future : futures) {
+      future.awaitUninterruptibly();
+    }
 
     LOG.info("Tracker service stopped at {}", url);
   }