You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/15 22:07:32 UTC

svn commit: r1373609 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/graph/partition/ src/main/java/org/apache/giraph/utils/ src/test/java/org/apache/giraph/comm/

Author: aching
Date: Wed Aug 15 20:07:32 2012
New Revision: 1373609

URL: http://svn.apache.org/viewvc?rev=1373609&view=rev
Log:
GIRAPH-300: Improve netty reliability with retrying failed
connections, tracking requests, thread-safe hash partitioning (aching
via apresta).

Added:
    giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/pom.xml
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Aug 15 20:07:32 2012
@@ -2,8 +2,12 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
-  GIRAPH-296: TotalNumVertices and TotalNumEdges are not saved in checkpoint.
-  (majakabiljo via apresta)
+  GIRAPH-300: Improve netty reliability with retrying failed
+  connections, tracking requests, thread-safe hash partitioning
+  (aching via apresta).
+
+  GIRAPH-296: TotalNumVertices and TotalNumEdges are not saved in
+  checkpoint.  (majakabiljo via apresta)
 
   GIRAPH-297: Checkpointing on master is done one superstep later
   (majakabiljo via aching).
@@ -18,7 +22,8 @@ Release 0.2.0 - unreleased
   GIRAPH-218: Consolidate all I/O Format classes under one roof in
   lib/ directory.  (Eli Reisman via jghoman)
 
-  GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta)
+  GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via
+  apresta)
 
   GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP
   by # of vertices results in wide variance in RPC message sizes. (Eli

Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Wed Aug 15 20:07:32 2012
@@ -914,7 +914,7 @@ under the License.
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
-      <version>3.3.1.Final</version>
+      <version>3.5.3.Final</version>
     </dependency>
  </dependencies>
 </project>

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Wed Aug 15 20:07:32 2012
@@ -18,16 +18,19 @@
 
 package org.apache.giraph.comm;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.utils.TimedLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -42,6 +45,7 @@ import org.jboss.netty.channel.ChannelPi
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
 
 /**
  * Netty client for sending requests.
@@ -67,21 +71,26 @@ public class NettyClient<I extends Writa
       "giraph.maxNumberOfOpenRequests";
   /** Default maximum number of requests without confirmation */
   public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;
-
+  /** Maximum number of requests to list (for debugging) */
+  public static final int MAX_REQUESTS_TO_LIST = 10;
+  /** 30 seconds to connect by default */
+  public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyClient.class);
   /** Context used to report progress */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Client bootstrap */
   private final ClientBootstrap bootstrap;
-  /** Atomic count of outstanding requests (synchronize on self) */
-  private final AtomicInteger waitingRequestCount = new AtomicInteger(0);
   /**
    * Map of the peer connections, mapping from remote socket address to client
    * meta data
    */
   private final Map<InetSocketAddress, ChannelRotater> addressChannelMap =
       Maps.newHashMap();
+  /** Atomic request id, used in outstandingRequestMap */
+  private final AtomicInteger requestId = new AtomicInteger(0);
+  /** Outstanding request map (tracks all requests). */
+  private final ConcurrentMap<Long, RequestInfo> outstandingRequestMap;
   /** Number of channels per server */
   private final int channelsPerServer;
   /** Byte counter for this client */
@@ -95,6 +104,10 @@ public class NettyClient<I extends Writa
   private final boolean limitNumberOfOpenRequests;
   /** Maximum number of requests without confirmation we can have */
   private final int maxNumberOfOpenRequests;
+  /** Maximum number of connection failures */
+  private final int maxConnectionFailures;
+  /** Timed logger for printing request debugging */
+  private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
 
   /**
    * Only constructor
@@ -127,13 +140,27 @@ public class NettyClient<I extends Writa
       maxNumberOfOpenRequests = -1;
     }
 
+    maxConnectionFailures = conf.getInt(
+        GiraphJob.NETTY_MAX_CONNECTION_FAILURES,
+        GiraphJob.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
+
+    int maxThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+        NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
+    outstandingRequestMap =
+        new MapMaker().concurrencyLevel(maxThreads).makeMap();
+
     // Configure the client.
     bootstrap = new ClientBootstrap(
         new NioClientSocketChannelFactory(
             Executors.newCachedThreadPool(),
             Executors.newCachedThreadPool(),
-            conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
-                NettyServer.DEFAULT_MAXIMUM_THREAD_POOL_SIZE)));
+            maxThreads));
+    bootstrap.setOption("connectTimeoutMillis",
+        MAX_CONNECTION_MILLISECONDS_DEFAULT);
+    bootstrap.setOption("tcpNoDelay", true);
+    bootstrap.setOption("keepAlive", true);
+    bootstrap.setOption("sendBufferSize", sendBufferSize);
+    bootstrap.setOption("receiveBufferSize", receiveBufferSize);
 
     // Set up the pipeline factory.
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -141,64 +168,121 @@ public class NettyClient<I extends Writa
       public ChannelPipeline getPipeline() throws Exception {
         return Channels.pipeline(
             byteCounter,
+            new FixedLengthFrameDecoder(9),
             new RequestEncoder(),
-            new ResponseClientHandler(waitingRequestCount));
+            new ResponseClientHandler(outstandingRequestMap));
       }
     });
   }
 
   /**
+   * Pair object for connectAllAddresses().
+   */
+  private static class ChannelFutureAddress {
+    /** Future object */
+    private final ChannelFuture future;
+    /** Address of the future */
+    private final InetSocketAddress address;
+
+    /**
+     * Constructor.
+     *
+     * @param future Immutable future
+     * @param address Immutable address
+     */
+    ChannelFutureAddress(ChannelFuture future, InetSocketAddress address) {
+      this.future = future;
+      this.address = address;
+    }
+  }
+
+  /**
    * Connect to a collection of addresses
    *
    * @param addresses Addresses to connect to (if haven't already connected)
    */
