You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by jg...@apache.org on 2012/06/28 21:47:48 UTC

svn commit: r1355135 - in /giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/comm/NettyClient.java

Author: jghoman
Date: Thu Jun 28 19:47:47 2012
New Revision: 1355135

URL: http://svn.apache.org/viewvc?rev=1355135&view=rev
Log:
GIRAPH-213. NettyClient.stop() could deadlock according to netty.io docs. Contributed by Eli Reisman.

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1355135&r1=1355134&r2=1355135&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Jun 28 19:47:47 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-213: NettyClient.stop() could deadlock according to netty.io docs.
+  (Eli Reisman via jghoman)
+
   GIRAPH-127: Extending the API with a master.compute() function.
   (Jan van der Lugt via jghoman)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1355135&r1=1355134&r2=1355135&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Thu Jun 28 19:47:47 2012
@@ -34,6 +34,7 @@ import org.apache.log4j.Logger;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
@@ -127,25 +128,27 @@ public class NettyClient<I extends Writa
    * Stop the client.
    */
   public void stop() {
-    // Close all the connections.  Make sure the close operation ends because
-    // all I/O operations are asynchronous in Netty.
-    List<ChannelFuture> waitingCloseList =
-        new ArrayList<ChannelFuture>(addressChannelMap.size());
+    // close connections asyncronously, in a Netty-approved
+    // way, without cleaning up thread pools until all channels
+    // in addressChannelMap are closed (success or failure)
+    final int done = addressChannelMap.size();
+    final AtomicInteger count = new AtomicInteger(0);
     for (Channel channel : addressChannelMap.values()) {
-      waitingCloseList.add(channel.close());
-    }
-
-    // Wait for all the closes to succeed
-    for (ChannelFuture waitingClose : waitingCloseList) {
-      waitingClose.awaitUninterruptibly().getChannel();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("stop: Closed connection to " +
-            waitingClose.getChannel().getRemoteAddress());
-      }
+      ChannelFuture result = channel.close();
+      result.addListener(new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture cf) {
+          if (count.incrementAndGet() == done) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("stop: reached wait threshold, " +
+                done + " connections closed, releasing " +
+                "NettyClient.bootstrap resources now.");
+            }
+            bootstrap.releaseExternalResources();
+          }
+        }
+      });
     }
-
-    // Shut down all thread pools to exit.
-    bootstrap.releaseExternalResources();
   }
 
   /**
@@ -155,7 +158,7 @@ public class NettyClient<I extends Writa
    * @param request Request to send
    */
   public void sendWritableRequest(InetSocketAddress remoteServer,
-                                  WritableRequest<I, V, E, M> request) {
+    WritableRequest<I, V, E, M> request) {
     waitingRequestCount.incrementAndGet();
     Channel channel = addressChannelMap.get(remoteServer);
     if (channel == null) {