Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/17 06:33:18 UTC

svn commit: r1399090 [1/3] - in /giraph/trunk: ./ giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/bsp/ giraph/src/main/java/org/apache/giraph/comm/ giraph/s...

Author: aching
Date: Wed Oct 17 04:33:16 2012
New Revision: 1399090

URL: http://svn.apache.org/viewvc?rev=1399090&view=rev
Log:
GIRAPH-374: Multithreading in input split loading and compute
(aching).

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
      - copied, changed from r1399043, giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
      - copied, changed from r1399043, giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java
Removed:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct 17 04:33:16 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-374: Multithreading in input split loading and compute (aching).
+
   GIRAPH-375: Cleaner MutableVertex API (apresta)
 
   GIRAPH-371: Replace BspUtils in giraph-formats-contrib for
@@ -8,7 +10,8 @@ Release 0.2.0 - unreleased
 
   GIRAPH-369: bin/giraph broken (Nitay Joffe via ereisman)
 
-  GIRAPH-368: HBase Vertex I/O formats handle setConf() internally (bfem via ereisman)
+  GIRAPH-368: HBase Vertex I/O formats handle setConf() internally
+  (bfem via ereisman)
 
   GIRAPH-367: Expose WorkerInfo to clients (Nitay Joffe via ereisman)
 

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java Wed Oct 17 04:33:16 2012
@@ -48,19 +48,18 @@ import java.util.List;
  *
  * Works with {@link HBaseVertexOutputFormat}
  *
- *
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
  * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public abstract  class HBaseVertexInputFormat<
-          I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        extends VertexInputFormat<I, V, E, M>  {
+public abstract class HBaseVertexInputFormat<
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable>
+    extends VertexInputFormat<I, V, E, M>  {
 
 
    /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Wed Oct 17 04:33:16 2012
@@ -105,6 +105,11 @@ public class GiraphConfiguration extends
   /** Default log level is INFO (same as Hadoop) */
   public static final String LOG_LEVEL_DEFAULT = "info";
 
+  /** Use thread level debugging? */
+  public static final String LOG_THREAD_LAYOUT = "giraph.logThreadLayout";
+  /** Default to not use thread-level debugging */
+  public static final boolean LOG_THREAD_LAYOUT_DEFAULT = false;
+
   /**
    * Minimum percent of the maximum number of workers that have responded
    * in order to continue progressing. (float)
@@ -343,6 +348,17 @@ public class GiraphConfiguration extends
   public static final String MSG_NUM_FLUSH_THREADS =
       "giraph.msgNumFlushThreads";
 
+  /** Number of threads for vertex computation */
+  public static final String NUM_COMPUTE_THREADS = "giraph.numComputeThreads";
+  /** Default number of threads for vertex computation */
+  public static final int NUM_COMPUTE_THREADS_DEFAULT = 1;
+
+  /** Number of threads for input splits loading */
+  public static final String NUM_INPUT_SPLITS_THREADS =
+      "giraph.numInputSplitsThreads";
+  /** Default number of threads for input splits loading */
+  public static final int NUM_INPUT_SPLITS_THREADS_DEFAULT = 1;
+
   /** Number of poll attempts prior to failing the job (int) */
   public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
   /** Default poll attempts */
@@ -748,6 +764,15 @@ public class GiraphConfiguration extends
     return get(LOG_LEVEL, LOG_LEVEL_DEFAULT);
   }
 
+  /**
+   * Use the log thread layout option?
+   *
+   * @return True if use the log thread layout option, false otherwise
+   */
+  public boolean useLogThreadLayout() {
+    return getBoolean(LOG_THREAD_LAYOUT, LOG_THREAD_LAYOUT_DEFAULT);
+  }
+
   public boolean getLocalTestMode() {
     return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT);
   }
@@ -844,4 +869,34 @@ public class GiraphConfiguration extends
   public boolean authenticate() {
     return getBoolean(AUTHENTICATE, DEFAULT_AUTHENTICATE);
   }
+
+  /**
+   * Set the number of compute threads
+   *
+   * @param numComputeThreads Number of compute threads to use
+   */
+  public void setNumComputeThreads(int numComputeThreads) {
+    setInt(NUM_COMPUTE_THREADS, numComputeThreads);
+  }
+
+  public int getNumComputeThreads() {
+    return getInt(NUM_COMPUTE_THREADS, NUM_COMPUTE_THREADS_DEFAULT);
+  }
+
+  /**
+   * Set the number of input split threads
+   *
+   * @param numInputSplitsThreads Number of input split threads to use
+   */
+  public void setNumInputSplitsThreads(int numInputSplitsThreads) {
+    setInt(NUM_INPUT_SPLITS_THREADS, numInputSplitsThreads);
+  }
+
+  public int getNumInputSplitsThreads() {
+    return getInt(NUM_INPUT_SPLITS_THREADS, NUM_INPUT_SPLITS_THREADS_DEFAULT);
+  }
+
+  public long getInputSplitMaxVertices() {
+    return getLong(INPUT_SPLIT_MAX_VERTICES, INPUT_SPLIT_MAX_VERTICES_DEFAULT);
+  }
 }
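
For reference, a minimal sketch of how a job could opt into the new threading
options through the setters added above. The helper class, method, and thread
counts are illustrative assumptions, not part of this commit.

    import org.apache.giraph.GiraphConfiguration;

    /** Hypothetical helper showing the new threading knobs; not in this commit. */
    public class ThreadingSetupSketch {
      /** Apply multithreaded input loading and compute to a job configuration. */
      public static void configureThreading(GiraphConfiguration conf) {
        // Run vertex computation with 4 threads per worker.
        conf.setNumComputeThreads(4);
        // Load input splits with 2 threads per worker.
        conf.setNumInputSplitsThreads(2);
        // Tag log messages with the thread name (giraph.logThreadLayout).
        conf.setBoolean(GiraphConfiguration.LOG_THREAD_LAYOUT, true);
      }
    }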

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java Wed Oct 17 04:33:16 2012
@@ -35,10 +35,7 @@ import java.io.IOException;
 @SuppressWarnings("rawtypes")
 public interface CentralizedService<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
-  /**
-   * Setup (must be called prior to any other function)
-   */
-  void setup();
+
 
   /**
    * Get the current global superstep of the application to work on.

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Wed Oct 17 04:33:16 2012
@@ -39,6 +39,11 @@ public interface CentralizedServiceMaste
     V extends Writable, E extends Writable, M extends Writable> extends
     CentralizedService<I, V, E, M> {
   /**
+   * Setup (must be called prior to any other function)
+   */
+  void setup();
+
+  /**
    * Become the master.
    * @return true if became the master, false if the application is done.
    */

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Wed Oct 17 04:33:16 2012
@@ -23,6 +23,10 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.graph.WorkerAggregatorUsage;
 import org.apache.giraph.graph.partition.PartitionStore;
 import org.apache.hadoop.io.Writable;
@@ -50,6 +54,13 @@ public interface CentralizedServiceWorke
   V extends Writable, E extends Writable, M extends Writable>
   extends CentralizedService<I, V, E, M> {
   /**
+   * Setup (must be called prior to any other function)
+   *
+   * @return Finished superstep stats for the input superstep
+   */
+  FinishedSuperstepStats setup();
+
+  /**
    * Get the worker information
    *
    * @return Worker information
@@ -57,6 +68,15 @@ public interface CentralizedServiceWorke
   WorkerInfo getWorkerInfo();
 
   /**
+   * Get the worker client (for instantiating WorkerClientRequestProcessor
+   * instances).
+   *
+   * @return Worker client
+   */
+  WorkerClient<I, V, E, M> getWorkerClient();
+
+  /**
+   * Get the worker context.
    *
    * @return worker's WorkerContext
    */
@@ -92,27 +112,33 @@ public interface CentralizedServiceWorke
    * appropriate superstep.
    *
    * @param superstep which checkpoint to use
+   * @return Graph-wide vertex and edge counts
    * @throws IOException
    */
-  void loadCheckpoint(long superstep) throws IOException;
+  VertexEdgeCount loadCheckpoint(long superstep) throws IOException;
 
   /**
    * Take all steps prior to actually beginning the computation of a
    * superstep.
    *
+   * @param graphState Current graph state
    * @return Collection of all the partition owners from the master for this
    *         superstep.
    */
-  Collection<? extends PartitionOwner> startSuperstep();
+  Collection<? extends PartitionOwner> startSuperstep(
+      GraphState<I, V, E, M> graphState);
 
   /**
    * Worker is done with its portion of the superstep.  Report the
    * worker level statistics after the computation.
    *
+   * @param graphState Current graph state
    * @param partitionStatsList All the partition stats for this worker
-   * @return true if this is the last superstep, false otherwise
+   * @return Stats of the superstep completion
    */
-  boolean finishSuperstep(List<PartitionStats> partitionStatsList);
+  FinishedSuperstepStats finishSuperstep(
+      GraphState<I, V, E, M> graphState,
+      List<PartitionStats> partitionStatsList);
 
   /**
    * Get the partition that a vertex id would belong to.
@@ -158,7 +184,7 @@ public interface CentralizedServiceWorke
 
   /**
    * If desired by the user, vertex partitions are redistributed among
-   * workers according to the chosen {@link WorkerGraphPartitioner}.
+   * workers according to the chosen WorkerGraphPartitioner.
    *
    * @param masterSetPartitionOwners Partition owner info passed from the
    *        master.

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Wed Oct 17 04:33:16 2012
@@ -34,7 +34,7 @@ import com.google.common.collect.Lists;
 
 /**
  * Aggregates the messages to be send to workers so they can be sent
- * in bulk.
+ * in bulk.  Not thread-safe.
  *
  * @param <I> Vertex id
  * @param <M> Message data
@@ -98,7 +98,7 @@ public class SendMessageCache<I extends 
     // Add the message
     final int originalMessageCount = messages.size();
     messages.add(message);
-    if (combiner != null) {
+    if (combiner != null && originalMessageCount > 0) {
       try {
         messages = Lists.newArrayList(combiner.combine(destVertexId, messages));
       } catch (IOException e) {

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java?rev=1399090&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java Wed Oct 17 04:33:16 2012
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GiraphTransferRegulator;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+/**
+ * Caches partition vertices prior to sending.  Aggregating these requests
+ * will make larger, more efficient requests.  Not thread-safe.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class SendPartitionCache<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SendPartitionCache.class);
+  /** Input split vertex cache (only used when loading from input split) */
+  private final Map<PartitionOwner, Partition<I, V, E, M>>
+  ownerPartitionMap = Maps.newHashMap();
+  /** Number of messages in each partition */
+  private final Map<PartitionOwner, Integer> messageCountMap =
+      Maps.newHashMap();
+  /** Context */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /**
+   * Regulates the size of outgoing Collections of vertices read
+   * by the local worker during INPUT_SUPERSTEP that are to be
+   * transferred from <code>inputSplitCache</code> to the owner
+   * of their initial, master-assigned Partition.
+   */
+  private final GiraphTransferRegulator transferRegulator;
+
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param configuration Configuration
+   */
+  public SendPartitionCache(
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+    this.context = context;
+    this.configuration = configuration;
+    transferRegulator =
+        new GiraphTransferRegulator(configuration);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("SendPartitionCache: maxVerticesPerTransfer = " +
+          transferRegulator.getMaxVerticesPerTransfer());
+      LOG.info("SendPartitionCache: maxEdgesPerTransfer = " +
+          transferRegulator.getMaxEdgesPerTransfer());
+    }
+  }
+
+  /**
+   * Add a vertex to the cache, returning the partition if full
+   *
+   * @param partitionOwner Partition owner of the vertex
+   * @param vertex Vertex to add
+   * @return A partition to send or null, if requirements are not met
+   */
+  public Partition<I, V, E, M> addVertex(PartitionOwner partitionOwner,
+                                         Vertex<I, V, E, M> vertex) {
+    Partition<I, V, E, M> partition =
+        ownerPartitionMap.get(partitionOwner);
+    if (partition == null) {
+      partition = new Partition<I, V, E, M>(
+          configuration,
+          partitionOwner.getPartitionId(),
+          context);
+      ownerPartitionMap.put(partitionOwner, partition);
+    }
+    transferRegulator.incrementCounters(partitionOwner, vertex);
+
+    Vertex<I, V, E, M> oldVertex =
+        partition.putVertex(vertex);
+    if (oldVertex != null) {
+      LOG.warn("addVertex: Replacing vertex " + oldVertex +
+          " with " + vertex);
+    }
+
+    // Requirements met to transfer?
+    if (transferRegulator.transferThisPartition(partitionOwner)) {
+      return ownerPartitionMap.remove(partitionOwner);
+    }
+
+    return null;
+  }
+
+  /**
+   * Get the owner partition map (for flushing)
+   *
+   * @return Owner partition map
+   */
+  public Map<PartitionOwner, Partition<I, V, E, M>> getOwnerPartitionMap() {
+    return ownerPartitionMap;
+  }
+
+  /**
+   * Clear the cache.
+   */
+  public void clear() {
+    ownerPartitionMap.clear();
+  }
+}
+
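
A hedged sketch of how the new SendPartitionCache is meant to be driven while
loading an input split: add vertices until addVertex() hands back a full
partition, then flush what remains. The lookupOwner() and sendPartition()
helpers and the surrounding loop are illustrative assumptions, not part of
this commit.

    // Illustrative only; cache construction mirrors the constructor above.
    SendPartitionCache<I, V, E, M> cache =
        new SendPartitionCache<I, V, E, M>(context, configuration);
    for (Vertex<I, V, E, M> vertex : verticesFromSplit) {
      PartitionOwner owner = lookupOwner(vertex);             // hypothetical lookup
      Partition<I, V, E, M> fullPartition = cache.addVertex(owner, vertex);
      if (fullPartition != null) {
        sendPartition(owner, fullPartition);                  // hypothetical send step
      }
    }
    // Once the split is exhausted, flush whatever is still cached.
    for (Map.Entry<PartitionOwner, Partition<I, V, E, M>> entry :
        cache.getOwnerPartitionMap().entrySet()) {
      sendPartition(entry.getKey(), entry.getValue());
    }
    cache.clear();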

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java Wed Oct 17 04:33:16 2012
@@ -18,11 +18,11 @@
 
 package org.apache.giraph.comm;
 
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.giraph.graph.partition.Partition;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import org.apache.giraph.comm.requests.WritableRequest;
 
+import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -30,7 +30,8 @@ import org.apache.hadoop.io.WritableComp
 import java.io.IOException;
 
 /**
- * Public interface for workers to do message communication
+ * Public interface for workers to establish connections and send aggregated
+ * requests.
  *
  * @param <I> Vertex id
  * @param <V> Vertex value
@@ -57,84 +58,47 @@ else[HADOOP_NON_SECURE]*/
 /*end[HADOOP_NON_SECURE]*/
 
   /**
-   * Fix changes to the workers and the mapping between partitions and
-   * workers.
-   */
-  void fixPartitionIdToSocketAddrMap();
-
-  /**
    * Lookup PartitionOwner for a vertex.
+   *
    * @param vertexId id to look up.
    * @return PartitionOwner holding the vertex.
    */
   PartitionOwner getVertexPartitionOwner(I vertexId);
 
   /**
-   * Sends a message to destination vertex.
-   *
-   * @param destVertexId Destination vertex id.
-   * @param message Message to send.
-   */
-  void sendMessageRequest(I destVertexId, M message);
-
-  /**
-   * Sends a partition to the appropriate partition owner
-   *
-   * @param workerInfo Owner the vertices belong to
-   * @param partition Partition to send
-   */
-  void sendPartitionRequest(WorkerInfo workerInfo,
-                            Partition<I, V, E, M> partition);
-
-  /**
-   * Sends a request to the appropriate vertex range owner to add an edge
+   * Make sure that all the connections to the partition owners have been
+   * established.
    *
-   * @param vertexIndex Index of the vertex to get the request
-   * @param edge Edge to be added
-   * @throws IOException
-   */
-  void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException;
-
-  /**
-   * Sends a request to the appropriate vertex range owner to remove an edge
-   *
-   * @param vertexIndex Index of the vertex to get the request
-   * @param destinationVertexIndex Index of the edge to be removed
-   * @throws IOException
-   */
-  void removeEdgeRequest(I vertexIndex, I destinationVertexIndex)
-    throws IOException;
-
-  /**
-   * Sends a request to the appropriate vertex range owner to add a vertex
-   *
-   * @param vertex Vertex to be added
-   * @throws IOException
+   * @param partitionOwners Partition owners to establish/check connections
    */
-  void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException;
+  void openConnections(
+      Collection<? extends PartitionOwner> partitionOwners);
 
   /**
-   * Sends a request to the appropriate vertex range owner to remove a vertex
+   * Fill the socket address cache for the worker info and its partition.
    *
-   * @param vertexIndex Index of the vertex to be removed
-   * @throws IOException
+   * @param workerInfo Worker information to get the socket address
+   * @param partitionId Partition id to look up.
+   * @return address of the vertex range server containing this vertex
    */
-  void removeVertexRequest(I vertexIndex) throws IOException;
+  InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
+                                         int partitionId);
 
   /**
-   * Flush all outgoing messages.  This will synchronously ensure that all
-   * messages have been send and delivered prior to returning.
+   * Send a request to a remote server (should be already connected)
    *
-   * @throws IOException
+   * @param destWorkerId Destination worker id
+   * @param remoteServer Server to send the request to
+   * @param request Request to send
    */
-  void flush() throws IOException;
+  void sendWritableRequest(Integer destWorkerId,
+                                  InetSocketAddress remoteServer,
+                                  WritableRequest request);
 
   /**
-   * Get the messages sent during this superstep and clear them.
-   *
-   * @return Number of messages sent before the reset.
+   * Wait until all the outstanding requests are completed.
    */
-  long resetMessageCount();
+  void waitAllRequests();
 
   /**
    * Closes all connections.
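
Taken together, the reshaped WorkerClient is now only about connections and
raw requests. Below is a hedged sketch of the per-superstep flow it implies;
the surrounding variables (partitionOwners, workerInfo, partitionId, request)
are assumptions supplied by the worker, not part of this commit.

    // Illustrative only.
    workerClient.openConnections(partitionOwners);    // establish/check channels
    InetSocketAddress remoteServer =
        workerClient.getInetSocketAddress(workerInfo, partitionId);
    workerClient.sendWritableRequest(
        workerInfo.getTaskId(), remoteServer, request);
    workerClient.waitAllRequests();                   // block on outstanding requests
    workerClient.closeConnections();                  // at shutdown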

Copied: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java (from r1399043, giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java?p2=giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java&p1=giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java&r1=1399043&r2=1399090&rev=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java Wed Oct 17 04:33:16 2012
@@ -15,60 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.comm;
 
-import org.apache.giraph.graph.Vertex;
+import java.io.IOException;
 import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.Partition;
-
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.IOException;
-
 /**
- * Public interface for workers to do message communication
+ * Aggregates IPC requests and sends them off.
  *
- * @param <I> Vertex id
+ * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
  * @param <M> Message data
  */
-@SuppressWarnings("rawtypes")
-public interface WorkerClient<I extends WritableComparable,
+public interface WorkerClientRequestProcessor<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
-
-  /**
-   *  Setup the client.
-   */
-/*if[HADOOP_NON_SECURE]
-  void setup();
-else[HADOOP_NON_SECURE]*/
-  /**
-   * Setup the client.
-   *
-   * @param authenticate whether to SASL authenticate with server or not:
-   * set by giraph.authenticate configuration option.
-   */
-  void setup(boolean authenticate);
-/*end[HADOOP_NON_SECURE]*/
-
-  /**
-   * Fix changes to the workers and the mapping between partitions and
-   * workers.
-   */
-  void fixPartitionIdToSocketAddrMap();
-
-  /**
-   * Lookup PartitionOwner for a vertex.
-   * @param vertexId id to look up.
-   * @return PartitionOwner holding the vertex.
-   */
-  PartitionOwner getVertexPartitionOwner(I vertexId);
-
   /**
    * Sends a message to destination vertex.
    *
@@ -78,9 +45,18 @@ else[HADOOP_NON_SECURE]*/
   void sendMessageRequest(I destVertexId, M message);
 
   /**
-   * Sends a partition to the appropriate partition owner
+   * Sends a vertex to the appropriate partition owner
+   *
+   * @param partitionOwner Owner of the vertex
+   * @param vertex Vertex to send
+   */
+  void sendVertexRequest(PartitionOwner partitionOwner,
+                         Vertex<I, V, E, M> vertex);
+
+  /**
+   * Send a partition request (no batching).
    *
-   * @param workerInfo Owner the vertices belong to
+   * @param workerInfo Worker to send the partition to
    * @param partition Partition to send
    */
   void sendPartitionRequest(WorkerInfo workerInfo,
@@ -91,7 +67,7 @@ else[HADOOP_NON_SECURE]*/
    *
    * @param vertexIndex Index of the vertex to get the request
    * @param edge Edge to be added
-   * @throws IOException
+   * @throws java.io.IOException
    */
   void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException;
 
@@ -122,8 +98,8 @@ else[HADOOP_NON_SECURE]*/
   void removeVertexRequest(I vertexIndex) throws IOException;
 
   /**
-   * Flush all outgoing messages.  This will synchronously ensure that all
-   * messages have been send and delivered prior to returning.
+   * Flush all outgoing messages.  This ensures that all the messages have been
+   * sent, but they are not guaranteed to have been delivered yet.
    *
    * @throws IOException
    */
@@ -137,19 +113,10 @@ else[HADOOP_NON_SECURE]*/
   long resetMessageCount();
 
   /**
-   * Closes all connections.
-   *
-   * @throws IOException
-   */
-  void closeConnections() throws IOException;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-  /**
-   * Authenticates, as client, with another BSP worker, as server.
+   * Lookup PartitionOwner for a vertex.
    *
-   * @throws IOException
+   * @param vertexId id to look up.
+   * @return PartitionOwner holding the vertex.
    */
-  void authenticate() throws IOException;
-/*end[HADOOP_NON_SECURE]*/
+  PartitionOwner getVertexPartitionOwner(I vertexId);
 }
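
The per-thread request processing now lives behind this interface. A hedged
sketch of how a compute or input thread would use its own processor instance;
the ids, message, and edge values are illustrative assumptions, not part of
this commit.

    // Illustrative only; each thread owns its own (non-shared) processor.
    processor.sendMessageRequest(targetVertexId, message);
    processor.addEdgeRequest(vertexId, edge);           // may throw IOException
    // When the thread finishes its portion of the superstep, push out
    // anything still cached; per the javadoc above, flush() no longer
    // guarantees delivery, only that the requests were sent.
    processor.flush();
    long messagesSent = processor.resetMessageCount();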

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java Wed Oct 17 04:33:16 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -45,8 +46,10 @@ public interface WorkerServer<I extends 
   /**
    * Move the in transition messages to the in messages for every vertex and
    * add new connections to any newly appearing IPC proxies.
+   *
+   * @param graphState Current graph state
    */
-  void prepareSuperstep();
+  void prepareSuperstep(GraphState<I, V, E, M> graphState);
 
   /**
    * Get server data

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java Wed Oct 17 04:33:16 2012
@@ -18,13 +18,14 @@
 
 package org.apache.giraph.comm.netty;
 
-import java.util.Collection;
 import java.util.List;
 import com.google.common.collect.Lists;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 
 /**
- * Maintains multiple channels and rotates between them
+ * Maintains multiple channels and rotates between them.  This is thread-safe.
  */
 public class ChannelRotater {
   /** Index of last used channel */
@@ -52,8 +53,10 @@ public class ChannelRotater {
    *
    * @param channel Channel to add
    */
-  public void addChannel(Channel channel) {
-    channelList.add(channel);
+  public synchronized void addChannel(Channel channel) {
+    synchronized (channelList) {
+      channelList.add(channel);
+    }
   }
 
   /**
@@ -61,42 +64,62 @@ public class ChannelRotater {
    *
    * @return Next channel
    */
-  public Channel nextChannel() {
+  public synchronized Channel nextChannel() {
     if (channelList.isEmpty()) {
       throw new IllegalArgumentException("nextChannel: No channels exist!");
     }
 
-    incrementIndex();
+    ++index;
+    if (index >= channelList.size()) {
+      index = 0;
+    }
     return channelList.get(index);
   }
 
   /**
-   * Remove the last channel that was given out
+   * Remove a channel
    *
-   * @return Return the removed channel
+   * @param channel Channel to remove
+   * @return True if the channel was removed, false otherwise
    */
-  public Channel removeLast() {
-    Channel channel = channelList.remove(index);
-    incrementIndex();
-    return channel;
+  public synchronized boolean removeChannel(Channel channel) {
+    boolean success = channelList.remove(channel);
+    if (index >= channelList.size()) {
+      index = 0;
+    }
+    return success;
   }
 
   /**
-   * Increment the channel index with wrapping
+   * Get the number of channels in this object
+   *
+   * @return Number of channels
    */
-  private void incrementIndex() {
-    ++index;
-    if (index >= channelList.size()) {
-      index = 0;
+  public synchronized int size() {
+    return channelList.size();
+  }
+
+  /**
+   * Close the channels
+   *
+   * @param channelFutureListener If desired, pass in a channel future listener
+   */
+  public synchronized void closeChannels(
+      ChannelFutureListener channelFutureListener) {
+    for (Channel channel : channelList) {
+      ChannelFuture channelFuture = channel.close();
+      if (channelFutureListener != null) {
+        channelFuture.addListener(channelFutureListener);
+      }
     }
   }
 
   /**
-   * Get the channels
+   * Get a copy of the channels
    *
-   * @return Collection of the channels
+   * @return Copy of the channels
    */
-  Collection<Channel> getChannels() {
-    return channelList;
+  public synchronized Iterable<Channel> getChannels() {
+    return Lists.newArrayList(channelList);
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Wed Oct 17 04:33:16 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.comm.netty;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
-import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
@@ -30,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.giraph.GiraphConfiguration;
@@ -52,7 +52,6 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.log4j.Logger;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelConfig;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelLocal;
@@ -67,7 +66,7 @@ import org.jboss.netty.handler.execution
 import static org.jboss.netty.channel.Channels.pipeline;
 
 /**
- * Netty client for sending requests.
+ * Netty client for sending requests.  Thread-safe.
  */
 public class NettyClient {
   /** Do we have a limit on number of open requests we can have */
@@ -100,8 +99,8 @@ else[HADOOP_NON_SECURE]*/
    * Map of the peer connections, mapping from remote socket address to client
    * meta data
    */
-  private final Map<InetSocketAddress, ChannelRotater> addressChannelMap =
-      Maps.newHashMap();
+  private final ConcurrentMap<InetSocketAddress, ChannelRotater>
+  addressChannelMap = new MapMaker().makeMap();
   /**
    * Request map of client request ids to request information.
    */
@@ -214,7 +213,9 @@ else[HADOOP_NON_SECURE]*/
           GiraphConfiguration.NETTY_CLIENT_EXECUTION_THREADS_DEFAULT);
       executionHandler = new ExecutionHandler(
           new MemoryAwareThreadPoolExecutor(
-              executionThreads, 1048576, 1048576));
+              executionThreads, 1048576, 1048576, 1, TimeUnit.HOURS,
+              new ThreadFactoryBuilder().setNameFormat("netty-client-exec-%d")
+                  .build()));
       if (LOG.isInfoEnabled()) {
         LOG.info("NettyClient: Using execution handler with " +
             executionThreads + " threads after " +
@@ -226,10 +227,10 @@ else[HADOOP_NON_SECURE]*/
 
     bossExecutorService = Executors.newCachedThreadPool(
         new ThreadFactoryBuilder().setNameFormat(
-            "Giraph Client Netty Boss #%d").build());
+            "netty-client-boss-%d").build());
     workerExecutorService = Executors.newCachedThreadPool(
         new ThreadFactoryBuilder().setNameFormat(
-            "Giraph Client Netty Worker #%d").build());
+            "netty-client-worker-%d").build());
 
     clientId = conf.getInt("mapred.task.partition", -1);
 
@@ -332,6 +333,12 @@ else[HADOOP_NON_SECURE]*/
       this.address = address;
       this.taskId = taskId;
     }
+
+    @Override
+    public String toString() {
+      return "(future=" + future + ",address=" + address + ",taskId=" +
+          taskId + ")";
+    }
   }
 
   /**
@@ -403,8 +410,13 @@ else[HADOOP_NON_SECURE]*/
           ChannelRotater rotater =
               addressChannelMap.get(waitingConnection.address);
           if (rotater == null) {
-            rotater = new ChannelRotater(waitingConnection.taskId);
-            addressChannelMap.put(waitingConnection.address, rotater);
+            ChannelRotater newRotater =
+                new ChannelRotater(waitingConnection.taskId);
+            rotater = addressChannelMap.putIfAbsent(
+                waitingConnection.address, newRotater);
+            if (rotater == null) {
+              rotater = newRotater;
+            }
           }
           rotater.addChannel(future.getChannel());
           ++connected;
@@ -515,30 +527,27 @@ else[HADOOP_NON_SECURE]*/
     // in addressChannelMap are closed (success or failure)
     int channelCount = 0;
     for (ChannelRotater channelRotater : addressChannelMap.values()) {
-      channelCount += channelRotater.getChannels().size();
+      channelCount += channelRotater.size();
     }
     final int done = channelCount;
     final AtomicInteger count = new AtomicInteger(0);
     for (ChannelRotater channelRotater : addressChannelMap.values()) {
-      for (Channel channel : channelRotater.getChannels()) {
-        ChannelFuture result = channel.close();
-        result.addListener(new ChannelFutureListener() {
-          @Override
-          public void operationComplete(ChannelFuture cf) {
-            context.progress();
-            if (count.incrementAndGet() == done) {
-              if (LOG.isInfoEnabled()) {
-                LOG.info("stop: reached wait threshold, " +
-                    done + " connections closed, releasing " +
-                    "NettyClient.bootstrap resources now.");
-              }
-              bossExecutorService.shutdownNow();
-              workerExecutorService.shutdownNow();
-              bootstrap.releaseExternalResources();
+      channelRotater.closeChannels(new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture cf) {
+          context.progress();
+          if (count.incrementAndGet() == done) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("stop: reached wait threshold, " +
+                  done + " connections closed, releasing " +
+                  "NettyClient.bootstrap resources now.");
             }
+            bossExecutorService.shutdownNow();
+            workerExecutorService.shutdownNow();
+            bootstrap.releaseExternalResources();
           }
-        });
-      }
+        }
+      });
     }
   }
 
@@ -561,7 +570,10 @@ else[HADOOP_NON_SECURE]*/
     }
 
     // Get rid of the failed channel
-    addressChannelMap.get(remoteServer).removeLast();
+    if (!addressChannelMap.get(remoteServer).removeChannel(channel)) {
+      LOG.warn("getNextChannel: Unlikely event that the channel " +
+          channel + " was already removed!");
+    }
     if (LOG.isInfoEnabled()) {
       LOG.info("getNextChannel: Fixing disconnected channel to " +
           remoteServer + ", open = " + channel.isOpen() + ", " +
@@ -744,15 +756,4 @@ else[HADOOP_NON_SECURE]*/
       addedRequestInfos.clear();
     }
   }
-
-  /**
-   * Returning configuration of the first channel.
-   * @throws ArrayIndexOutOfBoundsException if no
-   *   channels exist in the client's address => channel map.
-   * @return ChannelConfig for the first channel (if any).
-   */
-  public ChannelConfig getChannelConfig() {
-    return ((Channel) addressChannelMap.values().toArray()[0]).getConfig();
-  }
-
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Wed Oct 17 04:33:16 2012
@@ -36,6 +36,7 @@ import org.apache.giraph.comm.netty.hand
 else[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.netty.handler.SaslServerHandler;
 /*end[HADOOP_NON_SECURE]*/
+import java.util.concurrent.TimeUnit;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.log4j.Logger;
@@ -141,10 +142,10 @@ else[HADOOP_NON_SECURE]*/
 
     bossExecutorService = Executors.newCachedThreadPool(
         new ThreadFactoryBuilder().setNameFormat(
-            "Giraph Server Netty Boss #%d").build());
+            "netty-server-boss-%d").build());
     workerExecutorService = Executors.newCachedThreadPool(
         new ThreadFactoryBuilder().setNameFormat(
-            "Giraph Server Netty Worker #%d").build());
+            "netty-server-worker-%d").build());
 
     try {
       this.localHostname = InetAddress.getLocalHost().getHostName();
@@ -175,7 +176,9 @@ else[HADOOP_NON_SECURE]*/
       int executionThreads = conf.getNettyServerExecutionThreads();
       executionHandler = new ExecutionHandler(
           new MemoryAwareThreadPoolExecutor(
-              executionThreads, 1048576, 1048576));
+              executionThreads, 1048576, 1048576, 1, TimeUnit.HOURS,
+              new ThreadFactoryBuilder().setNameFormat("netty-server-exec-%d").
+                  build()));
       if (LOG.isInfoEnabled()) {
         LOG.info("NettyServer: Using execution handler with " +
             executionThreads + " threads after " +

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Wed Oct 17 04:33:16 2012
@@ -18,25 +18,13 @@
 
 package org.apache.giraph.comm.netty;
 
+import java.util.concurrent.ConcurrentMap;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.SendMessageCache;
-import org.apache.giraph.comm.SendMutationsCache;
-import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
-import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
-import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
-import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
-import org.apache.giraph.comm.requests.SendVertexRequest;
-import org.apache.giraph.comm.requests.WorkerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.WorkerInfo;
-import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -48,9 +36,7 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -66,11 +52,10 @@ import java.util.concurrent.ConcurrentHa
 public class NettyWorkerClient<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> implements
     WorkerClient<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG =
-    Logger.getLogger(NettyWorkerClient.class);
   /** Signal for getInetSocketAddress() to use WorkerInfo's address */
-  private static final int NO_PARTITION_ID = Integer.MIN_VALUE;
+  public static final int NO_PARTITION_ID = Integer.MIN_VALUE;
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
   /** Hadoop configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Netty client that does that actual I/O */
@@ -80,26 +65,11 @@ public class NettyWorkerClient<I extends
   /**
    * Cached map of partition ids to remote socket address.
    */
-  private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
+  private final ConcurrentMap<Integer, InetSocketAddress>
+  partitionIndexAddressMap =
       new ConcurrentHashMap<Integer, InetSocketAddress>();
-  /**
-   * Cached map of partitions to vertex indices to messages
-   */
-  private final SendMessageCache<I, M> sendMessageCache;
-  /**
-   * Cached map of partitions to vertex indices to mutations
-   */
-  private final SendMutationsCache<I, V, E, M> sendMutationsCache;
-  /** Maximum number of messages per remote worker to cache before sending */
-  private final int maxMessagesPerWorker;
-  /** Maximum number of mutations per partition before sending */
-  private final int maxMutationsPerPartition;
   /** Maximum number of attempts to resolve an address*/
   private final int maxResolveAddressAttempts;
-  /** Messages sent during the last superstep */
-  private long totalMsgsSentInSuperstep = 0;
-  /** Server data from the server */
-  private final ServerData<I, V, E, M> serverData;
 
   /**
    * Only constructor.
@@ -107,37 +77,32 @@ public class NettyWorkerClient<I extends
    * @param context Context from mapper
    * @param configuration Configuration
    * @param service Used to get partition mapping
-   * @param serverData Server data (used for local requests)
    */
   public NettyWorkerClient(
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      CentralizedServiceWorker<I, V, E, M> service,
-      ServerData<I, V, E, M> serverData) {
+      CentralizedServiceWorker<I, V, E, M> service) {
     this.nettyClient = new NettyClient(context, configuration);
     this.conf = configuration;
     this.service = service;
-    maxMessagesPerWorker = conf.getInt(
-        GiraphConfiguration.MSG_SIZE,
-        GiraphConfiguration.MSG_SIZE_DEFAULT);
-    maxMutationsPerPartition = conf.getInt(
-        GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST,
-        GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
+
     maxResolveAddressAttempts = conf.getInt(
         GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
         GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
-    sendMessageCache = new SendMessageCache<I, M>(conf);
-    sendMutationsCache = new SendMutationsCache<I, V, E, M>();
-    this.serverData = serverData;
+  }
+
+  public CentralizedServiceWorker<I, V, E, M> getService() {
+    return service;
   }
 
   @Override
-  public void fixPartitionIdToSocketAddrMap() {
+  public void openConnections(
+      Collection<? extends PartitionOwner> partitionOwners) {
     // 1. Fix all the cached inet addresses (remove all changed entries)
     // 2. Connect to any new IPC servers
     Map<InetSocketAddress, Integer> addressTaskIdMap =
-        Maps.newHashMapWithExpectedSize(service.getPartitionOwners().size());
-    for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
+        Maps.newHashMapWithExpectedSize(partitionOwners.size());
+    for (PartitionOwner partitionOwner : partitionOwners) {
       InetSocketAddress address =
           partitionIndexAddressMap.get(
               partitionOwner.getPartitionId());
@@ -165,23 +130,19 @@ public class NettyWorkerClient<I extends
             partitionOwner.getWorkerInfo().getTaskId());
       }
     }
+
     addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
         null);
     nettyClient.connectAllAddresses(addressTaskIdMap);
   }
 
-  /**
-   * Fill the socket address cache for the worker info and its partition.
-   *
-   * @param workerInfo Worker information to get the socket address
-   * @param partitionId Partition id to look up.
-   * @return address of the vertex range server containing this vertex
-   */
-  private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
+  @Override
+  public InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
       int partitionId) {
     InetSocketAddress address = partitionIndexAddressMap.get(partitionId);
     if (address == null) {
-      address = resolveAddress(workerInfo.getInetSocketAddress());
+      address = resolveAddress(maxResolveAddressAttempts,
+          workerInfo.getInetSocketAddress());
       if (partitionId != NO_PARTITION_ID) {
         // Only cache valid partition ids
         partitionIndexAddressMap.put(partitionId, address);
@@ -193,12 +154,16 @@ public class NettyWorkerClient<I extends
 
   /**
    * Utility method for getInetSocketAddress()
+   *
+   * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
+   *        address
    * @param address the address we are attempting to resolve
    * @return the successfully resolved address.
    * @throws IllegalStateException if the address is not resolved
    *         in <code>maxResolveAddressAttempts</code> tries.
    */
-  private InetSocketAddress resolveAddress(InetSocketAddress address) {
+  private static InetSocketAddress resolveAddress(
+      int maxResolveAddressAttempts, InetSocketAddress address) {
     int resolveAttempts = 0;
     while (address.isUnresolved() &&
       resolveAttempts < maxResolveAddressAttempts) {
@@ -211,6 +176,8 @@ public class NettyWorkerClient<I extends
       } catch (InterruptedException e) {
         LOG.warn("resolveAddress: Interrupted.", e);
       }
+      address = new InetSocketAddress(address.getHostName(),
+          address.getPort());
     }
     if (resolveAttempts >= maxResolveAddressAttempts) {
       throw new IllegalStateException("resolveAddress: Couldn't " +
@@ -219,254 +186,24 @@ public class NettyWorkerClient<I extends
     return address;
   }
 
-  /**
-   * When doing the request, short circuit if it is local
-   *
-   * @param workerInfo Worker info
-   * @param remoteServerAddress Remote server address
-   * @param writableRequest Request to either submit or run locally
-   */
-  private void doRequest(WorkerInfo workerInfo,
-                         InetSocketAddress remoteServerAddress,
-                         WritableRequest writableRequest) {
-    // If this is local, execute locally
-    if (service.getWorkerInfo().getTaskId() ==
-        workerInfo.getTaskId()) {
-      ((WorkerRequest) writableRequest).doRequest(serverData);
-    } else {
-      nettyClient.sendWritableRequest(
-          workerInfo.getTaskId(), remoteServerAddress, writableRequest);
-    }
-  }
-
-  /**
-   * Lookup PartitionOwner for a vertex.
-   * @param vertexId id to look up.
-   * @return PartitionOwner holding the vertex.
-   */
+  @Override
   public PartitionOwner getVertexPartitionOwner(I vertexId) {
     return service.getVertexPartitionOwner(vertexId);
   }
 
   @Override
-  public void sendMessageRequest(I destVertexId, M message) {
-    PartitionOwner owner = getVertexPartitionOwner(destVertexId);
-    WorkerInfo workerInfo = owner.getWorkerInfo();
-    final int partitionId = owner.getPartitionId();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
-          ") to " + destVertexId + " on worker " + workerInfo);
-    }
-    ++totalMsgsSentInSuperstep;
-
-    // Add the message to the cache
-    int workerMessageCount = sendMessageCache
-      .addMessage(workerInfo, partitionId, destVertexId, message);
-
-    // Send a request if the cache of outgoing message to
-    // the remote worker 'workerInfo' is full enough to be flushed
-    if (workerMessageCount >= maxMessagesPerWorker) {
-      Map<Integer, Map<I, Collection<M>>> workerMessages =
-        sendMessageCache.removeWorkerMessages(workerInfo);
-      InetSocketAddress remoteWorkerAddress =
-        getInetSocketAddress(workerInfo, partitionId);
-      WritableRequest writableRequest =
-        new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
-      doRequest(workerInfo, remoteWorkerAddress, writableRequest);
-    }
-  }
-
-  @Override
-  public void sendPartitionRequest(WorkerInfo workerInfo,
-                                   Partition<I, V, E, M> partition) {
-    final int partitionId = partition.getId();
-    InetSocketAddress remoteServerAddress =
-        getInetSocketAddress(workerInfo, partitionId);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("sendPartitionRequest: Sending to " +
-          remoteServerAddress +
-          " from " + workerInfo + ", with partition " + partition);
-    }
-
-    WritableRequest vertexRequest =
-        new SendVertexRequest<I, V, E, M>(partitionId,
-            partition.getVertices());
-    doRequest(workerInfo, remoteServerAddress, vertexRequest);
-
-    // Messages are stored separately
-    MessageStoreByPartition<I, M> messageStore =
-        service.getServerData().getCurrentMessageStore();
-    Map<I, Collection<M>> map = Maps.newHashMap();
-    int messagesInMap = 0;
-    for (I vertexId :
-        messageStore.getPartitionDestinationVertices(partitionId)) {
-      try {
-        Collection<M> messages = messageStore.getVertexMessages(vertexId);
-        map.put(vertexId, messages);
-        messagesInMap += messages.size();
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "sendPartitionReq: Got IOException ", e);
-      }
-      if (messagesInMap > maxMessagesPerWorker) {
-        WritableRequest messagesRequest = new
-            SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
-        doRequest(workerInfo, remoteServerAddress, messagesRequest);
-        map.clear();
-        messagesInMap = 0;
-      }
-    }
-    if (!map.isEmpty()) {
-      WritableRequest messagesRequest = new
-          SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
-      doRequest(workerInfo, remoteServerAddress, messagesRequest);
-    }
-  }
-
-    /**
-    * Send a mutations request if the maximum number of mutations per partition
-    * was met.
-    *
-    * @param partitionId Partition id
-    * @param partitionOwner Owner of the partition
-    * @param partitionMutationCount Number of mutations for this partition
-    */
-  private void sendMutationsRequestIfFull(
-      int partitionId, PartitionOwner partitionOwner,
-      int partitionMutationCount) {
-    // Send a request if enough mutations are there for a partition
-    if (partitionMutationCount >= maxMutationsPerPartition) {
-      InetSocketAddress remoteServerAddress =
-          getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
-      Map<I, VertexMutations<I, V, E, M>> partitionMutations =
-          sendMutationsCache.removePartitionMutations(partitionId);
-      WritableRequest writableRequest =
-          new SendPartitionMutationsRequest<I, V, E, M>(
-              partitionId, partitionMutations);
-      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
-          writableRequest);
-    }
-  }
-
-  @Override
-  public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
-      IOException {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
-    int partitionId = partitionOwner.getPartitionId();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
-          vertexIndex + " with partition " + partitionId);
-    }
-
-    // Add the message to the cache
-    int partitionMutationCount =
-        sendMutationsCache.addEdgeMutation(partitionId, vertexIndex, edge);
-
-    sendMutationsRequestIfFull(
-        partitionId, partitionOwner, partitionMutationCount);
-  }
-
-  @Override
-  public void removeEdgeRequest(I vertexIndex,
-                                I destinationVertexIndex) throws IOException {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
-    int partitionId = partitionOwner.getPartitionId();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("removeEdgeRequest: Removing edge " +
-          destinationVertexIndex +
-          " for index " + vertexIndex + " with partition " + partitionId);
-    }
-
-    // Add the message to the cache
-    int partitionMutationCount =
-        sendMutationsCache.removeEdgeMutation(
-            partitionId, vertexIndex, destinationVertexIndex);
-
-    sendMutationsRequestIfFull(
-        partitionId, partitionOwner, partitionMutationCount);
-  }
-
-  @Override
-  public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertex.getId());
-    int partitionId = partitionOwner.getPartitionId();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("addVertexRequest: Sending vertex " + vertex +
-          " to partition " + partitionId);
-    }
-
-    // Add the message to the cache
-    int partitionMutationCount =
-        sendMutationsCache.addVertexMutation(partitionId, vertex);
-
-    sendMutationsRequestIfFull(
-        partitionId, partitionOwner, partitionMutationCount);
-  }
-
-  @Override
-  public void removeVertexRequest(I vertexIndex) throws IOException {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
-    int partitionId = partitionOwner.getPartitionId();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("removeVertexRequest: Removing vertex index " +
-          vertexIndex + " from partition " + partitionId);
-    }
-
-    // Add the message to the cache
-    int partitionMutationCount =
-        sendMutationsCache.removeVertexMutation(partitionId, vertexIndex);
-
-    sendMutationsRequestIfFull(
-        partitionId, partitionOwner, partitionMutationCount);
+  public void sendWritableRequest(Integer destWorkerId,
+                                  InetSocketAddress remoteServer,
+                                  WritableRequest request) {
+    nettyClient.sendWritableRequest(destWorkerId, remoteServer, request);
   }
 
   @Override
-  public void flush() throws IOException {
-    // Execute the remaining sends messages (if any)
-    Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> remainingMessageCache =
-        sendMessageCache.removeAllMessages();
-    for (Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
-      remainingMessageCache.entrySet()) {
-      Iterator<Integer> cachedPartitionId =
-        entry.getValue().keySet().iterator();
-      final int partitionId = cachedPartitionId.hasNext() ?
-        cachedPartitionId.next() : NO_PARTITION_ID;
-      InetSocketAddress remoteWorkerAddress =
-        getInetSocketAddress(entry.getKey(), partitionId);
-      WritableRequest writableRequest =
-        new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
-      doRequest(entry.getKey(), remoteWorkerAddress, writableRequest);
-    }
-
-    // Execute the remaining sends mutations (if any)
-    Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
-        sendMutationsCache.removeAllPartitionMutations();
-    for (Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
-        remainingMutationsCache.entrySet()) {
-      WritableRequest writableRequest =
-          new SendPartitionMutationsRequest<I, V, E, M>(
-              entry.getKey(), entry.getValue());
-      PartitionOwner partitionOwner =
-          getVertexPartitionOwner(
-              entry.getValue().keySet().iterator().next());
-      InetSocketAddress remoteServerAddress =
-          getInetSocketAddress(partitionOwner.getWorkerInfo(),
-              partitionOwner.getPartitionId());
-      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
-          writableRequest);
-    }
-
+  public void waitAllRequests() {
     nettyClient.waitAllRequests();
   }
 
   @Override
-  public long resetMessageCount() {
-    long messagesSentInSuperstep = totalMsgsSentInSuperstep;
-    totalMsgsSentInSuperstep = 0;
-    return messagesSentInSuperstep;
-  }
-
-  @Override
   public void closeConnections() throws IOException {
     nettyClient.stop();
   }
@@ -474,12 +211,12 @@ public class NettyWorkerClient<I extends
 /*if[HADOOP_NON_SECURE]
   @Override
   public void setup() {
-    fixPartitionIdToSocketAddrMap();
+    openConnections(service.getPartitionOwners());
   }
 else[HADOOP_NON_SECURE]*/
   @Override
   public void setup(boolean authenticate) {
-    fixPartitionIdToSocketAddrMap();
+    openConnections(service.getPartitionOwners());
     if (authenticate) {
       authenticate();
     }
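
After this change NettyWorkerClient keeps only the connection-level operations shown above (openConnections() via setup(), sendWritableRequest(), waitAllRequests() and closeConnections()); the per-superstep caching and flushing moves into the new NettyWorkerClientRequestProcessor below. As a rough orientation sketch only (not code from this patch), a worker thread could drive the trimmed client as follows, where destTaskId, remoteServerAddress, request and useAuthentication are placeholder names:

    // Sketch, not part of the commit: the client is obtained from the service
    // worker (see getWorkerClient() in the processor below) and reused across
    // supersteps by every compute thread.
    WorkerClient<I, V, E, M> client = serviceWorker.getWorkerClient();
    client.setup(useAuthentication);   // openConnections() plus authenticate() when enabled
    client.sendWritableRequest(destTaskId, remoteServerAddress, request);
    client.waitAllRequests();          // block until all outstanding requests complete
    client.closeConnections();         // shut down the underlying NettyClient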

Copied: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (from r1399043, giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?p2=giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java&p1=giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java&r1=1399043&r2=1399090&rev=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Wed Oct 17 04:33:16 2012
@@ -15,21 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.comm.netty;
 
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.SendMessageCache;
 import org.apache.giraph.comm.SendMutationsCache;
+import org.apache.giraph.comm.SendPartitionCache;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
-import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.WorkerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.graph.Edge;
@@ -43,214 +50,76 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
- * Takes users facing APIs in {@link WorkerClient} and implements them
- * using the available {@link WritableRequest} objects.
+ * Aggregates requests and sends them to the thread-safe NettyClient.  This
+ * class is not thread-safe and is expected to be used and then thrown away
+ * after a phase of communication has completed.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
  * @param <M> Message data
  */
-@SuppressWarnings("rawtypes")
-public class NettyWorkerClient<I extends WritableComparable,
+public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> implements
-    WorkerClient<I, V, E, M> {
+    WorkerClientRequestProcessor<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG =
-    Logger.getLogger(NettyWorkerClient.class);
-  /** Signal for getInetSocketAddress() to use WorkerInfo's address */
-  private static final int NO_PARTITION_ID = Integer.MIN_VALUE;
-  /** Hadoop configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-  /** Netty client that does that actual I/O */
-  private final NettyClient nettyClient;
-  /** Centralized service, needed to get vertex ranges */
-  private final CentralizedServiceWorker<I, V, E, M> service;
-  /**
-   * Cached map of partition ids to remote socket address.
-   */
-  private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
-      new ConcurrentHashMap<Integer, InetSocketAddress>();
-  /**
-   * Cached map of partitions to vertex indices to messages
-   */
+      Logger.getLogger(NettyWorkerClientRequestProcessor.class);
+  /** Cached partitions of vertices to send */
+  private final SendPartitionCache<I, V, E, M> sendPartitionCache;
+  /** Cached map of partitions to vertex indices to messages */
   private final SendMessageCache<I, M> sendMessageCache;
-  /**
-   * Cached map of partitions to vertex indices to mutations
-   */
-  private final SendMutationsCache<I, V, E, M> sendMutationsCache;
+  /** Cached map of partitions to vertex indices to mutations */
+  private final SendMutationsCache<I, V, E, M> sendMutationsCache =
+      new SendMutationsCache<I, V, E, M>();
+  /** Thread-safe worker client, possibly shared among processor instances */
+  private final WorkerClient<I, V, E, M> workerClient;
+  /** Messages sent during the last superstep */
+  private long totalMsgsSentInSuperstep = 0;
   /** Maximum number of messages per remote worker to cache before sending */
   private final int maxMessagesPerWorker;
   /** Maximum number of mutations per partition before sending */
   private final int maxMutationsPerPartition;
-  /** Maximum number of attempts to resolve an address*/
-  private final int maxResolveAddressAttempts;
-  /** Messages sent during the last superstep */
-  private long totalMsgsSentInSuperstep = 0;
-  /** Server data from the server */
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  /** Server data (used to execute local requests directly) */
   private final ServerData<I, V, E, M> serverData;
 
   /**
-   * Only constructor.
+   * Constructor.
    *
-   * @param context Context from mapper
+   * @param context Context
    * @param configuration Configuration
-   * @param service Used to get partition mapping
-   * @param serverData Server data (used for local requests)
+   * @param serviceWorker Service worker
    */
-  public NettyWorkerClient(
+  public NettyWorkerClientRequestProcessor(
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      CentralizedServiceWorker<I, V, E, M> service,
-      ServerData<I, V, E, M> serverData) {
-    this.nettyClient = new NettyClient(context, configuration);
-    this.conf = configuration;
-    this.service = service;
-    maxMessagesPerWorker = conf.getInt(
+      CentralizedServiceWorker<I, V, E, M> serviceWorker) {
+    this.workerClient = serviceWorker.getWorkerClient();
+    this.configuration = configuration;
+
+    sendPartitionCache = new SendPartitionCache<I, V, E, M>(context,
+        configuration);
+    sendMessageCache =
+        new SendMessageCache<I, M>(configuration);
+    maxMessagesPerWorker = configuration.getInt(
         GiraphConfiguration.MSG_SIZE,
         GiraphConfiguration.MSG_SIZE_DEFAULT);
-    maxMutationsPerPartition = conf.getInt(
+    maxMutationsPerPartition = configuration.getInt(
         GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST,
         GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
-    maxResolveAddressAttempts = conf.getInt(
-        GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
-        GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
-    sendMessageCache = new SendMessageCache<I, M>(conf);
-    sendMutationsCache = new SendMutationsCache<I, V, E, M>();
-    this.serverData = serverData;
-  }
-
-  @Override
-  public void fixPartitionIdToSocketAddrMap() {
-    // 1. Fix all the cached inet addresses (remove all changed entries)
-    // 2. Connect to any new IPC servers
-    Map<InetSocketAddress, Integer> addressTaskIdMap =
-        Maps.newHashMapWithExpectedSize(service.getPartitionOwners().size());
-    for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
-      InetSocketAddress address =
-          partitionIndexAddressMap.get(
-              partitionOwner.getPartitionId());
-      if (address != null &&
-          (!address.getHostName().equals(
-              partitionOwner.getWorkerInfo().getHostname()) ||
-              address.getPort() !=
-              partitionOwner.getWorkerInfo().getPort())) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("fixPartitionIdToSocketAddrMap: " +
-              "Partition owner " +
-              partitionOwner + " changed from " +
-              address);
-        }
-        partitionIndexAddressMap.remove(
-            partitionOwner.getPartitionId());
-      }
-
-      // No need to connect to myself
-      if (service.getWorkerInfo().getTaskId() !=
-          partitionOwner.getWorkerInfo().getTaskId()) {
-        addressTaskIdMap.put(
-            getInetSocketAddress(partitionOwner.getWorkerInfo(),
-                partitionOwner.getPartitionId()),
-            partitionOwner.getWorkerInfo().getTaskId());
-      }
-    }
-    addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
-        null);
-    nettyClient.connectAllAddresses(addressTaskIdMap);
-  }
-
-  /**
-   * Fill the socket address cache for the worker info and its partition.
-   *
-   * @param workerInfo Worker information to get the socket address
-   * @param partitionId Partition id to look up.
-   * @return address of the vertex range server containing this vertex
-   */
-  private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
-      int partitionId) {
-    InetSocketAddress address = partitionIndexAddressMap.get(partitionId);
-    if (address == null) {
-      address = resolveAddress(workerInfo.getInetSocketAddress());
-      if (partitionId != NO_PARTITION_ID) {
-        // Only cache valid partition ids
-        partitionIndexAddressMap.put(partitionId, address);
-      }
-    }
-
-    return address;
-  }
-
-  /**
-   * Utility method for getInetSocketAddress()
-   * @param address the address we are attempting to resolve
-   * @return the successfully resolved address.
-   * @throws IllegalStateException if the address is not resolved
-   *         in <code>maxResolveAddressAttempts</code> tries.
-   */
-  private InetSocketAddress resolveAddress(InetSocketAddress address) {
-    int resolveAttempts = 0;
-    while (address.isUnresolved() &&
-      resolveAttempts < maxResolveAddressAttempts) {
-      ++resolveAttempts;
-      LOG.warn("resolveAddress: Failed to resolve " + address +
-        " on attempt " + resolveAttempts + " of " +
-        maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
-      try {
-        Thread.sleep(5000);
-      } catch (InterruptedException e) {
-        LOG.warn("resolveAddress: Interrupted.", e);
-      }
-    }
-    if (resolveAttempts >= maxResolveAddressAttempts) {
-      throw new IllegalStateException("resolveAddress: Couldn't " +
-        "resolve " + address + " in " +  resolveAttempts + " tries.");
-    }
-    return address;
-  }
-
-  /**
-   * When doing the request, short circuit if it is local
-   *
-   * @param workerInfo Worker info
-   * @param remoteServerAddress Remote server address
-   * @param writableRequest Request to either submit or run locally
-   */
-  private void doRequest(WorkerInfo workerInfo,
-                         InetSocketAddress remoteServerAddress,
-                         WritableRequest writableRequest) {
-    // If this is local, execute locally
-    if (service.getWorkerInfo().getTaskId() ==
-        workerInfo.getTaskId()) {
-      ((WorkerRequest) writableRequest).doRequest(serverData);
-    } else {
-      nettyClient.sendWritableRequest(
-          workerInfo.getTaskId(), remoteServerAddress, writableRequest);
-    }
-  }
-
-  /**
-   * Lookup PartitionOwner for a vertex.
-   * @param vertexId id to look up.
-   * @return PartitionOwner holding the vertex.
-   */
-  public PartitionOwner getVertexPartitionOwner(I vertexId) {
-    return service.getVertexPartitionOwner(vertexId);
+    this.serviceWorker = serviceWorker;
+    this.serverData = serviceWorker.getServerData();
   }
 
   @Override
   public void sendMessageRequest(I destVertexId, M message) {
-    PartitionOwner owner = getVertexPartitionOwner(destVertexId);
+    PartitionOwner owner =
+        serviceWorker.getVertexPartitionOwner(destVertexId);
     WorkerInfo workerInfo = owner.getWorkerInfo();
     final int partitionId = owner.getPartitionId();
     if (LOG.isTraceEnabled()) {
@@ -260,18 +129,18 @@ public class NettyWorkerClient<I extends
     ++totalMsgsSentInSuperstep;
 
     // Add the message to the cache
-    int workerMessageCount = sendMessageCache
-      .addMessage(workerInfo, partitionId, destVertexId, message);
+    int workerMessageCount = sendMessageCache.addMessage(
+        workerInfo, partitionId, destVertexId, message);
 
     // Send a request if the cache of outgoing message to
     // the remote worker 'workerInfo' is full enough to be flushed
     if (workerMessageCount >= maxMessagesPerWorker) {
       Map<Integer, Map<I, Collection<M>>> workerMessages =
-        sendMessageCache.removeWorkerMessages(workerInfo);
+          sendMessageCache.removeWorkerMessages(workerInfo);
       InetSocketAddress remoteWorkerAddress =
-        getInetSocketAddress(workerInfo, partitionId);
+          workerClient.getInetSocketAddress(workerInfo, partitionId);
       WritableRequest writableRequest =
-        new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
+          new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
       doRequest(workerInfo, remoteWorkerAddress, writableRequest);
     }
   }
@@ -281,9 +150,9 @@ public class NettyWorkerClient<I extends
                                    Partition<I, V, E, M> partition) {
     final int partitionId = partition.getId();
     InetSocketAddress remoteServerAddress =
-        getInetSocketAddress(workerInfo, partitionId);
+        workerClient.getInetSocketAddress(workerInfo, partitionId);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("sendPartitionRequest: Sending to " +
+      LOG.trace("sendVertexRequest: Sending to " +
           remoteServerAddress +
           " from " + workerInfo + ", with partition " + partition);
     }
@@ -295,7 +164,7 @@ public class NettyWorkerClient<I extends
 
     // Messages are stored separately
     MessageStoreByPartition<I, M> messageStore =
-        service.getServerData().getCurrentMessageStore();
+        serverData.getCurrentMessageStore();
     Map<I, Collection<M>> map = Maps.newHashMap();
     int messagesInMap = 0;
     for (I vertexId :
@@ -306,7 +175,7 @@ public class NettyWorkerClient<I extends
         messagesInMap += messages.size();
       } catch (IOException e) {
         throw new IllegalStateException(
-            "sendPartitionReq: Got IOException ", e);
+            "sendVertexRequest: Got IOException ", e);
       }
       if (messagesInMap > maxMessagesPerWorker) {
         WritableRequest messagesRequest = new
@@ -323,35 +192,23 @@ public class NettyWorkerClient<I extends
     }
   }
 
-    /**
-    * Send a mutations request if the maximum number of mutations per partition
-    * was met.
-    *
-    * @param partitionId Partition id
-    * @param partitionOwner Owner of the partition
-    * @param partitionMutationCount Number of mutations for this partition
-    */
-  private void sendMutationsRequestIfFull(
-      int partitionId, PartitionOwner partitionOwner,
-      int partitionMutationCount) {
-    // Send a request if enough mutations are there for a partition
-    if (partitionMutationCount >= maxMutationsPerPartition) {
-      InetSocketAddress remoteServerAddress =
-          getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
-      Map<I, VertexMutations<I, V, E, M>> partitionMutations =
-          sendMutationsCache.removePartitionMutations(partitionId);
-      WritableRequest writableRequest =
-          new SendPartitionMutationsRequest<I, V, E, M>(
-              partitionId, partitionMutations);
-      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
-          writableRequest);
+  @Override
+  public void sendVertexRequest(PartitionOwner partitionOwner,
+                                Vertex<I, V, E, M> vertex) {
+    Partition<I, V, E, M> partition =
+        sendPartitionCache.addVertex(partitionOwner, vertex);
+    if (partition == null) {
+      return;
     }
+
+    sendPartitionRequest(partitionOwner.getWorkerInfo(), partition);
   }
 
   @Override
   public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
       IOException {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+    PartitionOwner partitionOwner =
+        serviceWorker.getVertexPartitionOwner(vertexIndex);
     int partitionId = partitionOwner.getPartitionId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
@@ -366,10 +223,37 @@ public class NettyWorkerClient<I extends
         partitionId, partitionOwner, partitionMutationCount);
   }
 
+  /**
+   * Send a mutations request if the maximum number of mutations per partition
+   * has been reached.
+   *
+   * @param partitionId Partition id
+   * @param partitionOwner Owner of the partition
+   * @param partitionMutationCount Number of mutations for this partition
+   */
+  private void sendMutationsRequestIfFull(
+      int partitionId, PartitionOwner partitionOwner,
+      int partitionMutationCount) {
+    // Send a request if enough mutations are there for a partition
+    if (partitionMutationCount >= maxMutationsPerPartition) {
+      InetSocketAddress remoteServerAddress =
+          workerClient.getInetSocketAddress(
+              partitionOwner.getWorkerInfo(), partitionId);
+      Map<I, VertexMutations<I, V, E, M>> partitionMutations =
+          sendMutationsCache.removePartitionMutations(partitionId);
+      WritableRequest writableRequest =
+          new SendPartitionMutationsRequest<I, V, E, M>(
+              partitionId, partitionMutations);
+      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
+          writableRequest);
+    }
+  }
+
   @Override
   public void removeEdgeRequest(I vertexIndex,
                                 I destinationVertexIndex) throws IOException {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+    PartitionOwner partitionOwner =
+        serviceWorker.getVertexPartitionOwner(vertexIndex);
     int partitionId = partitionOwner.getPartitionId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("removeEdgeRequest: Removing edge " +
@@ -388,7 +272,8 @@ public class NettyWorkerClient<I extends
 
   @Override
   public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertex.getId());
+    PartitionOwner partitionOwner =
+        serviceWorker.getVertexPartitionOwner(vertex.getId());
     int partitionId = partitionOwner.getPartitionId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("addVertexRequest: Sending vertex " + vertex +
@@ -405,7 +290,8 @@ public class NettyWorkerClient<I extends
 
   @Override
   public void removeVertexRequest(I vertexIndex) throws IOException {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+    PartitionOwner partitionOwner =
+        serviceWorker.getVertexPartitionOwner(vertexIndex);
     int partitionId = partitionOwner.getPartitionId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("removeVertexRequest: Removing vertex index " +
@@ -422,41 +308,46 @@ public class NettyWorkerClient<I extends
 
   @Override
   public void flush() throws IOException {
+    // Send any remaining cached partitions
+    for (Map.Entry<PartitionOwner, Partition<I, V, E, M>> entry :
+        sendPartitionCache.getOwnerPartitionMap().entrySet()) {
+      sendPartitionRequest(entry.getKey().getWorkerInfo(), entry.getValue());
+    }
+    sendPartitionCache.clear();
+
     // Execute the remaining sends messages (if any)
-    Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> remainingMessageCache =
-        sendMessageCache.removeAllMessages();
-    for (Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
-      remainingMessageCache.entrySet()) {
+    Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>
+        remainingMessageCache = sendMessageCache.removeAllMessages();
+    for (Map.Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
+        remainingMessageCache.entrySet()) {
       Iterator<Integer> cachedPartitionId =
-        entry.getValue().keySet().iterator();
+          entry.getValue().keySet().iterator();
       final int partitionId = cachedPartitionId.hasNext() ?
-        cachedPartitionId.next() : NO_PARTITION_ID;
+          cachedPartitionId.next() : NettyWorkerClient.NO_PARTITION_ID;
       InetSocketAddress remoteWorkerAddress =
-        getInetSocketAddress(entry.getKey(), partitionId);
+          workerClient.getInetSocketAddress(entry.getKey(), partitionId);
       WritableRequest writableRequest =
-        new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
+          new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
       doRequest(entry.getKey(), remoteWorkerAddress, writableRequest);
     }
 
     // Execute the remaining sends mutations (if any)
     Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
         sendMutationsCache.removeAllPartitionMutations();
-    for (Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
+    for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
         remainingMutationsCache.entrySet()) {
       WritableRequest writableRequest =
           new SendPartitionMutationsRequest<I, V, E, M>(
               entry.getKey(), entry.getValue());
       PartitionOwner partitionOwner =
-          getVertexPartitionOwner(
+          serviceWorker.getVertexPartitionOwner(
               entry.getValue().keySet().iterator().next());
       InetSocketAddress remoteServerAddress =
-          getInetSocketAddress(partitionOwner.getWorkerInfo(),
+          workerClient.getInetSocketAddress(partitionOwner.getWorkerInfo(),
               partitionOwner.getPartitionId());
       doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
           writableRequest);
     }
-
-    nettyClient.waitAllRequests();
   }
 
   @Override
@@ -467,30 +358,27 @@ public class NettyWorkerClient<I extends
   }
 
   @Override
-  public void closeConnections() throws IOException {
-    nettyClient.stop();
+  public PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return workerClient.getVertexPartitionOwner(vertexId);
   }
 
-/*if[HADOOP_NON_SECURE]
-  @Override
-  public void setup() {
-    fixPartitionIdToSocketAddrMap();
-  }
-else[HADOOP_NON_SECURE]*/
-  @Override
-  public void setup(boolean authenticate) {
-    fixPartitionIdToSocketAddrMap();
-    if (authenticate) {
-      authenticate();
+  /**
+   * When doing the request, short circuit if it is local
+   *
+   * @param workerInfo Worker info
+   * @param remoteServerAddress Remote server address
+   * @param writableRequest Request to either submit or run locally
+   */
+  private void doRequest(WorkerInfo workerInfo,
+                         InetSocketAddress remoteServerAddress,
+                         WritableRequest writableRequest) {
+    // If this is local, execute locally
+    if (serviceWorker.getWorkerInfo().getTaskId() ==
+        workerInfo.getTaskId()) {
+      ((WorkerRequest) writableRequest).doRequest(serverData);
+    } else {
+      workerClient.sendWritableRequest(
+          workerInfo.getTaskId(), remoteServerAddress, writableRequest);
     }
   }
-/*end[HADOOP_NON_SECURE]*/
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-  @Override
-  public void authenticate() {
-    nettyClient.authenticate();
-  }
-/*end[HADOOP_NON_SECURE]*/
 }
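
The javadoc above spells out the intended lifecycle: each NettyWorkerClientRequestProcessor holds single-threaded state (the partition, message and mutation caches plus the superstep message counter) on top of the shared, thread-safe WorkerClient, and is created for one communication phase and then discarded. A minimal per-thread sketch, using only methods visible in this diff; context, conf, serviceWorker, targetId and msg are placeholders, and the final wait happens on the shared client because flush() here no longer calls waitAllRequests() itself:

    // Sketch, not part of the commit: one processor per compute/input thread.
    WorkerClientRequestProcessor<I, V, E, M> processor =
        new NettyWorkerClientRequestProcessor<I, V, E, M>(
            context, conf, serviceWorker);

    processor.sendMessageRequest(targetId, msg);   // cached until MSG_SIZE per worker
    processor.flush();                             // send partially filled caches, no waiting

    // Completion barrier is on the shared, thread-safe client:
    serviceWorker.getWorkerClient().waitAllRequests();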