-  public void connectAllAddresses(Collection<InetSocketAddress> addresses) {
-    List<ChannelFuture> waitingConnectionList =
-        new ArrayList<ChannelFuture>();
+  public void connectAllAddresses(Set<InetSocketAddress> addresses) {
+    List<ChannelFutureAddress> waitingConnectionList =
+        Lists.newArrayListWithCapacity(addresses.size() * channelsPerServer);
     for (InetSocketAddress address : addresses) {
-      if (address == null) {
+      context.progress();
+      if (address == null || address.getHostName() == null ||
+          address.getHostName().isEmpty()) {
         throw new IllegalStateException("connectAllAddresses: Null address " +
             "in addresses " + addresses);
       }
+      if (address.isUnresolved()) {
+        throw new IllegalStateException("connectAllAddresses: Unresolved " +
+            "address " + address);
+      }
 
       if (addressChannelMap.containsKey(address)) {
         continue;
       }
 
       // Start connecting to the remote server up to n time
-      ChannelRotater channelRotater = new ChannelRotater();
       for (int i = 0; i < channelsPerServer; ++i) {
         ChannelFuture connectionFuture = bootstrap.connect(address);
-        connectionFuture.getChannel().getConfig().setOption("tcpNoDelay", true);
-        connectionFuture.getChannel().getConfig().setOption("keepAlive", true);
-        connectionFuture.getChannel().getConfig().setOption(
-            "sendBufferSize", sendBufferSize);
-        connectionFuture.getChannel().getConfig().setOption(
-            "receiveBufferSize", receiveBufferSize);
-        channelRotater.addChannel(connectionFuture.getChannel());
-        waitingConnectionList.add(connectionFuture);
+
+        waitingConnectionList.add(
+            new ChannelFutureAddress(connectionFuture, address));
       }
-      addressChannelMap.put(address, channelRotater);
     }
 
-    // Wait for all the connections to succeed
-    for (ChannelFuture waitingConnection : waitingConnectionList) {
-      ChannelFuture future =
-          waitingConnection.awaitUninterruptibly();
-      if (!future.isSuccess()) {
-        throw new IllegalStateException("connectAllAddresses: Future failed " +
-            "with " + future.getCause());
-      }
-      Channel channel = future.getChannel();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("connectAllAddresses: Connected to " +
-            channel.getRemoteAddress());
-      }
+    // Wait for all connections; retry failures up to maxConnectionFailures
+    int failures = 0;
+    int connected = 0;
+    while (failures < maxConnectionFailures) {
+      List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
+      for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
+        context.progress();
+        ChannelFuture future =
+            waitingConnection.future.awaitUninterruptibly();
+        if (!future.isSuccess()) {
+          LOG.warn("connectAllAddresses: Future failed " +
+              "to connect with " + waitingConnection.address + " with " +
+              failures + " failures and because of " + future.getCause());
+
+          ChannelFuture connectionFuture =
+              bootstrap.connect(waitingConnection.address);
+          nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,
+              waitingConnection.address));
+          ++failures;
+        } else {
+          Channel channel = future.getChannel();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("connectAllAddresses: Connected to " +
+                channel.getRemoteAddress());
+          }
+
+          if (channel.getRemoteAddress() == null) {
+            throw new IllegalStateException(
+                "connectAllAddresses: Null remote address!");
+          }
 
-      if (channel.getRemoteAddress() == null) {
-        throw new IllegalStateException("connectAllAddresses: Null remote " +
-            "address!");
+          ChannelRotater rotater =
+              addressChannelMap.get(waitingConnection.address);
+          if (rotater == null) {
+            rotater = new ChannelRotater();
+            addressChannelMap.put(waitingConnection.address, rotater);
+          }
+          rotater.addChannel(future.getChannel());
+          ++connected;
+        }
+      }
+      LOG.info("connectAllAddresses: Successfully added " +
+          (waitingConnectionList.size() - nextCheckFutures.size()) +
+          " connections, (" + connected + " total connected) " +
+          nextCheckFutures.size() + " failed, " +
+          failures + " failures total.");
+      if (nextCheckFutures.isEmpty()) {
+        break;
       }
+      waitingConnectionList = nextCheckFutures;
+    }
+    if (failures >= maxConnectionFailures) {
+      throw new IllegalStateException(
+          "connectAllAddresses: Too many failures (" + failures + ").");
     }
   }
 
