You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/22 00:17:57 UTC

svn commit: r1375824 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/comm/

Author: aching
Date: Tue Aug 21 22:17:56 2012
New Revision: 1375824

URL: http://svn.apache.org/viewvc?rev=1375824&view=rev
Log:
GIRAPH-306: Netty requests should be reliable and implement exactly
once semantics. (aching)

Added:
    giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
      - copied, changed from r1375717, giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 21 22:17:56 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-306: Netty requests should be reliable and implement exactly
+  once semantics. (aching)
+
   GIRAPH-309: Message count is wrong. (aching via apresta)
 
   GIRAPH-246: Periodic worker calls to context.progress() will prevent

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/AddressRequestIdGenerator.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Maps;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ * Generates a separate, increasing sequence of request ids for every
+ * destination address (the well known port on the workers).
+ */
+public class AddressRequestIdGenerator {
+  /** Last request id handed out for each destination address */
+  private final Map<InetSocketAddress, Long> addressRequestGeneratorMap =
+      Maps.newHashMap();
+
+  /**
+   * Get the next request id for a given destination.  Not thread-safe.
+   *
+   * @param address Address of the worker (consistent during a superstep)
+   * @return Next request id for this address (0 on the first call)
+   */
+  public Long getNextRequestId(InetSocketAddress address) {
+    Long requestGenerator = addressRequestGeneratorMap.get(address);
+    if (requestGenerator == null) {
+      requestGenerator = Long.valueOf(0);
+      if (addressRequestGeneratorMap.put(address, requestGenerator) != null) {
+        throw new IllegalStateException("getNextRequestId: Illegal put for " +
+            "address " + address);
+      }
+      return requestGenerator;
+    }
+
+    requestGenerator = requestGenerator + 1;
+    addressRequestGeneratorMap.put(address, requestGenerator);
+    return requestGenerator;
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java Tue Aug 21 22:17:56 2012
@@ -51,11 +51,29 @@ public class ChannelRotater {
       throw new IllegalArgumentException("nextChannel: No channels exist!");
     }
 
+    incrementIndex();
+    return channelList.get(index);
+  }
+
+  /**
+   * Remove the last channel that was given out
+   *
+   * @return Return the removed channel
+   */
+  public Channel removeLast() {
+    Channel channel = channelList.remove(index);
+    incrementIndex();
+    return channel;
+  }
+
+  /**
+   * Increment the channel index with wrapping
+   */
+  private void incrementIndex() {
     ++index;
     if (index >= channelList.size()) {
       index = 0;
     }
-    return channelList.get(index);
   }
 
   /**

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ClientRequestId.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+/**
+ * Simple immutable object to use for tracking requests uniquely.  Two
+ * instances are equal exactly when they have the same destination worker
+ * id and the same request id.
+ */
+public class ClientRequestId {
+  /** Destination worker id */
+  private final int destinationWorkerId;
+  /** Request id */
+  private final long requestId;
+
+  /**
+   * Constructor.
+   *
+   * @param destinationWorkerId Destination worker id
+   * @param requestId Request id
+   */
+  public ClientRequestId(int destinationWorkerId, long requestId) {
+    this.destinationWorkerId = destinationWorkerId;
+    this.requestId = requestId;
+  }
+
+  public int getDestinationWorkerId() {
+    return destinationWorkerId;
+  }
+
+  public long getRequestId() {
+    return requestId;
+  }
+
+  @Override
+  public int hashCode() {
+    return (29 * destinationWorkerId) + (int) (57 * requestId);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof ClientRequestId) {
+      ClientRequestId otherObj = (ClientRequestId) other;
+      if (otherObj.getRequestId() == requestId &&
+          otherObj.getDestinationWorkerId() == destinationWorkerId) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "(destWorker=" + destinationWorkerId + ",reqId=" + requestId + ")";
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/IncreasingBitSet.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.util.BitSet;
+
+/**
+ * Bit set optimized for increasing longs to save storage space.
+ * The general idea is that even though some keys will be added out-of-order,
+ * there is a base key that keeps increasing so that the bit set doesn't get
+ * very big.  When there are enough set bits, the bit set gets compacted.
+ * Thread-safe.
+ */
+public class IncreasingBitSet {
+  /** Minimum number of bits to shift */
+  public static final int MIN_BITS_TO_SHIFT = 64 * 1024;
+  /** Bit set used */
+  private BitSet bitSet = new BitSet();
+  /** Last base key (all keys < this have been accepted) */
+  private long lastBaseKey = 0;
+
+  /**
+   * Add a key if it is possible.
+   *
+   * @param key Key to add
+   * @return True if the key was added, false if it was already present
+   */
+  public synchronized boolean add(long key) {
+    long remainder = key - lastBaseKey;
+    checkLegalKey(remainder);
+
+    if (remainder < 0) {
+      return false;
+    }
+    if (bitSet.get((int) remainder)) {
+      return false;
+    }
+    bitSet.set((int) remainder);
+    int nextClearBit = bitSet.nextClearBit(0);
+    if (nextClearBit >= MIN_BITS_TO_SHIFT) {
+      bitSet = bitSet.get(nextClearBit,
+          Math.max(nextClearBit, bitSet.length()));
+      lastBaseKey += nextClearBit;
+    }
+    return true;
+  }
+
+  /**
+   * Get the number of keys accepted so far (compacted keys included)
+   *
+   * @return Number of set bits plus the last base key
+   */
+  public synchronized long cardinality() {
+    long size = bitSet.cardinality();
+    return size + lastBaseKey;
+  }
+
+  /**
+   * Get the size of the backing bit set (compacted keys are not counted)
+   *
+   * @return Size of the bit set
+   */
+  public synchronized int size() {
+    return bitSet.size();
+  }
+
+  /**
+   * Check for existence of a key
+   *
+   * @param key Key to check for
+   * @return True if the key exists, false otherwise
+   */
+  public synchronized boolean has(long key) {
+    long remainder = key - lastBaseKey;
+    checkLegalKey(remainder);
+
+    if (remainder < 0) {
+      return true;
+    }
+    return bitSet.get((int) remainder);
+  }
+
+  /**
+   * Get the last base key (mainly for debugging).
+   *
+   * @return Last base key
+   */
+  public synchronized long getLastBaseKey() {
+    return lastBaseKey;
+  }
+
+  /**
+   * Check the remainder for validity, throwing if it does not fit in an int
+   *
+   * @param remainder Key minus the last base key
+   */
+  private void checkLegalKey(long remainder) {
+    if (remainder > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "checkLegalKey: Impossible that to add key " +
+          (remainder + lastBaseKey) + " with base " +
+          lastBaseKey + " since the " +
+          "spread is too large (> " + Integer.MAX_VALUE);
+    }
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug 21 22:17:56 2012
@@ -21,11 +21,13 @@ package org.apache.giraph.comm;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -59,8 +61,6 @@ import org.jboss.netty.handler.codec.fra
 public class NettyClient<I extends WritableComparable,
     V extends Writable, E extends Writable,
     M extends Writable> {
-  /** Msecs to wait between waiting for all requests to finish */
-  public static final int WAITING_REQUEST_MSECS = 15000;
   /** Do we have a limit on number of open requests we can have */
   public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS =
       "giraph.waitForRequestsConfirmation";
@@ -87,10 +87,11 @@ public class NettyClient<I extends Writa
    */
   private final Map<InetSocketAddress, ChannelRotater> addressChannelMap =
       Maps.newHashMap();
-  /** Atomic request id, used in outstandingRequestMap */
-  private final AtomicInteger requestId = new AtomicInteger(0);
-  /** Outstanding request map (tracks all requests). */
-  private final ConcurrentMap<Long, RequestInfo> outstandingRequestMap;
+  /**
+   * Request map of client request ids to request information.
+   */
+  private final ConcurrentMap<ClientRequestId, RequestInfo>
+  clientRequestIdRequestInfoMap;
   /** Number of channels per server */
   private final int channelsPerServer;
   /** Byte counter for this client */
@@ -99,15 +100,29 @@ public class NettyClient<I extends Writa
   private final int sendBufferSize;
   /** Receive buffer size */
   private final int receiveBufferSize;
-
   /** Do we have a limit on number of open requests */
   private final boolean limitNumberOfOpenRequests;
   /** Maximum number of requests without confirmation we can have */
   private final int maxNumberOfOpenRequests;
   /** Maximum number of connnection failures */
   private final int maxConnectionFailures;
+  /** Maximum number of milliseconds for a request */
+  private final int maxRequestMilliseconds;
+  /** Maximum number of reconnection failures */
+  private final int maxReconnectionFailures;
+  /** Waiting interval for checking outstanding requests (msecs) */
+  private final int waitingRequestMsecs;
   /** Timed logger for printing request debugging */
   private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
+  /** Boss factory service */
+  private final ExecutorService bossExecutorService;
+  /** Worker factory service */
+  private final ExecutorService workerExecutorService;
+  /** Address request id generator */
+  private final AddressRequestIdGenerator addressRequestIdGenerator =
+      new AddressRequestIdGenerator();
+  /** Client id */
+  private final int clientId;
 
   /**
    * Only constructor
@@ -116,7 +131,7 @@ public class NettyClient<I extends Writa
    */
   public NettyClient(Mapper<?, ?, ?, ?>.Context context) {
     this.context = context;
-    Configuration conf = context.getConfiguration();
+    final Configuration conf = context.getConfiguration();
     this.channelsPerServer = conf.getInt(
         GiraphJob.CHANNELS_PER_SERVER,
         GiraphJob.DEFAULT_CHANNELS_PER_SERVER);
@@ -140,20 +155,41 @@ public class NettyClient<I extends Writa
       maxNumberOfOpenRequests = -1;
     }
 
+    maxRequestMilliseconds = conf.getInt(
+        GiraphJob.MAX_REQUEST_MILLISECONDS,
+        GiraphJob.MAX_REQUEST_MILLISECONDS_DEFAULT);
+
     maxConnectionFailures = conf.getInt(
         GiraphJob.NETTY_MAX_CONNECTION_FAILURES,
         GiraphJob.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
 
+    maxReconnectionFailures = conf.getInt(
+        GiraphJob.MAX_RECONNECT_ATTEMPTS,
+        GiraphJob.MAX_RECONNECT_ATTEMPTS_DEFAULT);
+
+    waitingRequestMsecs = conf.getInt(
+        GiraphJob.WAITING_REQUEST_MSECS,
+        GiraphJob.WAITING_REQUEST_MSECS_DEFAULT);
+
     int maxThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
         NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
-    outstandingRequestMap =
+    clientRequestIdRequestInfoMap =
         new MapMaker().concurrencyLevel(maxThreads).makeMap();
 
+    bossExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(
+            "Giraph Client Netty Boss #%d").build());
+    workerExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(
+            "Giraph Client Netty Worker #%d").build());
+
+    clientId = conf.getInt("mapred.task.partition", -1);
+
     // Configure the client.
     bootstrap = new ClientBootstrap(
         new NioClientSocketChannelFactory(
-            Executors.newCachedThreadPool(),
-            Executors.newCachedThreadPool(),
+            bossExecutorService,
+            workerExecutorService,
             maxThreads));
     bootstrap.setOption("connectTimeoutMillis",
         MAX_CONNECTION_MILLISECONDS_DEFAULT);
@@ -168,9 +204,9 @@ public class NettyClient<I extends Writa
       public ChannelPipeline getPipeline() throws Exception {
         return Channels.pipeline(
             byteCounter,
-            new FixedLengthFrameDecoder(9),
+            new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES),
             new RequestEncoder(),
-            new ResponseClientHandler(outstandingRequestMap));
+            new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
       }
     });
   }
