You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2017/03/20 22:52:05 UTC
reef git commit: [REEF-1729] Fix test job timeouts in Travis CI
Repository: reef
Updated Branches:
refs/heads/master bcfafbc34 -> 216ecdec0
[REEF-1729] Fix test job timeouts in Travis CI
Gracefully shut down all worker groups and wait for them to complete in the `.close()` method.
JIRA: [REEF-1729](https://issues.apache.org/jira/browse/REEF-1729)
Closes #1268
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/216ecdec
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/216ecdec
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/216ecdec
Branch: refs/heads/master
Commit: 216ecdec0f47fa5414095cfe9366db91dda90b4c
Parents: bcfafbc
Author: taegeonum <ta...@gmail.com>
Authored: Fri Mar 17 20:04:20 2017 +0900
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Mon Mar 20 15:47:58 2017 -0700
----------------------------------------------------------------------
.../netty/NettyMessagingTransport.java | 24 +++++++++++++++-----
1 file changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/216ecdec/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index c3a910b..2643030 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -25,10 +25,12 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
@@ -51,6 +53,7 @@ import java.net.BindException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -217,18 +220,27 @@ public final class NettyMessagingTransport implements Transport {
LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress);
- this.clientChannelGroup.close().awaitUninterruptibly();
- this.serverChannelGroup.close().awaitUninterruptibly();
+ final ChannelGroupFuture clientChannelGroupFuture = this.clientChannelGroup.close();
+ final ChannelGroupFuture serverChannelGroupFuture = this.serverChannelGroup.close();
+ final ChannelFuture acceptorFuture = this.acceptor.close();
+
+ final ArrayList<Future> eventLoopGroupFutures = new ArrayList<>(3);
+ eventLoopGroupFutures.add(this.clientWorkerGroup.shutdownGracefully());
+ eventLoopGroupFutures.add(this.serverBossGroup.shutdownGracefully());
+ eventLoopGroupFutures.add(this.serverWorkerGroup.shutdownGracefully());
+
+ clientChannelGroupFuture.awaitUninterruptibly();
+ serverChannelGroupFuture.awaitUninterruptibly();
try {
- this.acceptor.close().sync();
+ acceptorFuture.sync();
} catch (final Exception ex) {
LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex);
}
- this.clientWorkerGroup.shutdownGracefully().awaitUninterruptibly();
- this.serverBossGroup.shutdownGracefully().awaitUninterruptibly();
- this.serverWorkerGroup.shutdownGracefully().awaitUninterruptibly();
+ for (final Future eventLoopGroupFuture : eventLoopGroupFutures) {
+ eventLoopGroupFuture.awaitUninterruptibly();
+ }
LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress);
}