@@ -206,7 +290,7 @@ public class NettyClient<I extends Writa
    * Stop the client.
    */
   public void stop() {
-    // close connections asyncronously, in a Netty-approved
+    // Close connections asynchronously, in a Netty-approved
     // way, without cleaning up thread pools until all channels
     // in addressChannelMap are closed (success or failure)
     int channelCount = 0;
@@ -243,18 +327,28 @@ public class NettyClient<I extends Writa
    */
   public void sendWritableRequest(InetSocketAddress remoteServer,
                                   WritableRequest<I, V, E, M> request) {
-    if (waitingRequestCount.get() == 0) {
+    if (outstandingRequestMap.isEmpty()) {
       byteCounter.resetAll();
     }
-    waitingRequestCount.incrementAndGet();
+    request.setRequestId(requestId.incrementAndGet());
     Channel channel = addressChannelMap.get(remoteServer).nextChannel();
     if (channel == null) {
       throw new IllegalStateException(
           "sendWritableRequest: No channel exists for " + remoteServer);
     }
-    channel.write(request);
+    RequestInfo newRequestInfo = new RequestInfo(remoteServer);
+    RequestInfo oldRequestInfo = outstandingRequestMap.putIfAbsent(
+        request.getRequestId(), newRequestInfo);
+    if (oldRequestInfo != null) {
+      throw new IllegalStateException("sendWritableRequest: Impossible to " +
+          "have a previous request id = " + request.getRequestId() + ", " +
+          "request info of " + oldRequestInfo);
+    }
+    ChannelFuture writeFuture = channel.write(request);
+    newRequestInfo.setWriteFuture(writeFuture);
+
     if (limitNumberOfOpenRequests &&
-        waitingRequestCount.get() > maxNumberOfOpenRequests) {
+        outstandingRequestMap.size() > maxNumberOfOpenRequests) {
       waitSomeRequests(maxNumberOfOpenRequests);
     }
   }
@@ -279,16 +373,25 @@ public class NettyClient<I extends Writa
    *                        complete
    */
   private void waitSomeRequests(int maxOpenRequests) {
-    synchronized (waitingRequestCount) {
-      while (waitingRequestCount.get() > maxOpenRequests) {
-        if (LOG.isInfoEnabled()) {
+    synchronized (outstandingRequestMap) {
+      while (outstandingRequestMap.size() > maxOpenRequests) {
+        if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
           LOG.info("waitSomeRequests: Waiting interval of " +
-              WAITING_REQUEST_MSECS + " msecs, " + waitingRequestCount +
+              WAITING_REQUEST_MSECS + " msecs, " +
+              outstandingRequestMap.size() +
               " open requests, waiting for it to be <= " + maxOpenRequests +
               ", " + byteCounter.getMetrics());
+
+          if (outstandingRequestMap.size() < MAX_REQUESTS_TO_LIST) {
+            for (Map.Entry<Long, RequestInfo> entry :
+                outstandingRequestMap.entrySet()) {
+              LOG.info("waitSomeRequests: Waiting for request " +
+                  entry.getKey() + " - " + entry.getValue());
+            }
+          }
         }
         try {
-          waitingRequestCount.wait(WAITING_REQUEST_MSECS);
+          outstandingRequestMap.wait(WAITING_REQUEST_MSECS);
         } catch (InterruptedException e) {
           LOG.error("waitFutures: Got unexpected InterruptedException", e);
         }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Wed Aug 15 20:07:32 2012
@@ -59,7 +59,9 @@ public class NettyServer<I extends Writa
      V extends Writable, E extends Writable,
      M extends Writable> {
   /** Default maximum thread pool size */
-  public static final int DEFAULT_MAXIMUM_THREAD_POOL_SIZE = 32;
+  public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
+  /** Default TCP backlog */
+  public static final int TCP_BACKLOG_DEFAULT = 100;
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyServer.class);
   /** Configuration */
@@ -125,8 +127,7 @@ public class NettyServer<I extends Writa
       throw new IllegalStateException("NettyServer: unable to get hostname");
     }
     maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
-                                  DEFAULT_MAXIMUM_THREAD_POOL_SIZE);
-    Executors.newCachedThreadPool(workerFactory);
+                                  MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
 
     channelFactory = new NioServerSocketChannelFactory(
         Executors.newCachedThreadPool(bossFactory),
@@ -140,6 +141,11 @@ public class NettyServer<I extends Writa
   public void start() {
     bootstrap = new ServerBootstrap(channelFactory);
     // Set up the pipeline factory.
+    bootstrap.setOption("child.keepAlive", true);
+    bootstrap.setOption("child.tcpNoDelay", true);
+    bootstrap.setOption("child.sendBufferSize", sendBufferSize);
+    bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
+    bootstrap.setOption("backlog", TCP_BACKLOG_DEFAULT);
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() throws Exception {
@@ -171,8 +177,6 @@ public class NettyServer<I extends Writa
     // preserving debugability from the port number alone.
     // Round up the max number of workers to the next power of 10 and use
     // it as a constant to increase the port number with.
-    boolean tcpNoDelay = false;
-    boolean keepAlive = false;
     while (bindAttempts < maxRpcPortBindAttempts) {
       this.myAddress = new InetSocketAddress(localHostname, bindPort);
       if (failFirstPortBindingAttempt && bindAttempts == 0) {
@@ -189,10 +193,6 @@ public class NettyServer<I extends Writa
       try {
         Channel ch = bootstrap.bind(myAddress);
         accepted.add(ch);
-        tcpNoDelay = ch.getConfig().setOption("tcpNoDelay", true);
-        keepAlive = ch.getConfig().setOption("keepAlive", true);
-        ch.getConfig().setOption("sendBufferSize", sendBufferSize);
-        ch.getConfig().setOption("receiveBufferSize", receiveBufferSize);
 
         break;
       } catch (ChannelException e) {
@@ -212,9 +212,9 @@ public class NettyServer<I extends Writa
       LOG.info("start: Started server " +
           "communication server: " + myAddress + " with up to " +
           maximumPoolSize + " threads on bind attempt " + bindAttempts +
-          " with tcpNoDelay = " + tcpNoDelay + " and keepAlive = " +
-          keepAlive + " sendBufferSize = " + sendBufferSize +
-          " receiveBufferSize = " + receiveBufferSize);
+          " with sendBufferSize = " + sendBufferSize +
+          " receiveBufferSize = " + receiveBufferSize + " backlog = " +
+          bootstrap.getOption("backlog"));
     }
   }
 
@@ -222,6 +222,9 @@ public class NettyServer<I extends Writa
    * Stop the server.
    */
   public void stop() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("stop: Halting netty server");
+    }
     accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
     bootstrap.releaseExternalResources();
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java Wed Aug 15 20:07:32 2012
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm;
 
+import com.google.common.collect.Sets;
+import java.util.Set;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest;
@@ -34,13 +36,11 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -115,8 +115,8 @@ public class NettyWorkerClient<I extends
   public void fixPartitionIdToSocketAddrMap() {
     // 1. Fix all the cached inet addresses (remove all changed entries)
     // 2. Connect to any new RPC servers
-    List<InetSocketAddress> addresses =
-        Lists.newArrayListWithCapacity(service.getPartitionOwners().size());
+    Set<InetSocketAddress> addresses =
+        Sets.newHashSetWithExpectedSize(service.getPartitionOwners().size());
     for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
       InetSocketAddress address =
           partitionIndexAddressMap.get(

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java?rev=1373609&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java Wed Aug 15 20:07:32 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.net.InetSocketAddress;
+import java.util.Date;
+import org.jboss.netty.channel.ChannelFuture;
+
+/**
+ * Help track requests throughout the system
+ */
+public class RequestInfo {
+  /** Destination of the request */
+  private final InetSocketAddress destinationAddress;
+  /** When the request was started */
+  private final long startedMsecs;
+  /** Future of the write of this request */
+  private volatile ChannelFuture writeFuture;
+
+  /**
+   * Constructor.
+   *
+   * @param destinationAddress Destination of the request
+   */
+  public RequestInfo(InetSocketAddress destinationAddress) {
+    this.destinationAddress = destinationAddress;
+    this.startedMsecs = System.currentTimeMillis();
+  }
+
+  public InetSocketAddress getAddress() {
+    return destinationAddress;
+  }
+
+  public long getStartedMsecs() {
+    return startedMsecs;
+  }
+
+  /**
+   * Get the elapsed time since the request started.
+   *
+   * @return Msecs since the request was started
+   */
+  public long getElapsedMsecs() {
+    return System.currentTimeMillis() - startedMsecs;
+  }
+
+  public void setWriteFuture(ChannelFuture writeFuture) {
+    this.writeFuture = writeFuture;
+  }
+
+  public ChannelFuture getWriteFuture() {
+    return writeFuture;
+  }
+
+  @Override
+  public String toString() {
+    return "(destAddr=" + destinationAddress +
+        ",startDate=" + new Date(startedMsecs) + ",elapsedMsecs=" +
+        getElapsedMsecs() +
+        ((writeFuture == null) ? ")" :
+            ",writeDone=" + writeFuture.isDone() +
+                ",writeSuccess=" + writeFuture.isSuccess() + ")");
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java Wed Aug 15 20:07:32 2012
@@ -24,6 +24,8 @@ import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
@@ -65,9 +67,34 @@ public class RequestServerHandler<I exte
         (WritableRequest<I, V, E, M>) e.getMessage();
     writableRequest.doRequest(serverData);
 
-    // Send the success response
-    ChannelBuffer buffer = ChannelBuffers.directBuffer(1);
+    // Send the success response with the request id
+    ChannelBuffer buffer = ChannelBuffers.directBuffer(9);
+    buffer.writeLong(writableRequest.getRequestId());
     buffer.writeByte(0);
     e.getChannel().write(buffer);
   }
+
+  @Override
+  public void channelConnected(ChannelHandlerContext ctx,
+                               ChannelStateEvent e) throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("channelConnected: Connected the channel on " +
+          ctx.getChannel().getRemoteAddress());
+    }
+  }
+
+  @Override
+  public void channelClosed(ChannelHandlerContext ctx,
+                            ChannelStateEvent e) throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("channelClosed: Closed the channel on " +
+          ctx.getChannel().getRemoteAddress());
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    throw new IllegalStateException("exceptionCaught: Channel failed with " +
+        "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java Wed Aug 15 20:07:32 2012
@@ -19,12 +19,13 @@
 package org.apache.giraph.comm;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -36,16 +37,17 @@ public class ResponseClientHandler exten
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(ResponseClientHandler.class);
-  /** Keep track on the responses received */
-  private final AtomicInteger waitingRequestCount;
+  /** Outstanding request map */
+  private final ConcurrentMap<Long, RequestInfo> outstandingRequestMap;
 
   /**
    * Constructor.
    *
-   * @param waitingRequestCount Number of requests to wait for
+   * @param outstandingRequestMap Map of outstanding requests
    */
-  public ResponseClientHandler(AtomicInteger waitingRequestCount) {
-    this.waitingRequestCount = waitingRequestCount;
+  public ResponseClientHandler(
+      ConcurrentMap<Long, RequestInfo> outstandingRequestMap) {
+    this.outstandingRequestMap = outstandingRequestMap;
   }
 
   @Override
@@ -57,48 +59,52 @@ public class ResponseClientHandler exten
     }
     ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
     ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
+    long requestId = -1;
     int response = -1;
     try {
-      for (int i = 0; i < buffer.capacity(); ++i) {
-        try {
-          response = inputStream.readByte();
-        } catch (IOException e) {
-          throw new IllegalStateException(
-              "messageReceived: Got IOException ", e);
-        }
-        if (response != 0) {
-          throw new IllegalStateException(
-              "messageReceived: Got illegal response " + response);
-        }
-      }
-    } finally {
-      try {
-        inputStream.close();
-      } catch (IOException e) {
-        throw new IllegalStateException("messageReceived: Got IOException ", e);
-      }
+      requestId = inputStream.readLong();
+      response = inputStream.readByte();
+      inputStream.close();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "messageReceived: Got IOException ", e);
+    }
+    if (response != 0) {
+      throw new IllegalStateException(
+          "messageReceived: Got illegal response " + response);
     }
 
-    synchronized (waitingRequestCount) {
-      int currentRequestCount =
-          waitingRequestCount.addAndGet(-1 * buffer.capacity());
-      if (currentRequestCount < 0) {
-        throw new IllegalStateException("messageReceived: Impossible to " +
-            "have negative currentRequestCount " + currentRequestCount);
-      } else if (currentRequestCount == 0) {
-        waitingRequestCount.notify();
-      }
+    RequestInfo requestInfo = outstandingRequestMap.remove(requestId);
+    if (requestInfo == null) {
+      throw new IllegalStateException("messageReceived: Impossible to " +
+          "have a non-registered requestId " + requestId);
+    } else {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("messageReceived: currentRequestCount = " +
-            currentRequestCount + ", bytes = " + buffer.capacity());
+        LOG.debug("messageReceived: Processed request id = " + requestId +
+            " " + requestInfo + ".  Waiting on " +
+            outstandingRequestMap.size() +
+            " requests, bytes = " + buffer.capacity());
       }
     }
+
+    // Help NettyClient#waitSomeRequests() to finish faster
+    synchronized (outstandingRequestMap) {
+      outstandingRequestMap.notifyAll();
+    }
+  }
+
+  @Override
+  public void channelClosed(ChannelHandlerContext ctx,
+                            ChannelStateEvent e) throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("channelClosed: Closed the channel on " +
+          ctx.getChannel().getRemoteAddress());
+    }
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
     throw new IllegalStateException("exceptionCaught: Channel failed with " +
-        "remote address " + ctx.getChannel().getRemoteAddress() + " with " +
-        "cause " + e.getCause());
+        "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java Wed Aug 15 20:07:32 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.comm;
 
 import org.apache.giraph.comm.RequestRegistry.Type;
 import org.apache.giraph.graph.BspUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -47,7 +46,7 @@ import java.util.Map.Entry;
 @SuppressWarnings("rawtypes")
 public class SendPartitionMessagesRequest<I extends WritableComparable,
     V extends Writable, E extends Writable,
-    M extends Writable> implements WritableRequest<I, V, E, M> {
+    M extends Writable> extends WritableRequest<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendPartitionMessagesRequest.class);
@@ -55,8 +54,6 @@ public class SendPartitionMessagesReques
   private int partitionId;
   /** Messages sent for a partition */
   private Map<I, Collection<M>> vertexIdMessages;
-  /** Configuration */
-  private Configuration conf;
 
   /**
    * Constructor used for reflection only
@@ -76,17 +73,17 @@ public class SendPartitionMessagesReques
   }
 
   @Override
-  public void readFields(DataInput input) throws IOException {
+  public void readFieldsRequest(DataInput input) throws IOException {
     partitionId = input.readInt();
     int vertexIdMessagesSize = input.readInt();
     vertexIdMessages = Maps.newHashMapWithExpectedSize(vertexIdMessagesSize);
     for (int i = 0; i < vertexIdMessagesSize; ++i) {
-      I vertexId = BspUtils.<I>createVertexId(conf);
+      I vertexId = BspUtils.<I>createVertexId(getConf());
       vertexId.readFields(input);
       int messageCount = input.readInt();
       List<M> messageList = Lists.newArrayListWithCapacity(messageCount);
       for (int j = 0; j < messageCount; ++j) {
-        M message = BspUtils.<M>createMessageValue(conf);
+        M message = BspUtils.<M>createMessageValue(getConf());
         message.readFields(input);
         messageList.add(message);
       }
@@ -98,7 +95,7 @@ public class SendPartitionMessagesReques
   }
 
   @Override
-  public void write(DataOutput output) throws IOException {
+  public void writeRequest(DataOutput output) throws IOException {
     output.writeInt(partitionId);
     output.writeInt(vertexIdMessages.size());
     for (Entry<I, Collection<M>> entry : vertexIdMessages.entrySet()) {
@@ -125,16 +122,6 @@ public class SendPartitionMessagesReques
     }
   }
 
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
   /**
    * Get id of partition
    *

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java Wed Aug 15 20:07:32 2012
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.giraph.comm.RequestRegistry.Type;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.VertexMutations;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -46,7 +45,7 @@ import com.google.common.collect.Maps;
 @SuppressWarnings("rawtypes")
 public class SendPartitionMutationsRequest<I extends WritableComparable,
     V extends Writable, E extends Writable,
-    M extends Writable> implements WritableRequest<I, V, E, M> {
+    M extends Writable> extends WritableRequest<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendPartitionMutationsRequest.class);
@@ -54,8 +53,6 @@ public class SendPartitionMutationsReque
   private int partitionId;
   /** Mutations sent for a partition */
   private Map<I, VertexMutations<I, V, E, M>> vertexIdMutations;
-  /** Configuration */
-  private Configuration conf;
 
   /**
    * Constructor used for reflection only
@@ -68,23 +65,24 @@ public class SendPartitionMutationsReque
    * @param partitionId Partition to send the request to
    * @param vertexIdMutations Map of mutations to send
    */
-  public SendPartitionMutationsRequest(int partitionId,
+  public SendPartitionMutationsRequest(
+      int partitionId,
       Map<I, VertexMutations<I, V, E, M>> vertexIdMutations) {
     this.partitionId = partitionId;
     this.vertexIdMutations = vertexIdMutations;
   }
 
   @Override
-  public void readFields(DataInput input) throws IOException {
+  public void readFieldsRequest(DataInput input) throws IOException {
     partitionId = input.readInt();
     int vertexIdMutationsSize = input.readInt();
     vertexIdMutations = Maps.newHashMapWithExpectedSize(vertexIdMutationsSize);
     for (int i = 0; i < vertexIdMutationsSize; ++i) {
-      I vertexId = BspUtils.<I>createVertexId(conf);
+      I vertexId = BspUtils.<I>createVertexId(getConf());
       vertexId.readFields(input);
       VertexMutations<I, V, E, M> vertexMutations =
           new VertexMutations<I, V, E, M>();
-      vertexMutations.setConf(conf);
+      vertexMutations.setConf(getConf());
       vertexMutations.readFields(input);
       if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
         throw new IllegalStateException(
@@ -94,7 +92,7 @@ public class SendPartitionMutationsReque
   }
 
   @Override
-  public void write(DataOutput output) throws IOException {
+  public void writeRequest(DataOutput output) throws IOException {
     output.writeInt(partitionId);
     output.writeInt(vertexIdMutations.size());
     for (Entry<I, VertexMutations<I, V, E, M>> entry :
@@ -129,14 +127,4 @@ public class SendPartitionMutationsReque
       }
     }
   }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java Wed Aug 15 20:07:32 2012
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.giraph.comm.RequestRegistry.Type;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.BspUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -45,7 +44,7 @@ import com.google.common.collect.Lists;
 @SuppressWarnings("rawtypes")
 public class SendVertexRequest<I extends WritableComparable,
     V extends Writable, E extends Writable,
-    M extends Writable> implements WritableRequest<I, V, E, M> {
+    M extends Writable> extends WritableRequest<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendVertexRequest.class);
@@ -53,8 +52,6 @@ public class SendVertexRequest<I extends
   private int partitionId;
   /** List of vertices to be stored on this partition */
   private Collection<Vertex<I, V, E, M>> vertices;
-  /** Configuration */
-  private Configuration conf;
 
   /**
    * Constructor used for reflection only
@@ -67,26 +64,26 @@ public class SendVertexRequest<I extends
    * @param partitionId Partition to send the request to
    * @param vertices Vertices to send
    */
-  public SendVertexRequest(
-      int partitionId, Collection<Vertex<I, V, E, M>> vertices) {
+  public SendVertexRequest(int partitionId,
+                           Collection<Vertex<I, V, E, M>> vertices) {
     this.partitionId = partitionId;
     this.vertices = vertices;
   }
 
   @Override
-  public void readFields(DataInput input) throws IOException {
+  public void readFieldsRequest(DataInput input) throws IOException {
     partitionId = input.readInt();
     int verticesCount = input.readInt();
     vertices = Lists.newArrayListWithCapacity(verticesCount);
     for (int i = 0; i < verticesCount; ++i) {
-      Vertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+      Vertex<I, V, E, M> vertex = BspUtils.createVertex(getConf());
       vertex.readFields(input);
       vertices.add(vertex);
     }
   }
 
   @Override
-  public void write(DataOutput output) throws IOException {
+  public void writeRequest(DataOutput output) throws IOException {
     output.writeInt(partitionId);
     output.writeInt(vertices.size());
     for (Vertex<I, V, E, M> vertex : vertices) {
@@ -121,15 +118,5 @@ public class SendVertexRequest<I extends
       vertexMap.addAll(vertices);
     }
   }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
 }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java Wed Aug 15 20:07:32 2012
@@ -18,8 +18,12 @@
 
 package org.apache.giraph.comm;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import org.apache.giraph.comm.RequestRegistry.Type;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -32,19 +36,69 @@ import org.apache.hadoop.io.WritableComp
  * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public interface WritableRequest<I extends WritableComparable,
+public abstract class WritableRequest<I extends WritableComparable,
     V extends Writable, E extends Writable,
-    M extends Writable> extends Writable, Configurable {
+    M extends Writable> implements Writable, Configurable {
+  /** Configuration */
+  private Configuration conf;
+  /** Request id */
+  private long requestId = -1;
+
+  public long getRequestId() {
+    return requestId;
+  }
+
+  public void setRequestId(long requestId) {
+    this.requestId = requestId;
+  }
+
   /**
    * Get the type of the request
    *
    * @return Request type
    */
-  Type getType();
+  public abstract Type getType();
+
+  /**
+   * Deserialize the request
+   *
+   * @param input Input to read fields from
+   */
+  abstract void readFieldsRequest(DataInput input) throws IOException;
+
+  /**
+   * Serialize the request
+   *
+   * @param output Output to write the request to
+   */
+  abstract void writeRequest(DataOutput output) throws IOException;
+
   /**
    * Execute the request
    *
    * @param serverData Accessible data that can be mutated per the request
    */
-  void doRequest(ServerData<I, V, E, M> serverData);
+  public abstract void doRequest(ServerData<I, V, E, M> serverData);
+
+  @Override
+  public final Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public final void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public final void readFields(DataInput input) throws IOException {
+    requestId = input.readLong();
+    readFieldsRequest(input);
+  }
+
+  @Override
+  public final void write(DataOutput output) throws IOException {
+    output.writeLong(requestId);
+    writeRequest(output);
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Aug 15 20:07:32 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import com.google.common.collect.Sets;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
@@ -89,6 +90,8 @@ public class BspServiceMaster<I extends 
     implements CentralizedServiceMaster<I, V, E, M> {
   /** Counter group name for the Giraph statistics */
   public static final String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
+  /** Print remaining worker names only when fewer than this many are left */
+  public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
   /** Superstep counter */
@@ -344,8 +347,7 @@ public class BspServiceMaster<I extends 
     } catch (KeeperException e) {
       throw new IllegalStateException("getWorkers: KeeperException", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException("getWorkers: IllegalStateException"
-          , e);
+      throw new IllegalStateException("getWorkers: IllegalStateException", e);
     }
 
     try {
@@ -357,8 +359,7 @@ public class BspServiceMaster<I extends 
     } catch (KeeperException e) {
       throw new IllegalStateException("getWorkers: KeeperException", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException("getWorkers: IllegalStateException"
-          , e);
+      throw new IllegalStateException("getWorkers: IllegalStateException", e);
     }
 
     List<WorkerInfo> currentHealthyWorkerInfoList =
@@ -1339,6 +1340,12 @@ public class BspServiceMaster<I extends 
             " out of " + workerInfoList.size() +
             " workers finished on superstep " +
             getSuperstep() + " on path " + finishedWorkerPath);
+        if (workerInfoList.size() - finishedHostnameIdList.size() <
+            MAX_PRINTABLE_REMAINING_WORKERS) {
+          Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
+          remainingWorkers.removeAll(finishedHostnameIdList);
+          LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
+        }
       }
       getContext().setStatus(getGraphMapper().getMapFunctions() + " - " +
           finishedHostnameIdList.size() +

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Wed Aug 15 20:07:32 2012
@@ -159,6 +159,12 @@ public class GiraphJob {
   /** Default is to use RPC, not netty */
   public static final boolean USE_NETTY_DEFAULT = false;
 
+  /** Netty max connection failures */
+  public static final String NETTY_MAX_CONNECTION_FAILURES =
+      "giraph.nettyMaxConnectionFailures";
+  /** Default Netty max connection failures */
+  public static final int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
+
   /** Initial port to start using for the RPC communication */
   public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
   /** Default port to start using for the RPC communication */

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Wed Aug 15 20:07:32 2012
@@ -53,8 +53,10 @@ public class HashWorkerPartitioner<I ext
 
   @Override
   public PartitionOwner getPartitionOwner(I vertexId) {
-    return partitionOwnerList.get(Math.abs(vertexId.hashCode()) %
-      partitionOwnerList.size());
+    synchronized (partitionOwnerList) {
+      return partitionOwnerList.get(Math.abs(vertexId.hashCode()) %
+          partitionOwnerList.size());
+    }
   }
 
   @Override
@@ -70,8 +72,10 @@ public class HashWorkerPartitioner<I ext
       WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
       Map<Integer, Partition<I, V, E, M>> partitionMap) {
-    partitionOwnerList.clear();
-    partitionOwnerList.addAll(masterSetPartitionOwners);
+    synchronized (partitionOwnerList) {
+      partitionOwnerList.clear();
+      partitionOwnerList.addAll(masterSetPartitionOwners);
+    }
 
     Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
     Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
@@ -111,6 +115,8 @@ public class HashWorkerPartitioner<I ext
 
   @Override
   public Collection<? extends PartitionOwner> getPartitionOwners() {
-    return partitionOwnerList;
+    synchronized (partitionOwnerList) {
+      return partitionOwnerList;
+    }
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java Wed Aug 15 20:07:32 2012
@@ -48,9 +48,22 @@ public class TimedLogger {
    * @param msg Message to print
    */
   public void info(String msg) {
-    if (System.currentTimeMillis() > lastPrint + msecs) {
+    if (isPrintable()) {
       log.info(msg);
+    }
+  }
+
+  /**
+   * Is the log message printable (minimum interval met)? Restarts it if so.
+   *
+   * @return True if the message is printable
+   */
+  public boolean isPrintable() {
+    if (System.currentTimeMillis() > lastPrint + msecs) {
       lastPrint = System.currentTimeMillis();
+      return true;
     }
+
+    return false;
   }
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1373609&r1=1373608&r2=1373609&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Wed Aug 15 20:07:32 2012
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm;
 
+import com.google.common.collect.Sets;
+import java.util.Set;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.utils.MockUtils;
@@ -104,8 +106,10 @@ public class ConnectionTest {
     NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
         new NettyClient<IntWritable, IntWritable, IntWritable,
         IntWritable>(context);
-    List<InetSocketAddress> serverAddresses =
-        new ArrayList<InetSocketAddress>();
+    Set<InetSocketAddress> serverAddresses = Sets.newHashSet();
+    serverAddresses.add(server1.getMyAddress());
+    serverAddresses.add(server2.getMyAddress());
+    serverAddresses.add(server3.getMyAddress());
     client.connectAllAddresses(serverAddresses);
 
     client.stop();