@@ -241,7 +277,7 @@ public class NettyClient<I extends Writa
         if (!future.isSuccess()) {
           LOG.warn("connectAllAddresses: Future failed " +
               "to connect with " + waitingConnection.address + " with " +
-              failures + " failures and because of " + future.getCause());
+              failures + " failures because of " + future.getCause());
 
           ChannelFuture connectionFuture =
               bootstrap.connect(waitingConnection.address);
@@ -250,9 +286,9 @@ public class NettyClient<I extends Writa
           ++failures;
         } else {
           Channel channel = future.getChannel();
-          if (LOG.isInfoEnabled()) {
-            LOG.info("connectAllAddresses: Connected to " +
-                channel.getRemoteAddress());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("connectAllAddresses: Connected to " +
+                channel.getRemoteAddress() + ", open = " + channel.isOpen());
           }
 
           if (channel.getRemoteAddress() == null) {
@@ -290,7 +326,7 @@ public class NettyClient<I extends Writa
    * Stop the client.
    */
   public void stop() {
-    // Close connections asyncronously, in a Netty-approved
+    // Close connections asynchronously, in a Netty-approved
     // way, without cleaning up thread pools until all channels
     // in addressChannelMap are closed (success or failure)
     int channelCount = 0;
@@ -305,12 +341,15 @@ public class NettyClient<I extends Writa
         result.addListener(new ChannelFutureListener() {
           @Override
           public void operationComplete(ChannelFuture cf) {
+            context.progress();
             if (count.incrementAndGet() == done) {
               if (LOG.isInfoEnabled()) {
                 LOG.info("stop: reached wait threshold, " +
                     done + " connections closed, releasing " +
                     "NettyClient.bootstrap resources now.");
               }
+              bossExecutorService.shutdownNow();
+              workerExecutorService.shutdownNow();
               bootstrap.releaseExternalResources();
             }
           }
@@ -320,25 +359,81 @@ public class NettyClient<I extends Writa
   }
 
   /**
+   * Get the next available channel, reconnecting if necessary
+   *
+   * @param remoteServer Remote server to get a channel for
+   * @return Available channel for this remote server
+   */
+  private Channel getNextChannel(InetSocketAddress remoteServer) {
+    Channel channel = addressChannelMap.get(remoteServer).nextChannel();
+    if (channel == null) {
+      throw new IllegalStateException(
+          "getNextChannel: No channel exists for " + remoteServer);
+    }
+
+    // Fast path: the rotated channel is still connected, use it as is
+    if (channel.isConnected()) {
+      return channel;
+    }
+
+    // Get rid of the failed channel, then try to reconnect (bounded retries)
+    addressChannelMap.get(remoteServer).removeLast();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getNextChannel: Fixing disconnected channel to " +
+          remoteServer + ", open = " + channel.isOpen() + ", " +
+          "bound = " + channel.isBound());
+    }
+    int reconnectFailures = 0;
+    while (reconnectFailures < maxReconnectionFailures) {
+      ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
+      connectionFuture.awaitUninterruptibly();
+      if (connectionFuture.isSuccess()) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("getNextChannel: Connected to " + remoteServer + "!");
+        }
+        addressChannelMap.get(remoteServer).addChannel(
+            connectionFuture.getChannel());
+        return connectionFuture.getChannel();
+      }
+      ++reconnectFailures;
+      LOG.warn("getNextChannel: Failed to reconnect to " + remoteServer +
+          " on attempt " + reconnectFailures + " out of " +
+          maxReconnectionFailures + " max attempts, sleeping for 5 secs",
+          connectionFuture.getCause());
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        LOG.warn("getNextChannel: Unexpected interrupted exception", e);
+      }
+    }
+    throw new IllegalStateException("getNextChannel: Failed to connect " +
+        "to " + remoteServer + " in " + reconnectFailures +
+        " connect attempts");
+  }
+
+  /**
    * Send a request to a remote server (should be already connected)
    *
+   * @param destWorkerId Destination worker id
    * @param remoteServer Server to send the request to
    * @param request Request to send
    */
