You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by jg...@apache.org on 2012/06/28 21:47:48 UTC
svn commit: r1355135 - in /giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/comm/NettyClient.java
Author: jghoman
Date: Thu Jun 28 19:47:47 2012
New Revision: 1355135
URL: http://svn.apache.org/viewvc?rev=1355135&view=rev
Log:
GIRAPH-213. NettyClient.stop() could deadlock according to netty.io docs. Contributed by Eli Reisman.
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1355135&r1=1355134&r2=1355135&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Jun 28 19:47:47 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-213: NettyClient.stop() could deadlock according to netty.io docs.
+ (Eli Reisman via jghoman)
+
GIRAPH-127: Extending the API with a master.compute() function.
(Jan van der Lugt via jghoman)
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1355135&r1=1355134&r2=1355135&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Thu Jun 28 19:47:47 2012
@@ -34,6 +34,7 @@ import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
@@ -127,25 +128,27 @@ public class NettyClient<I extends Writa
* Stop the client.
*/
public void stop() {
- // Close all the connections. Make sure the close operation ends because
- // all I/O operations are asynchronous in Netty.
- List<ChannelFuture> waitingCloseList =
- new ArrayList<ChannelFuture>(addressChannelMap.size());
+ // close connections asyncronously, in a Netty-approved
+ // way, without cleaning up thread pools until all channels
+ // in addressChannelMap are closed (success or failure)
+ final int done = addressChannelMap.size();
+ final AtomicInteger count = new AtomicInteger(0);
for (Channel channel : addressChannelMap.values()) {
- waitingCloseList.add(channel.close());
- }
-
- // Wait for all the closes to succeed
- for (ChannelFuture waitingClose : waitingCloseList) {
- waitingClose.awaitUninterruptibly().getChannel();
- if (LOG.isInfoEnabled()) {
- LOG.info("stop: Closed connection to " +
- waitingClose.getChannel().getRemoteAddress());
- }
+ ChannelFuture result = channel.close();
+ result.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture cf) {
+ if (count.incrementAndGet() == done) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("stop: reached wait threshold, " +
+ done + " connections closed, releasing " +
+ "NettyClient.bootstrap resources now.");
+ }
+ bootstrap.releaseExternalResources();
+ }
+ }
+ });
}
-
- // Shut down all thread pools to exit.
- bootstrap.releaseExternalResources();
}
/**
@@ -155,7 +158,7 @@ public class NettyClient<I extends Writa
* @param request Request to send
*/
public void sendWritableRequest(InetSocketAddress remoteServer,
- WritableRequest<I, V, E, M> request) {
+ WritableRequest<I, V, E, M> request) {
waitingRequestCount.incrementAndGet();
Channel channel = addressChannelMap.get(remoteServer);
if (channel == null) {