You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2017/03/20 22:52:05 UTC

reef git commit: [REEF-1729] Fix test job timeouts in Travis CI

Repository: reef
Updated Branches:
  refs/heads/master bcfafbc34 -> 216ecdec0


[REEF-1729] Fix test job timeouts in Travis CI

Gracefully shut down all worker groups and wait for them to complete in the `.close()` method.

JIRA: [REEF-1729](https://issues.apache.org/jira/browse/REEF-1729)

Closes #1268


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/216ecdec
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/216ecdec
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/216ecdec

Branch: refs/heads/master
Commit: 216ecdec0f47fa5414095cfe9366db91dda90b4c
Parents: bcfafbc
Author: taegeonum <ta...@gmail.com>
Authored: Fri Mar 17 20:04:20 2017 +0900
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Mon Mar 20 15:47:58 2017 -0700

----------------------------------------------------------------------
 .../netty/NettyMessagingTransport.java          | 24 +++++++++++++++-----
 1 file changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/216ecdec/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index c3a910b..2643030 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -25,10 +25,12 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EStage;
@@ -51,6 +53,7 @@ import java.net.BindException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -217,18 +220,27 @@ public final class NettyMessagingTransport implements Transport {
 
     LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress);
 
-    this.clientChannelGroup.close().awaitUninterruptibly();
-    this.serverChannelGroup.close().awaitUninterruptibly();
+    final ChannelGroupFuture clientChannelGroupFuture = this.clientChannelGroup.close();
+    final ChannelGroupFuture serverChannelGroupFuture = this.serverChannelGroup.close();
+    final ChannelFuture acceptorFuture = this.acceptor.close();
+
+    final ArrayList<Future> eventLoopGroupFutures = new ArrayList<>(3);
+    eventLoopGroupFutures.add(this.clientWorkerGroup.shutdownGracefully());
+    eventLoopGroupFutures.add(this.serverBossGroup.shutdownGracefully());
+    eventLoopGroupFutures.add(this.serverWorkerGroup.shutdownGracefully());
+
+    clientChannelGroupFuture.awaitUninterruptibly();
+    serverChannelGroupFuture.awaitUninterruptibly();
 
     try {
-      this.acceptor.close().sync();
+      acceptorFuture.sync();
     } catch (final Exception ex) {
       LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex);
     }
 
-    this.clientWorkerGroup.shutdownGracefully().awaitUninterruptibly();
-    this.serverBossGroup.shutdownGracefully().awaitUninterruptibly();
-    this.serverWorkerGroup.shutdownGracefully().awaitUninterruptibly();
+    for (final Future eventLoopGroupFuture : eventLoopGroupFutures) {
+      eventLoopGroupFuture.awaitUninterruptibly();
+    }
 
     LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress);
   }