-  public void sendWritableRequest(InetSocketAddress remoteServer,
+  public void sendWritableRequest(Integer destWorkerId,
+                                  InetSocketAddress remoteServer,
                                   WritableRequest<I, V, E, M> request) {
-    if (outstandingRequestMap.isEmpty()) {
+    if (clientRequestIdRequestInfoMap.isEmpty()) {
       byteCounter.resetAll();
     }
-    request.setRequestId(requestId.incrementAndGet());
-    Channel channel = addressChannelMap.get(remoteServer).nextChannel();
-    if (channel == null) {
-      throw new IllegalStateException(
-          "sendWritableRequest: No channel exists for " + remoteServer);
-    }
-    RequestInfo newRequestInfo = new RequestInfo(remoteServer);
-    RequestInfo oldRequestInfo = outstandingRequestMap.putIfAbsent(
-        request.getRequestId(), newRequestInfo);
+
+    Channel channel = getNextChannel(remoteServer);
+    request.setClientId(clientId);
+    request.setRequestId(
+        addressRequestIdGenerator.getNextRequestId(remoteServer));
+
+    RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
+    RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
+        new ClientRequestId(destWorkerId, request.getRequestId()),
+            newRequestInfo);
     if (oldRequestInfo != null) {
       throw new IllegalStateException("sendWritableRequest: Impossible to " +
           "have a previous request id = " + request.getRequestId() + ", " +
@@ -348,7 +443,7 @@ public class NettyClient<I extends Writa
     newRequestInfo.setWriteFuture(writeFuture);
 
     if (limitNumberOfOpenRequests &&
-        outstandingRequestMap.size() > maxNumberOfOpenRequests) {
+        clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
       waitSomeRequests(maxNumberOfOpenRequests);
     }
   }
@@ -367,37 +462,91 @@ public class NettyClient<I extends Writa
   }
 
   /**
-   * Ensure that at most maxOpenRequests are not complete
+   * Ensure that at most maxOpenRequests are not complete.  Periodically,
+   * check the state of every request.  If we find the connection failed,
+   * re-establish it and re-send the request.
    *
    * @param maxOpenRequests Maximum number of requests which can be not
    *                        complete
    */
   private void waitSomeRequests(int maxOpenRequests) {
-    synchronized (outstandingRequestMap) {
-      while (outstandingRequestMap.size() > maxOpenRequests) {
-        if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
-          LOG.info("waitSomeRequests: Waiting interval of " +
-              WAITING_REQUEST_MSECS + " msecs, " +
-              outstandingRequestMap.size() +
-              " open requests, waiting for it to be <= " + maxOpenRequests +
-              ", " + byteCounter.getMetrics());
-
-          if (outstandingRequestMap.size() < MAX_REQUESTS_TO_LIST) {
-            for (Map.Entry<Long, RequestInfo> entry :
-                outstandingRequestMap.entrySet()) {
-              LOG.info("waitSomeRequests: Waiting for request " +
-                  entry.getKey() + " - " + entry.getValue());
-            }
+    List<ClientRequestId> addedRequestIds = Lists.newArrayList();
+    List<RequestInfo<I, V, E, M>> addedRequestInfos =
+        Lists.newArrayList();
+
+    while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {
+      // Wait for requests to complete for some time
+      if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
+        LOG.info("waitSomeRequests: Waiting interval of " +
+            waitingRequestMsecs + " msecs, " +
+            clientRequestIdRequestInfoMap.size() +
+            " open requests, waiting for it to be <= " + maxOpenRequests +
+            ", " + byteCounter.getMetrics());
+
+        if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
+          for (Map.Entry<ClientRequestId, RequestInfo> entry :
+              clientRequestIdRequestInfoMap.entrySet()) {
+            LOG.info("waitSomeRequests: Waiting for request " +
+                entry.getKey() + " - " + entry.getValue());
           }
         }
+      }
+      synchronized (clientRequestIdRequestInfoMap) {
         try {
-          outstandingRequestMap.wait(WAITING_REQUEST_MSECS);
+          clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
         } catch (InterruptedException e) {
           LOG.error("waitFutures: Got unexpected InterruptedException", e);
         }
-        // Make sure that waiting doesn't kill the job
-        context.progress();
       }
+      // Make sure that waiting doesn't kill the job
+      context.progress();
+
+      // Check all the requests for problems
+      for (Map.Entry<ClientRequestId, RequestInfo> entry :
+          clientRequestIdRequestInfoMap.entrySet()) {
+        RequestInfo requestInfo = entry.getValue();
+        ChannelFuture writeFuture = requestInfo.getWriteFuture();
+        // If not connected anymore, request failed, or the request is taking
+        // too long, re-establish and resend
+        if (!writeFuture.getChannel().isConnected() ||
+            (writeFuture.isDone() && !writeFuture.isSuccess()) ||
+            (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {
+          LOG.warn("waitSomeRequests: Problem with request id " +
+              entry.getKey() + " connected = " +
+              writeFuture.getChannel().isConnected() +
+              ", future done = " + writeFuture.isDone() + ", " +
+              "success = " + writeFuture.isSuccess() + ", " +
+              "cause = " + writeFuture.getCause() + ", " +
+              "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
+              "destination = " + writeFuture.getChannel().getRemoteAddress() +
+              " " + requestInfo);
+          addedRequestIds.add(entry.getKey());
+          addedRequestInfos.add(new RequestInfo<I, V, E, M>(
+              requestInfo.getDestinationAddress(), requestInfo.getRequest()));
+        }
+      }
+
+      // Add any new requests to the system, connect if necessary, and re-send
+      for (int i = 0; i < addedRequestIds.size(); ++i) {
+        ClientRequestId requestId = addedRequestIds.get(i);
+        RequestInfo<I, V, E, M> requestInfo = addedRequestInfos.get(i);
+
+        if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==
+            null) {
+          LOG.warn("waitSomeRequests: Request " + requestId +
+              " completed prior to sending the next request");
+          clientRequestIdRequestInfoMap.remove(requestId);
+        }
+        InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
+        Channel channel = getNextChannel(remoteServer);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("waitSomeRequests: Re-issuing request " + requestInfo);
+        }
+        ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+        requestInfo.setWriteFuture(writeFuture);
+      }
+      addedRequestIds.clear();
+      addedRequestInfos.clear();
     }
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Tue Aug 21 22:17:56 2012
@@ -21,10 +21,9 @@ package org.apache.giraph.comm;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest;
 import org.apache.giraph.graph.GiraphJob;
@@ -60,8 +59,6 @@ public class NettyServer<I extends Writa
      M extends Writable> {
   /** Default maximum thread pool size */
   public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
-  /** Default TCP backlog */
-  public static final int TCP_BACKLOG_DEFAULT = 100;
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyServer.class);
   /** Configuration */
@@ -78,6 +75,8 @@ public class NettyServer<I extends Writa
   private InetSocketAddress myAddress;
   /** Maximum number of threads */
   private final int maximumPoolSize;
+  /** TCP backlog */
+  private final int tcpBacklog;
   /** Request reqistry */
   private final RequestRegistry requestRegistry = new RequestRegistry();
   /** Server data */
@@ -90,6 +89,12 @@ public class NettyServer<I extends Writa
   private final int sendBufferSize;
   /** Receive buffer size */
   private final int receiveBufferSize;
+  /** Boss factory service */
+  private final ExecutorService bossExecutorService;
+  /** Worker factory service */
+  private final ExecutorService workerExecutorService;
+  /** Request completed map per worker */
+  private final WorkerRequestReservedMap workerRequestReservedMap;
 
   /**
    * Constructor for creating the server
@@ -115,12 +120,15 @@ public class NettyServer<I extends Writa
     receiveBufferSize = conf.getInt(GiraphJob.SERVER_RECEIVE_BUFFER_SIZE,
         GiraphJob.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
 
-    ThreadFactory bossFactory = new ThreadFactoryBuilder()
-      .setNameFormat("Giraph Netty Boss #%d")
-      .build();
-    ThreadFactory workerFactory = new ThreadFactoryBuilder()
-      .setNameFormat("Giraph Netty Worker #%d")
-      .build();
+    workerRequestReservedMap = new WorkerRequestReservedMap(conf);
+
+    bossExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(
+            "Giraph Server Netty Boss #%d").build());
+    workerExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(
+            "Giraph Server Netty Worker #%d").build());
+
     try {
       this.localHostname = InetAddress.getLocalHost().getHostName();
     } catch (UnknownHostException e) {
@@ -129,9 +137,12 @@ public class NettyServer<I extends Writa
     maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
                                   MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
 
+    tcpBacklog = conf.getInt(GiraphJob.TCP_BACKLOG,
+        conf.getInt(GiraphJob.MAX_WORKERS, GiraphJob.TCP_BACKLOG_DEFAULT));
+
     channelFactory = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(bossFactory),
-        Executors.newCachedThreadPool(workerFactory),
+        bossExecutorService,
+        workerExecutorService,
         maximumPoolSize);
   }
 
@@ -145,7 +156,7 @@ public class NettyServer<I extends Writa
     bootstrap.setOption("child.tcpNoDelay", true);
     bootstrap.setOption("child.sendBufferSize", sendBufferSize);
     bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
-    bootstrap.setOption("backlog", TCP_BACKLOG_DEFAULT);
+    bootstrap.setOption("backlog", tcpBacklog);
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() throws Exception {
@@ -153,7 +164,8 @@ public class NettyServer<I extends Writa
             byteCounter,
             new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
             new RequestDecoder<I, V, E, M>(conf, requestRegistry, byteCounter),
-            new RequestServerHandler<I, V, E, M>(serverData));
+            new RequestServerHandler<I, V, E, M>(serverData,
+                workerRequestReservedMap, conf));
       }
     });
 
@@ -225,7 +237,9 @@ public class NettyServer<I extends Writa
     if (LOG.isInfoEnabled()) {
       LOG.info("stop: Halting netty server");
     }
-    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    accepted.close().awaitUninterruptibly();
+    bossExecutorService.shutdownNow();
+    workerExecutorService.shutdownNow();
     bootstrap.releaseExternalResources();
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java Tue Aug 21 22:17:56 2012
@@ -84,6 +84,8 @@ public class NettyWorkerClient<I extends
   private final int maxMessagesPerPartition;
   /** Maximum number of mutations per partition before sending */
   private final int maxMutationsPerPartition;
+  /** Maximum number of attempts to resolve an address */
+  private final int maxResolveAddressAttempts;
   /** Messages sent during the last superstep */
   private long totalMsgsSentInSuperstep = 0;
   /** Server data from the server */
@@ -106,6 +108,9 @@ public class NettyWorkerClient<I extends
         GiraphJob.MSG_SIZE_DEFAULT);
     maxMutationsPerPartition = conf.getInt(GiraphJob.MAX_MUTATIONS_PER_REQUEST,
         GiraphJob.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
+    maxResolveAddressAttempts = conf.getInt(
+        GiraphJob.MAX_RESOLVE_ADDRESS_ATTEMPTS,
+        GiraphJob.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
     sendMessageCache = new SendMessageCache<I, M>(conf);
     sendMutationsCache = new SendMutationsCache<I, V, E, M>();
     this.serverData = serverData;
@@ -135,7 +140,13 @@ public class NettyWorkerClient<I extends
         partitionIndexAddressMap.remove(
             partitionOwner.getPartitionId());
       }
-      addresses.add(partitionOwner.getWorkerInfo().getHostnamePort());
+
+      // No need to connect to myself
+      if (service.getWorkerInfo().getPartitionId() !=
+          partitionOwner.getWorkerInfo().getPartitionId()) {
+        addresses.add(getInetSocketAddress(partitionOwner.getWorkerInfo(),
+            partitionOwner.getPartitionId()));
+      }
     }
     nettyClient.connectAllAddresses(addresses);
   }
@@ -152,7 +163,25 @@ public class NettyWorkerClient<I extends
     InetSocketAddress address =
         partitionIndexAddressMap.get(partitionId);
     if (address == null) {
-      address = workerInfo.getHostnamePort();
+      address = workerInfo.getInetSocketAddress();
+      int resolveAttempts = 0;
+      while (address.isUnresolved() &&
+          resolveAttempts < maxResolveAddressAttempts) {
+        address = workerInfo.getInetSocketAddress();
+        ++resolveAttempts;
+        LOG.warn("getInetSocketAddress: Failed to resolve " + address +
+            " on attempt " + resolveAttempts + " of " +
+            maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          LOG.warn("getInetSocketAddress: Interrupted.", e);
+        }
+      }
+      if (resolveAttempts >= maxResolveAddressAttempts) {
+        throw new IllegalStateException("getInetSocketAddress: Coudldn't " +
+            "resolve " + address + " in " +  resolveAttempts + " tries.");
+      }
       partitionIndexAddressMap.put(partitionId, address);
     }
 
@@ -160,32 +189,22 @@ public class NettyWorkerClient<I extends
   }
 
   /**
-   * Fill the socket address cache for the partition owner.
-   *
-   * @param destVertex vertex to be sent
-   * @return address of the vertex range server containing this vertex
-   */
-  private InetSocketAddress getInetSocketAddress(I destVertex) {
-    PartitionOwner partitionOwner =
-        service.getVertexPartitionOwner(destVertex);
-    return getInetSocketAddress(partitionOwner.getWorkerInfo(),
-        partitionOwner.getPartitionId());
-  }
-
-  /**
    * When doing the request, short circuit if it is local
    *
+   * @param workerInfo Worker info
    * @param remoteServerAddress Remote server address (checked against local)
    * @param writableRequest Request to either submit or run locally
    */
-  private void doRequest(InetSocketAddress remoteServerAddress,
+  private void doRequest(WorkerInfo workerInfo,
+                         InetSocketAddress remoteServerAddress,
                          WritableRequest<I, V, E, M> writableRequest) {
     // If this is local, execute locally
-    if (service.getWorkerInfo().getHostnamePort().equals(
-        remoteServerAddress)) {
+    if (service.getWorkerInfo().getPartitionId() ==
+        workerInfo.getPartitionId()) {
       writableRequest.doRequest(serverData);
     } else {
-      nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+      nettyClient.sendWritableRequest(
+          workerInfo.getPartitionId(), remoteServerAddress, writableRequest);
     }
   }
 
@@ -213,7 +232,8 @@ public class NettyWorkerClient<I extends
       WritableRequest<I, V, E, M> writableRequest =
           new SendPartitionMessagesRequest<I, V, E, M>(
               partitionId, partitionMessages);
-      doRequest(remoteServerAddress, writableRequest);
+      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+          writableRequest);
     }
   }
 
@@ -231,9 +251,9 @@ public class NettyWorkerClient<I extends
     WritableRequest<I, V, E, M> vertexRequest =
         new SendVertexRequest<I, V, E, M>(partitionId,
             partition.getVertices());
-    doRequest(remoteServerAddress, vertexRequest);
+    doRequest(workerInfo, remoteServerAddress, vertexRequest);
 
-    // messages are stored separately
+    // Messages are stored separately
     MessageStoreByPartition<I, M> messageStore =
         service.getServerData().getCurrentMessageStore();
     Map<I, Collection<M>> map = Maps.newHashMap();
@@ -251,7 +271,7 @@ public class NettyWorkerClient<I extends
       if (messagesInMap > maxMessagesPerPartition) {
         WritableRequest<I, V, E, M> messagesRequest = new
             SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
-        doRequest(remoteServerAddress, messagesRequest);
+        doRequest(workerInfo, remoteServerAddress, messagesRequest);
         map.clear();
         messagesInMap = 0;
       }
@@ -259,7 +279,7 @@ public class NettyWorkerClient<I extends
     if (!map.isEmpty()) {
       WritableRequest<I, V, E, M> messagesRequest = new
           SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
-      doRequest(remoteServerAddress, messagesRequest);
+      doRequest(workerInfo, remoteServerAddress, messagesRequest);
     }
   }
 
@@ -283,7 +303,8 @@ public class NettyWorkerClient<I extends
       WritableRequest<I, V, E, M> writableRequest =
           new SendPartitionMutationsRequest<I, V, E, M>(
               partitionId, partitionMutations);
-      doRequest(remoteServerAddress, writableRequest);
+      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+          writableRequest);
     }
   }
 
@@ -372,9 +393,14 @@ public class NettyWorkerClient<I extends
       WritableRequest<I, V, E, M> writableRequest =
           new SendPartitionMessagesRequest<I, V, E, M>(
               entry.getKey(), entry.getValue());
+      PartitionOwner partitionOwner =
+          service.getVertexPartitionOwner(
+              entry.getValue().keySet().iterator().next());
       InetSocketAddress remoteServerAddress =
-          getInetSocketAddress(entry.getValue().keySet().iterator().next());
-      doRequest(remoteServerAddress, writableRequest);
+          getInetSocketAddress(partitionOwner.getWorkerInfo(),
+              partitionOwner.getPartitionId());
+      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+          writableRequest);
     }
 
     // Execute the remaining sends mutations (if any)
@@ -385,9 +411,14 @@ public class NettyWorkerClient<I extends
       WritableRequest<I, V, E, M> writableRequest =
           new SendPartitionMutationsRequest<I, V, E, M>(
               entry.getKey(), entry.getValue());
+      PartitionOwner partitionOwner =
+          service.getVertexPartitionOwner(
+              entry.getValue().keySet().iterator().next());
       InetSocketAddress remoteServerAddress =
-          getInetSocketAddress(entry.getValue().keySet().iterator().next());
-      doRequest(remoteServerAddress, writableRequest);
+          getInetSocketAddress(partitionOwner.getWorkerInfo(),
+              partitionOwner.getPartitionId());
+      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+          writableRequest);
     }
 
     nettyClient.waitAllRequests();

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java Tue Aug 21 22:17:56 2012
@@ -40,7 +40,7 @@ import org.jboss.netty.handler.codec.one
 @SuppressWarnings("rawtypes")
 public class RequestDecoder<I extends WritableComparable,
     V extends Writable, E extends Writable,
-    M extends Writable>  extends OneToOneDecoder {
+    M extends Writable> extends OneToOneDecoder {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(RequestDecoder.class);
@@ -58,8 +58,9 @@ public class RequestDecoder<I extends Wr
    * @param requestRegistry Request registry
    * @param byteCounter Keeps track of the decoded bytes
    */
-  public RequestDecoder(Configuration conf, RequestRegistry requestRegistry,
-                        ByteCounter byteCounter) {
+  public RequestDecoder(
+      Configuration conf, RequestRegistry requestRegistry,
+      ByteCounter byteCounter) {
     this.conf = conf;
     this.requestRegistry = requestRegistry;
     this.byteCounter = byteCounter;

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestInfo.java Tue Aug 21 22:17:56 2012
@@ -20,16 +20,26 @@ package org.apache.giraph.comm;
 
 import java.net.InetSocketAddress;
 import java.util.Date;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.jboss.netty.channel.ChannelFuture;
 
 /**
  * Help track requests throughout the system
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
-public class RequestInfo {
+public class RequestInfo<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
   /** Destination of the request */
   private final InetSocketAddress destinationAddress;
   /** When the request was started */
   private final long startedMsecs;
+  /** Request */
+  private final WritableRequest<I, V, E, M> request;
   /** Future of the write of this request*/
   private volatile ChannelFuture writeFuture;
 
@@ -37,13 +47,16 @@ public class RequestInfo {
    * Constructor.
    *
    * @param destinationAddress Destination of the request
+   * @param request Request that is sent
    */
-  public RequestInfo(InetSocketAddress destinationAddress) {
+  public RequestInfo(InetSocketAddress destinationAddress,
+                     WritableRequest<I, V, E, M> request) {
     this.destinationAddress = destinationAddress;
+    this.request = request;
     this.startedMsecs = System.currentTimeMillis();
   }
 
-  public InetSocketAddress getAddress() {
+  public InetSocketAddress getDestinationAddress() {
     return destinationAddress;
   }
 
@@ -60,6 +73,10 @@ public class RequestInfo {
     return System.currentTimeMillis() - startedMsecs;
   }
 
+  public WritableRequest<I, V, E, M> getRequest() {
+    return request;
+  }
+
   public void setWriteFuture(ChannelFuture writeFuture) {
     this.writeFuture = writeFuture;
   }
@@ -72,7 +89,7 @@ public class RequestInfo {
   public String toString() {
     return "(destAddr=" + destinationAddress +
         ",startDate=" + new Date(startedMsecs) + ",elapsedMsecs=" +
-        getElapsedMsecs() +
+        getElapsedMsecs() + ",reqId=" + request.getRequestId() +
         ((writeFuture == null) ? ")" :
             ",writeDone=" + writeFuture.isDone() +
                 ",writeSuccess=" + writeFuture.isSuccess() + ")");

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java Tue Aug 21 22:17:56 2012
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -41,19 +43,39 @@ import org.jboss.netty.channel.SimpleCha
 public class RequestServerHandler<I extends WritableComparable,
     V extends Writable, E extends Writable,
     M extends Writable> extends SimpleChannelUpstreamHandler {
+  /** Number of bytes in the encoded response */
+  public static final int RESPONSE_BYTES = 13;
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(RequestServerHandler.class);
+  /** Already closed first request? */
+  private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
+  /** Close connection on first request (used for simulating failure) */
+  private final boolean closeFirstRequest;
   /** Data that can be accessed for handling requests */
   private final ServerData<I, V, E, M> serverData;
+  /** Request reserved map (for exactly once semantics) */
+  private final WorkerRequestReservedMap workerRequestReservedMap;
+  /** My worker id */
+  private final int myWorkerId;
 
   /**
    * Constructor with external server data
    *
    * @param serverData Data held by the server
+   * @param workerRequestReservedMap Worker request reservation map
+   * @param conf Configuration
    */
-  public RequestServerHandler(ServerData<I, V, E, M> serverData) {
+  public RequestServerHandler(
+      ServerData<I, V, E, M> serverData,
+      WorkerRequestReservedMap workerRequestReservedMap,
+      Configuration conf) {
     this.serverData = serverData;
+    this.workerRequestReservedMap = workerRequestReservedMap;
+    closeFirstRequest = conf.getBoolean(
+        GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
+        GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+    myWorkerId = conf.getInt("mapred.task.partition", -1);
   }
 
   @Override
@@ -62,15 +84,41 @@ public class RequestServerHandler<I exte
     if (LOG.isDebugEnabled()) {
       LOG.debug("messageReceived: Got " + e.getMessage().getClass());
     }
+
     @SuppressWarnings("unchecked")
     WritableRequest<I, V, E, M> writableRequest =
         (WritableRequest<I, V, E, M>) e.getMessage();
-    writableRequest.doRequest(serverData);
 
-    // Send the success response with the request id
-    ChannelBuffer buffer = ChannelBuffers.directBuffer(9);
+    // Simulate a closed connection on the first request (if desired)
+    if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
+      LOG.info("messageReceived: Simulating closing channel on first " +
+          "request " + writableRequest.getRequestId() + " from " +
+          writableRequest.getClientId());
+      ALREADY_CLOSED_FIRST_REQUEST = true;
+      ctx.getChannel().close();
+      return;
+    }
+
+    // Only execute this request exactly once
+    int alreadyDone = 1;
+    if (workerRequestReservedMap.reserveRequest(
+        writableRequest.getClientId(),
+        writableRequest.getRequestId())) {
+      writableRequest.doRequest(serverData);
+      alreadyDone = 0;
+    } else {
+      LOG.info("messageReceived: Request id " +
+          writableRequest.getRequestId() + " from client " +
+          writableRequest.getClientId() +
+          " was already processed, " +
+          "not processing again.");
+    }
+
+    // Send the response with the request id
+    ChannelBuffer buffer = ChannelBuffers.directBuffer(RESPONSE_BYTES);
+    buffer.writeInt(myWorkerId);
     buffer.writeLong(writableRequest.getRequestId());
-    buffer.writeByte(0);
+    buffer.writeByte(alreadyDone);
     e.getChannel().write(buffer);
   }
 
@@ -88,13 +136,14 @@ public class RequestServerHandler<I exte
                             ChannelStateEvent e) throws Exception {
     if (LOG.isDebugEnabled()) {
       LOG.debug("channelClosed: Closed the channel on " +
-          ctx.getChannel().getRemoteAddress());
+          ctx.getChannel().getRemoteAddress() + " with event " +
+          e);
     }
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-    throw new IllegalStateException("exceptionCaught: Channel failed with " +
+    LOG.warn("exceptionCaught: Channel failed with " +
         "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java Tue Aug 21 22:17:56 2012
@@ -21,6 +21,8 @@ package org.apache.giraph.comm;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
@@ -37,17 +39,29 @@ public class ResponseClientHandler exten
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(ResponseClientHandler.class);
-  /** Outstanding request map */
-  private final ConcurrentMap<Long, RequestInfo> outstandingRequestMap;
+  /** Already dropped first response? (used if dropFirstResponse == true) */
+  private static volatile boolean ALREADY_DROPPED_FIRST_RESPONSE = false;
+  /** Drop first response (used for simulating failure) */
+  private final boolean dropFirstResponse;
+  /** Outstanding worker request map */
+  private final ConcurrentMap<ClientRequestId, RequestInfo>
+  workerIdOutstandingRequestMap;
 
   /**
    * Constructor.
    *
-   * @param outstandingRequestMap Map of outstanding requests
+   * @param workerIdOutstandingRequestMap Map of worker ids to outstanding
+   *                                      requests
+   * @param conf Configuration
    */
   public ResponseClientHandler(
-      ConcurrentMap<Long, RequestInfo> outstandingRequestMap) {
-    this.outstandingRequestMap = outstandingRequestMap;
+      ConcurrentMap<ClientRequestId, RequestInfo>
+          workerIdOutstandingRequestMap,
+      Configuration conf) {
+    this.workerIdOutstandingRequestMap = workerIdOutstandingRequestMap;
+    dropFirstResponse = conf.getBoolean(
+        GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED,
+        GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT);
   }
 
   @Override
@@ -57,11 +71,14 @@ public class ResponseClientHandler exten
       throw new IllegalStateException("messageReceived: Got a " +
           "non-ChannelBuffer message " + event.getMessage());
     }
+
     ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
     ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
+    int senderId = -1;
     long requestId = -1;
     int response = -1;
     try {
+      senderId = inputStream.readInt();
       requestId = inputStream.readLong();
       response = inputStream.readByte();
       inputStream.close();
@@ -69,12 +86,27 @@ public class ResponseClientHandler exten
       throw new IllegalStateException(
           "messageReceived: Got IOException ", e);
     }
-    if (response != 0) {
+
+    // Simulate a failed response on the first response (if desired)
+    if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) {
+      LOG.info("messageReceived: Simulating dropped response " + response +
+          " for request " + requestId);
+      ALREADY_DROPPED_FIRST_RESPONSE = true;
+      synchronized (workerIdOutstandingRequestMap) {
+        workerIdOutstandingRequestMap.notifyAll();
+      }
+      return;
+    }
+
+    if (response == 1) {
+      LOG.info("messageReceived: Already completed request " + requestId);
+    } else if (response != 0) {
       throw new IllegalStateException(
           "messageReceived: Got illegal response " + response);
     }
 
-    RequestInfo requestInfo = outstandingRequestMap.remove(requestId);
+    RequestInfo requestInfo = workerIdOutstandingRequestMap.remove(
+        new ClientRequestId(senderId, requestId));
     if (requestInfo == null) {
       throw new IllegalStateException("messageReceived: Impossible to " +
           "have a non-registered requestId " + requestId);
@@ -82,14 +114,14 @@ public class ResponseClientHandler exten
       if (LOG.isDebugEnabled()) {
         LOG.debug("messageReceived: Processed request id = " + requestId +
             " " + requestInfo + ".  Waiting on " +
-            outstandingRequestMap.size() +
+            workerIdOutstandingRequestMap.size() +
             " requests, bytes = " + buffer.capacity());
       }
     }
 
     // Help NettyClient#waitSomeRequests() to finish faster
-    synchronized (outstandingRequestMap) {
-      outstandingRequestMap.notifyAll();
+    synchronized (workerIdOutstandingRequestMap) {
+      workerIdOutstandingRequestMap.notifyAll();
     }
   }
 
@@ -104,7 +136,7 @@ public class ResponseClientHandler exten
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-    throw new IllegalStateException("exceptionCaught: Channel failed with " +
+    LOG.warn("exceptionCaught: Channel failed with " +
         "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
   }
 }

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerRequestReservedMap.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.MapMaker;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Thread-safe map from worker id to the set of request ids already reserved
+ */
+public class WorkerRequestReservedMap {
+  /** Map of the worker ids to the requests received (bit set) */
+  private final ConcurrentMap<Integer, IncreasingBitSet>
+  workerRequestReservedMap;
+
+  /**
+   * Constructor; sizes the map's concurrency level from the configuration
+   *
+   * @param conf Configuration (read for GiraphJob.MSG_NUM_FLUSH_THREADS)
+   */
+  public WorkerRequestReservedMap(Configuration conf) {
+    workerRequestReservedMap = new MapMaker().concurrencyLevel(
+        conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+            NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT)).makeMap();
+  }
+
+  /**
+   * Reserve the request (before the request starts, to ensure that it is
+   * only executed once).  We are assuming no failure on the server.
+   *
+   * @param workerId Id of the worker that sent the request
+   * @param requestId Request id
+   * @return True if the reserving succeeded, false otherwise
+   */
+  public boolean reserveRequest(Integer workerId, long requestId) {
+    IncreasingBitSet requestSet = getRequestSet(workerId);
+    return requestSet.add(requestId);
+  }
+
+  /**
+   * Get the request bit set for a worker, creating the entry if necessary.
+   *
+   * @param workerId Id of the worker to get the bit set for
+   * @return Bit set for the worker
+   */
+  private IncreasingBitSet getRequestSet(Integer workerId) {
+    IncreasingBitSet requestSet = workerRequestReservedMap.get(workerId);
+    if (requestSet == null) {
+      requestSet = new IncreasingBitSet();
+      IncreasingBitSet previous =
+          workerRequestReservedMap.putIfAbsent(workerId, requestSet);
+      if (previous != null) {
+        requestSet = previous; // Another thread added one first; use it
+      }
+    }
+    return requestSet;
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java Tue Aug 21 22:17:56 2012
@@ -41,9 +41,19 @@ public abstract class WritableRequest<I 
     M extends Writable> implements Writable, Configurable {
   /** Configuration */
   private Configuration conf;
+  /** Client id */
+  private int clientId = -1;
   /** Request id */
   private long requestId = -1;
 
+  public int getClientId() {
+    return clientId;
+  }
+
+  public void setClientId(int clientId) {
+    this.clientId = clientId;
+  }
+
   public long getRequestId() {
     return requestId;
   }
@@ -92,12 +102,14 @@ public abstract class WritableRequest<I 
 
   @Override
   public final void readFields(DataInput input) throws IOException {
+    clientId = input.readInt();
     requestId = input.readLong();
     readFieldsRequest(input);
   }
 
   @Override
   public final void write(DataOutput output) throws IOException {
+    output.writeInt(clientId);
     output.writeLong(requestId);
     writeRequest(output);
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Aug 21 22:17:56 2012
@@ -1356,9 +1356,9 @@ public class BspServiceMaster<I extends 
         break;
       }
 
-      // Wait for a signal or no more than 60 seconds to progress
+      // Wait for a signal or no more than 30 seconds to progress
       // or else will continue.
-      event.waitMsecs(60 * 1000);
+      event.waitMsecs(30 * 1000);
       event.reset();
       getContext().progress();
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Aug 21 22:17:56 2012
@@ -985,7 +985,7 @@ public class BspServiceWorker<I extends 
 
     if (LOG.isInfoEnabled()) {
       LOG.info("finishSuperstep: Superstep " + getSuperstep() +
-          " , mesages = " + workerSentMessages + " " +
+          ", mesages = " + workerSentMessages + " " +
           MemoryUtils.getRuntimeMemoryStats());
     }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Aug 21 22:17:56 2012
@@ -159,6 +159,52 @@ public class GiraphJob {
   /** Default is to use RPC, not netty */
   public static final boolean USE_NETTY_DEFAULT = false;
 
+  /** TCP backlog (defaults to number of workers) */
+  public static final String TCP_BACKLOG = "giraph.tcpBacklog";
+  /**
+   * Default TCP backlog if the number of workers is not specified
+   * (i.e. unit tests)
+   */
+  public static final int TCP_BACKLOG_DEFAULT = 1;
+
+  /** Netty simulate a first request closed */
+  public static final String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
+      "giraph.nettySimulateFirstRequestClosed";
+  /** Default of not simulating failure for first request */
+  public static final boolean NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT =
+      false;
+
+  /** Netty simulate a first response failed */
+  public static final String NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
+      "giraph.nettySimulateFirstResponseFailed";
+  /** Default of not simulating failure for first response */
+  public static final boolean NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT =
+      false;
+
+  /** Max reconnect attempts. NOTE(review): key below looks copy-pasted */
+  public static final String MAX_RECONNECT_ATTEMPTS =
+      "giraph.maxNumberOfOpenRequests";
+  /** Default maximum number of reconnection attempts */
+  public static final int MAX_RECONNECT_ATTEMPTS_DEFAULT = 10;
+
+  /** Max resolve address attempts */
+  public static final String MAX_RESOLVE_ADDRESS_ATTEMPTS =
+      "giraph.maxResolveAddressAttempts";
+  /** Default max resolve address attempts */
+  public static final int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
+
+  /** Msecs to wait between waiting for all requests to finish */
+  public static final String WAITING_REQUEST_MSECS =
+      "giraph.waitingRequestMsecs";
+  /** Default msecs to wait between waiting for all requests to finish */
+  public static final int WAITING_REQUEST_MSECS_DEFAULT = 15000;
+
+  /** Milliseconds for a request to complete (or else resend) */
+  public static final String MAX_REQUEST_MILLISECONDS =
+      "giraph.maxRequestMilliseconds";
+  /** Maximum number of milliseconds for a request to complete */
+  public static final int MAX_REQUEST_MILLISECONDS_DEFAULT = 60 * 1000;
+
   /** Netty max connection failures */
   public static final String NETTY_MAX_CONNECTION_FAILURES =
       "giraph.nettyMaxConnectionFailures";

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Tue Aug 21 22:17:56 2012
@@ -70,7 +70,12 @@ public class WorkerInfo implements Writa
     return hostnameId;
   }
 
-  public InetSocketAddress getHostnamePort() {
+  /**
+   * Get a new instance of the InetSocketAddress for this hostname and port
+   *
+   * @return InetSocketAddress of the hostname and port.
+   */
+  public InetSocketAddress getInetSocketAddress() {
     return new InetSocketAddress(hostname, port);
   }
 

Added: giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java?rev=1375824&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java Tue Aug 21 22:17:56 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for IncreasingBitSet
+ */
+public class IncreasingBitSetTest {
+  @Test
+  public void add256kIntegers() {
+    IncreasingBitSet IncreasingBitSet = new IncreasingBitSet();
+    for (int i = 0; i < 256 * 1024; ++i) {
+      assertFalse(IncreasingBitSet.has(i));
+      assertTrue(IncreasingBitSet.add(i));
+      assertTrue(IncreasingBitSet.has(i));
+      assertTrue(IncreasingBitSet.size() <=
+          IncreasingBitSet.MIN_BITS_TO_SHIFT);
+    }
+    assertEquals(256 * 1024L, IncreasingBitSet.getLastBaseKey());
+  }
+
+  @Test
+  public void add256kIntegersAlternate() {
+    IncreasingBitSet IncreasingBitSet = new IncreasingBitSet();
+    for (int i = 0; i < 256 * 1024; i += 2) {
+      assertFalse(IncreasingBitSet.has(i));
+      assertTrue(IncreasingBitSet.add(i));
+      assertTrue(IncreasingBitSet.has(i));
+      assertFalse(IncreasingBitSet.has(i + 1));
+      assertTrue(IncreasingBitSet.size() <= 256 * 1024);
+    }
+    assertEquals(128 * 1024L, IncreasingBitSet.cardinality());
+    for (int i = 1; i < 256 * 1024; i += 2) {
+      assertFalse(IncreasingBitSet.has(i));
+      assertTrue(IncreasingBitSet.add(i));
+      assertTrue(IncreasingBitSet.has(i));
+      assertTrue(IncreasingBitSet.has(i - 1));
+      assertTrue(IncreasingBitSet.size() <= 256 * 1024);
+    }
+    assertEquals(256 * 1024L, IncreasingBitSet.cardinality());
+  }
+
+  @Test
+  public void add256kIntegersOutOfOrder() {
+    IncreasingBitSet IncreasingBitSet = new IncreasingBitSet();
+    for (int i = 128 * 1024; i < 256 * 1024; ++i) {
+      assertFalse(IncreasingBitSet.has(i));
+      assertTrue(IncreasingBitSet.add(i));
+      assertTrue(IncreasingBitSet.has(i));
+      assertTrue(IncreasingBitSet.size() <= 512 * 1024);
+    }
+    assertEquals(128 * 1024L, IncreasingBitSet.cardinality());
+    for (int i = 0; i < 128 * 1024; ++i) {
+      assertFalse(IncreasingBitSet.has(i));
+      assertTrue(IncreasingBitSet.add(i));
+      assertTrue(IncreasingBitSet.has(i));
+      assertTrue(IncreasingBitSet.size() <= 512 * 1024);
+    }
+    assertEquals(256 * 1024L, IncreasingBitSet.cardinality());
+    assertEquals(256 * 1024L, IncreasingBitSet.getLastBaseKey());
+  }
+}

Copied: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (from r1375717, giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?p2=giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java&p1=giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java&r1=1375717&r2=1375824&rev=1375824&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Tue Aug 21 22:17:56 2012
@@ -18,12 +18,17 @@
 
 package org.apache.giraph.comm;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.giraph.comm.messages.SimpleMessageStore;
-import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
@@ -34,25 +39,13 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
- * Test all the different netty requests.
+ * Test all the netty failure scenarios
  */
-public class RequestTest {
+public class RequestFailureTest {
   /** Configuration */
   private Configuration conf;
   /** Server data */
@@ -64,6 +57,8 @@ public class RequestTest {
   /** Client */
   private NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
   client;
+  /** Mock context */
+  private Context context;
 
   /**
    * Only for testing.
@@ -89,69 +84,12 @@ public class RequestTest {
     conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
         IntWritable.class, Writable.class);
 
-    @SuppressWarnings("rawtypes")
-    Context context = mock(Context.class);
+    context = mock(Context.class);
     when(context.getConfiguration()).thenReturn(conf);
-
-    // Start the service
-    serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (SimpleMessageStore.newFactory(
-                MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
-    server =
-        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
-            conf, serverData);
-    server.start();
-    client =
-        new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
-            (context);
-    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
-  }
-
-  @Test
-  public void sendVertexPartition() throws IOException {
-    // Data to send
-    int partitionId = 13;
-    Collection<Vertex<IntWritable, IntWritable, IntWritable,
-        IntWritable>> vertices =
-        new ArrayList<Vertex<IntWritable, IntWritable,
-        IntWritable, IntWritable>>();
-    for (int i = 0; i < 10; ++i) {
-      TestVertex vertex = new TestVertex();
-      vertex.initialize(new IntWritable(i), new IntWritable(i), null, null);
-      vertices.add(vertex);
-    }
-
-    // Send the request
-    SendVertexRequest<IntWritable, IntWritable, IntWritable,
-    IntWritable> request =
-      new SendVertexRequest<IntWritable, IntWritable,
-      IntWritable, IntWritable>(partitionId, vertices);
-    client.sendWritableRequest(server.getMyAddress(), request);
-    client.waitAllRequests();
-
-    // Stop the service
-    client.stop();
-    server.stop();
-
-    // Check the output
-    Map<Integer, Collection<Vertex<IntWritable, IntWritable,
-    IntWritable, IntWritable>>> partitionVertexMap =
-        serverData.getPartitionVertexMap();
-    synchronized (partitionVertexMap) {
-      assertTrue(partitionVertexMap.containsKey(partitionId));
-      int total = 0;
-      for (Vertex<IntWritable, IntWritable,
-          IntWritable, IntWritable> vertex :
-            (partitionVertexMap.get(partitionId))) {
-        total += vertex.getId().get();
-      }
-      assertEquals(total, 45);
-    }
   }
 
-  @Test
-  public void sendPartitionMessagesRequest() throws IOException {
+  private WritableRequest<IntWritable, IntWritable, IntWritable,
+      IntWritable> getRequest() {
     // Data to send
     int partitionId = 17;
     Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
@@ -167,16 +105,13 @@ public class RequestTest {
 
     // Send the request
     SendPartitionMessagesRequest<IntWritable, IntWritable, IntWritable,
-    IntWritable> request =
-      new SendPartitionMessagesRequest<IntWritable, IntWritable,
-      IntWritable, IntWritable>(partitionId, vertexIdMessages);
-    client.sendWritableRequest(server.getMyAddress(), request);
-    client.waitAllRequests();
-
-    // Stop the service
-    client.stop();
-    server.stop();
+        IntWritable> request =
+        new SendPartitionMessagesRequest<IntWritable, IntWritable,
+            IntWritable, IntWritable>(partitionId, vertexIdMessages);
+    return request;
+  }
 
+  private void checkResult(int numRequests) throws IOException {
     // Check the output
     Iterable<IntWritable> vertices =
         serverData.getIncomingMessageStore().getDestinationVertices();
@@ -193,78 +128,119 @@ public class RequestTest {
       }
     }
     assertEquals(21, keySum);
-    assertEquals(35, messageSum);
+    assertEquals(35 * numRequests, messageSum);
   }
 
   @Test
-  public void sendPartitionMutationsRequest() throws IOException {
-    // Data to send
-    int partitionId = 19;
-    Map<IntWritable, VertexMutations<IntWritable, IntWritable,
-    IntWritable, IntWritable>> vertexIdMutations =
-        Maps.newHashMap();
-    for (int i = 0; i < 11; ++i) {
-      VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
-      mutations =
-      new VertexMutations<IntWritable, IntWritable,
-      IntWritable, IntWritable>();
-      for (int j = 0; j < 3; ++j) {
-        TestVertex vertex = new TestVertex();
-        vertex.initialize(new IntWritable(i), new IntWritable(j), null, null);
-        mutations.addVertex(vertex);
-      }
-      for (int j = 0; j < 2; ++j) {
-        mutations.removeVertex();
-      }
-      for (int j = 0; j < 5; ++j) {
-        Edge<IntWritable, IntWritable> edge =
-            new Edge<IntWritable, IntWritable>(
-                new IntWritable(i), new IntWritable(2*j));
-        mutations.addEdge(edge);
-      }
-      for (int j = 0; j < 7; ++j) {
-        mutations.removeEdge(new IntWritable(j));
-      }
-      vertexIdMutations.put(new IntWritable(i), mutations);
-    }
+  public void send2Requests() throws IOException {
+    // Start the service
+    serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+            (SimpleMessageStore.newFactory(
+                MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+    server =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server.start();
+    client =
+        new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+            (context);
+    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
 
-    // Send the request
-    SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable,
-    IntWritable> request =
-      new SendPartitionMutationsRequest<IntWritable, IntWritable,
-      IntWritable, IntWritable>(partitionId, vertexIdMutations);
-    client.sendWritableRequest(server.getMyAddress(), request);
+    // Send the request 2x
+    WritableRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request1 = getRequest();
+    WritableRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request2 = getRequest();
+    client.sendWritableRequest(-1, server.getMyAddress(), request1);
+    client.sendWritableRequest(-1, server.getMyAddress(), request2);
     client.waitAllRequests();
 
     // Stop the service
     client.stop();
     server.stop();
 
-    // Check the output
-    ConcurrentHashMap<IntWritable, VertexMutations<IntWritable, IntWritable,
-    IntWritable, IntWritable>> inVertexIdMutations =
-        serverData.getVertexMutations();
-    int keySum = 0;
-    for (Entry<IntWritable, VertexMutations<IntWritable, IntWritable,
-        IntWritable, IntWritable>> entry :
-          inVertexIdMutations.entrySet()) {
-      synchronized (entry.getValue()) {
-        keySum += entry.getKey().get();
-        int vertexValueSum = 0;
-        for (Vertex<IntWritable, IntWritable, IntWritable, IntWritable>
-        vertex : entry.getValue().getAddedVertexList()) {
-          vertexValueSum += vertex.getValue().get();
-        }
-        assertEquals(3, vertexValueSum);
-        assertEquals(2, entry.getValue().getRemovedVertexCount());
-        int removeEdgeValueSum = 0;
-        for (Edge<IntWritable, IntWritable> edge :
-          entry.getValue().getAddedEdgeList()) {
-          removeEdgeValueSum += edge.getValue().get();
-        }
-        assertEquals(20, removeEdgeValueSum);
-      }
-    }
-    assertEquals(55, keySum);
+    // Check the output (each request should have been processed once)
+    checkResult(2);
+  }
+
+  @Test
+  public void alreadyProcessedRequest() throws IOException {
+    // Force a failure of the first response
+    conf.setBoolean(GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED, true);
+    // One second to finish a request
+    conf.setInt(GiraphJob.MAX_REQUEST_MILLISECONDS, 1000);
+    // Loop every 2 seconds
+    conf.setInt(GiraphJob.WAITING_REQUEST_MSECS, 2000);
+
+    // Start the service
+    serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+            (SimpleMessageStore.newFactory(
+                MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+    server =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server.start();
+    client =
+        new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+            (context);
+    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+
+    // Send 2 requests, each should only be processed once
+    WritableRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request1 = getRequest();
+    WritableRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request2 = getRequest();
+    client.sendWritableRequest(-1, server.getMyAddress(), request1);
+    client.sendWritableRequest(-1, server.getMyAddress(), request2);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output (each request should have been processed once)
+    checkResult(2);
+  }
+
+  @Test
+  public void resendRequest() throws IOException {
+    // Force a drop of the first request
+    conf.setBoolean(GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED, true);
+    // One second to finish a request
+    conf.setInt(GiraphJob.MAX_REQUEST_MILLISECONDS, 1000);
+    // Loop every 2 seconds
+    conf.setInt(GiraphJob.WAITING_REQUEST_MSECS, 2000);
+
+    // Start the service
+    serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+            (SimpleMessageStore.newFactory(
+                MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+    server =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server.start();
+    client =
+        new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+            (context);
+    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
+
+    // Send 2 requests, each should only be processed once
+    WritableRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request1 = getRequest();
+    WritableRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request2 = getRequest();
+    client.sendWritableRequest(-1, server.getMyAddress(), request1);
+    client.sendWritableRequest(-1, server.getMyAddress(), request2);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output (each request should have been processed once)
+    checkResult(2);
   }
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1375824&r1=1375823&r2=1375824&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Aug 21 22:17:56 2012
@@ -127,7 +127,7 @@ public class RequestTest {
     IntWritable> request =
       new SendVertexRequest<IntWritable, IntWritable,
       IntWritable, IntWritable>(partitionId, vertices);
-    client.sendWritableRequest(server.getMyAddress(), request);
+    client.sendWritableRequest(-1, server.getMyAddress(), request);
     client.waitAllRequests();
 
     // Stop the service
@@ -170,7 +170,7 @@ public class RequestTest {
     IntWritable> request =
       new SendPartitionMessagesRequest<IntWritable, IntWritable,
       IntWritable, IntWritable>(partitionId, vertexIdMessages);
-    client.sendWritableRequest(server.getMyAddress(), request);
+    client.sendWritableRequest(-1, server.getMyAddress(), request);
     client.waitAllRequests();
 
     // Stop the service
@@ -233,7 +233,7 @@ public class RequestTest {
     IntWritable> request =
       new SendPartitionMutationsRequest<IntWritable, IntWritable,
       IntWritable, IntWritable>(partitionId, vertexIdMutations);
-    client.sendWritableRequest(server.getMyAddress(), request);
+    client.sendWritableRequest(-1, server.getMyAddress(), request);
     client.waitAllRequests();
 
     // Stop the service