Posted to commits@giraph.apache.org by ap...@apache.org on 2013/02/16 01:34:17 UTC

[2/2] git commit: GIRAPH-515: More efficient and flexible edge-based input

Updated Branches:
  refs/heads/trunk ae3d29faa -> 212326153


GIRAPH-515: More efficient and flexible edge-based input


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/21232615
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/21232615
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/21232615

Branch: refs/heads/trunk
Commit: 2123261537c3ccb041d067f5079533490db2d1ef
Parents: ae3d29f
Author: Alessandro Presta <al...@fb.com>
Authored: Fri Feb 15 16:20:43 2013 -0800
Committer: Alessandro Presta <al...@fb.com>
Committed: Fri Feb 15 16:33:00 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../ByteArrayVertexPageRankBenchmark.java          |   39 ++
 ...MultiGraphByteArrayVertexPageRankBenchmark.java |   39 ++
 .../apache/giraph/benchmark/PageRankBenchmark.java |   16 +-
 .../java/org/apache/giraph/comm/SendCache.java     |  179 +++++++++
 .../java/org/apache/giraph/comm/SendEdgeCache.java |   97 +++++
 .../org/apache/giraph/comm/SendMessageCache.java   |  138 +------
 .../java/org/apache/giraph/comm/ServerData.java    |   11 +
 .../giraph/comm/WorkerClientRequestProcessor.java  |   13 +
 .../java/org/apache/giraph/comm/WorkerServer.java  |    7 -
 .../messages/ByteArrayMessagesPerVertexStore.java  |   16 +-
 .../netty/NettyWorkerClientRequestProcessor.java   |   71 ++++-
 .../giraph/comm/netty/NettyWorkerServer.java       |   11 +-
 .../apache/giraph/comm/requests/RequestType.java   |    2 +
 .../comm/requests/SendWorkerDataRequest.java       |  111 ++++++
 .../comm/requests/SendWorkerEdgesRequest.java      |   76 ++++
 .../comm/requests/SendWorkerMessagesRequest.java   |   73 +---
 .../apache/giraph/conf/GiraphConfiguration.java    |   18 +-
 .../org/apache/giraph/conf/GiraphConstants.java    |   47 +++-
 .../conf/ImmutableClassesGiraphConfiguration.java  |   46 ++-
 .../java/org/apache/giraph/graph/EdgeStore.java    |  176 +++++++++
 .../giraph/partition/ByteArrayPartition.java       |   18 +-
 .../giraph/partition/DiskBackedPartitionStore.java |   29 +-
 .../org/apache/giraph/utils/ByteArrayEdges.java    |  290 +++++++++++++++
 .../apache/giraph/utils/ByteArrayVertexIdData.java |  226 +++++++++++
 .../giraph/utils/ByteArrayVertexIdEdges.java       |   84 +++++
 .../giraph/utils/ByteArrayVertexIdMessages.java    |  240 +++----------
 .../org/apache/giraph/utils/VertexIdIterator.java  |   85 +++++
 .../org/apache/giraph/utils/WritableUtils.java     |   35 ++
 .../org/apache/giraph/vertex/ByteArrayVertex.java  |   63 ++++
 .../apache/giraph/vertex/ByteArrayVertexBase.java  |  126 +++++++
 .../org/apache/giraph/vertex/EdgeListVertex.java   |    2 +-
 .../giraph/vertex/MultiGraphByteArrayVertex.java   |   49 +++
 .../org/apache/giraph/worker/BspServiceWorker.java |    6 +-
 .../giraph/worker/EdgeInputSplitsCallable.java     |    2 +-
 .../org/apache/giraph/comm/RequestFailureTest.java |    6 +-
 .../java/org/apache/giraph/comm/RequestTest.java   |    6 +-
 .../java/org/apache/giraph/utils/MockUtils.java    |    1 +
 .../apache/giraph/vertex/TestMultiGraphVertex.java |    8 +-
 .../apache/giraph/vertex/TestMutableVertex.java    |    6 +-
 40 files changed, 2002 insertions(+), 468 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 1db3492..ee31207 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-515: More efficient and flexible edge-based input (apresta)
+
   GIRAPH-516: out-of-core messages dies for ArrayIndexOutOfBoundsException when 
   running out-of-core messages in UnsafeByteArrayOutputStream (majakabiljo)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/benchmark/ByteArrayVertexPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ByteArrayVertexPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ByteArrayVertexPageRankBenchmark.java
new file mode 100644
index 0000000..7e51c26
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ByteArrayVertexPageRankBenchmark.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import java.io.IOException;
+import org.apache.giraph.vertex.ByteArrayVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Same benchmark code as {@link PageRankBenchmark}, but uses
+ * {@link org.apache.giraph.vertex.ByteArrayVertex}
+ * implementation.
+ */
+public class ByteArrayVertexPageRankBenchmark extends
+    ByteArrayVertex<LongWritable, DoubleWritable,
+            DoubleWritable, DoubleWritable> {
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws
+      IOException {
+    PageRankComputation.computePageRank(this, messages);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/benchmark/MultiGraphByteArrayVertexPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/MultiGraphByteArrayVertexPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/MultiGraphByteArrayVertexPageRankBenchmark.java
new file mode 100644
index 0000000..9144641
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/MultiGraphByteArrayVertexPageRankBenchmark.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.vertex.MultiGraphByteArrayVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Vertex for PageRank benchmark based on {@link
+ * org.apache.giraph.vertex.MultiGraphByteArrayVertex}
+ */
+public class MultiGraphByteArrayVertexPageRankBenchmark
+    extends MultiGraphByteArrayVertex<LongWritable, DoubleWritable,
+            DoubleWritable, DoubleWritable> {
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws
+      IOException {
+    PageRankComputation.computePageRank(this, messages);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 19b08bd..8341dce 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -82,13 +82,13 @@ public class PageRankBenchmark implements Tool {
         "vertexClass",
         true,
         "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex, " +
-            "2 for RepresentativeVertex, " +
-            "3 for RepresentativeVertex with unsafe, " +
+            "2 for ByteArrayVertex, " +
+            "3 for ByteArrayVertex with unsafe, " +
             "4 for HashMapVertex (using EdgeInputFormat), " +
             "5 for MultiGraphEdgeListVertex (using EdgeInputFormat), " +
-            "6 for MultiGraphRepresentativeVertex (using " +
+            "6 for MultiGraphByteArrayVertex (using " +
             "EdgeInputFormat), " +
-            "7 for MultiGraphRepresentativeVertex with unsafe (using " +
+            "7 for MultiGraphByteArrayVertex with unsafe (using " +
             "EdgeInputFormat))");
     options.addOption("N",
         "name",
@@ -176,22 +176,22 @@ public class PageRankBenchmark implements Tool {
           HashMapVertexPageRankBenchmark.class);
     } else if (vertexClassOption == 2) {
       configuration.setVertexClass(
-          RepresentativeVertexPageRankBenchmark.class);
+          ByteArrayVertexPageRankBenchmark.class);
       configuration.useUnsafeSerialization(false);
     } else if (vertexClassOption == 3) {
       configuration.setVertexClass(
-          RepresentativeVertexPageRankBenchmark.class);
+          ByteArrayVertexPageRankBenchmark.class);
       configuration.useUnsafeSerialization(true);
     } else if (vertexClassOption == 5) {
       configuration.setVertexClass(
           MultiGraphEdgeListVertexPageRankBenchmark.class);
     } else if (vertexClassOption == 6) {
       configuration.setVertexClass(
-          MultiGraphRepresentativeVertexPageRankBenchmark.class);
+          MultiGraphByteArrayVertexPageRankBenchmark.class);
       configuration.useUnsafeSerialization(false);
     } else if (vertexClassOption == 7) {
       configuration.setVertexClass(
-          MultiGraphRepresentativeVertexPageRankBenchmark.class);
+          MultiGraphByteArrayVertexPageRankBenchmark.class);
       configuration.useUnsafeSerialization(true);
     }
     LOG.info("Using vertex class " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
new file mode 100644
index 0000000..1e8bdf9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract structure for caching data indexed by vertex id,
+ * to be sent to workers in bulk. Not thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <T> Data
+ * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
+ */
+@SuppressWarnings("unchecked")
+public abstract class SendCache<I extends WritableComparable, T,
+    B extends ByteArrayVertexIdData<I, T>> {
+  /** Internal cache */
+  private final ByteArrayVertexIdData<I, T>[] dataCache;
+  /** Size of data (in bytes) for each worker */
+  private final int[] dataSizes;
+  /** How big to initially make output streams for each worker's partitions */
+  private final int[] initialBufferSizes;
+  /** List of partition ids belonging to a worker */
+  private final Map<WorkerInfo, List<Integer>> workerPartitions =
+      Maps.newHashMap();
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
+   * @param maxRequestSize Maximum request size (in bytes)
+   * @param additionalRequestSize Additional request size (expressed as a
+   *                              ratio of the average request size)
+   */
+  public SendCache(ImmutableClassesGiraphConfiguration conf,
+                   CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
+                   int maxRequestSize,
+                   float additionalRequestSize) {
+    this.conf = conf;
+
+    int maxPartition = 0;
+    for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
+      List<Integer> workerPartitionIds =
+          workerPartitions.get(partitionOwner.getWorkerInfo());
+      if (workerPartitionIds == null) {
+        workerPartitionIds = Lists.newArrayList();
+        workerPartitions.put(partitionOwner.getWorkerInfo(),
+            workerPartitionIds);
+      }
+      workerPartitionIds.add(partitionOwner.getPartitionId());
+      maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
+    }
+    dataCache = new ByteArrayVertexIdData[maxPartition + 1];
+
+    int maxWorker = 0;
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
+    }
+    dataSizes = new int[maxWorker + 1];
+
+    int initialRequestSize =
+        (int) (maxRequestSize * (1 + additionalRequestSize));
+    initialBufferSizes = new int[maxWorker + 1];
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      initialBufferSizes[workerInfo.getTaskId()] =
+          initialRequestSize / workerPartitions.get(workerInfo).size();
+    }
+  }
+
+  /**
+   * Create a new {@link ByteArrayVertexIdData} specialized for the use case.
+   *
+   * @return A new instance of {@link ByteArrayVertexIdData}
+   */
+  public abstract B createByteArrayVertexIdData();
+
+  /**
+   * Add data to the cache.
+   *
+   * @param workerInfo the remote worker destination
+   * @param partitionId the remote Partition this message belongs to
+   * @param destVertexId vertex id that is ultimate destination
+   * @param data Data to send to remote worker
+   * @return Size of messages for the worker.
+   */
+  public int addData(WorkerInfo workerInfo,
+                     int partitionId, I destVertexId, T data) {
+    // Get the data collection
+    ByteArrayVertexIdData<I, T> partitionData = dataCache[partitionId];
+    int originalSize = 0;
+    if (partitionData == null) {
+      partitionData = createByteArrayVertexIdData();
+      partitionData.setConf(conf);
+      partitionData.initialize(initialBufferSizes[workerInfo.getTaskId()]);
+      dataCache[partitionId] = partitionData;
+    } else {
+      originalSize = partitionData.getSize();
+    }
+    partitionData.add(destVertexId, data);
+
+    // Update the size of cached, outgoing data per worker
+    dataSizes[workerInfo.getTaskId()] +=
+        partitionData.getSize() - originalSize;
+    return dataSizes[workerInfo.getTaskId()];
+  }
+
+  /**
+   * Gets the data for a worker and removes it from the cache.
+   *
+   * @param workerInfo the address of the worker who owns the data
+   *                   partitions that are receiving the data
+   * @return List of pairs (partitionId, ByteArrayVertexIdData),
+   *         where all partition ids belong to workerInfo
+   */
+  public PairList<Integer, B>
+  removeWorkerData(WorkerInfo workerInfo) {
+    PairList<Integer, B> workerData = new PairList<Integer, B>();
+    List<Integer> partitions = workerPartitions.get(workerInfo);
+    workerData.initialize(partitions.size());
+    for (Integer partitionId : partitions) {
+      if (dataCache[partitionId] != null) {
+        workerData.add(partitionId, (B) dataCache[partitionId]);
+        dataCache[partitionId] = null;
+      }
+    }
+    dataSizes[workerInfo.getTaskId()] = 0;
+    return workerData;
+  }
+
+  /**
+   * Gets all the data and removes it from the cache.
+   *
+   * @return All data for all vertices for all partitions
+   */
+  public PairList<WorkerInfo, PairList<Integer, B>> removeAllData() {
+    PairList<WorkerInfo, PairList<Integer, B>> allData =
+        new PairList<WorkerInfo, PairList<Integer, B>>();
+    allData.initialize(dataSizes.length);
+    for (WorkerInfo workerInfo : workerPartitions.keySet()) {
+      PairList<Integer, B> workerData = removeWorkerData(workerInfo);
+      if (!workerData.isEmpty()) {
+        allData.add(workerInfo, workerData);
+      }
+      dataSizes[workerInfo.getTaskId()] = 0;
+    }
+    return allData;
+  }
+}
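
For readers following the refactoring above, this is the caching pattern SendCache generalizes: buffer data per destination partition, track an approximate byte count per destination worker, and flush a worker's buffers once that count reaches the request-size threshold. The following is a minimal, self-contained sketch of that pattern only; the class and its types are illustrative and are not the Giraph API.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Simplified illustration of SendCache's buffering/flush pattern (not Giraph code). */
    public class SimpleSendCache<T> {
      /** Buffered items, keyed by destination partition id. */
      private final Map<Integer, List<T>> partitionBuffers = new HashMap<Integer, List<T>>();
      /** Approximate cached bytes per destination worker task id. */
      private final Map<Integer, Integer> workerBytes = new HashMap<Integer, Integer>();
      /** Hypothetical flush threshold, analogous to the maximum request size. */
      private final int maxRequestSize;

      public SimpleSendCache(int maxRequestSize) {
        this.maxRequestSize = maxRequestSize;
      }

      /** Buffer one item; returns true when the caller should flush this worker. */
      public boolean add(int workerTaskId, int partitionId, T item, int itemSizeBytes) {
        List<T> buffer = partitionBuffers.get(partitionId);
        if (buffer == null) {
          buffer = new ArrayList<T>();
          partitionBuffers.put(partitionId, buffer);
        }
        buffer.add(item);
        Integer oldSize = workerBytes.get(workerTaskId);
        int newSize = (oldSize == null ? 0 : oldSize) + itemSizeBytes;
        workerBytes.put(workerTaskId, newSize);
        return newSize >= maxRequestSize;
      }

      /** Remove and return the buffers for one worker's partitions, resetting its byte count. */
      public Map<Integer, List<T>> removeWorkerData(int workerTaskId, List<Integer> partitionIds) {
        Map<Integer, List<T>> result = new HashMap<Integer, List<T>>();
        for (Integer partitionId : partitionIds) {
          List<T> buffer = partitionBuffers.remove(partitionId);
          if (buffer != null) {
            result.put(partitionId, buffer);
          }
        }
        workerBytes.put(workerTaskId, 0);
        return result;
      }
    }
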

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
new file mode 100644
index 0000000..f239c1a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Aggregates the edges to be sent to workers so they can be sent
+ * in bulk.  Not thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class SendEdgeCache<I extends WritableComparable, E extends Writable>
+    extends SendCache<I, Edge<I, E>, ByteArrayVertexIdEdges<I, E>> {
+  /**
+   * Constructor
+   *
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
+   */
+  public SendEdgeCache(ImmutableClassesGiraphConfiguration conf,
+                       CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
+    super(conf,
+        serviceWorker,
+        conf.getInt(GiraphConstants.MAX_EDGE_REQUEST_SIZE,
+            GiraphConstants.MAX_EDGE_REQUEST_SIZE_DEFAULT),
+        conf.getFloat(GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE,
+            GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE_DEFAULT));
+  }
+
+  @Override
+  public ByteArrayVertexIdEdges<I, E> createByteArrayVertexIdData() {
+    return new ByteArrayVertexIdEdges<I, E>();
+  }
+
+  /**
+   * Add an edge to the cache.
+   *
+   * @param workerInfo the remote worker destination
+   * @param partitionId the remote Partition this edge belongs to
+   * @param destVertexId vertex id that is ultimate destination
+   * @param edge Edge to send to remote worker
+   * @return Size of edges for the worker.
+   */
+  public int addEdge(WorkerInfo workerInfo,
+                     int partitionId, I destVertexId, Edge<I, E> edge) {
+    return addData(workerInfo, partitionId, destVertexId, edge);
+  }
+
+  /**
+   * Gets the edges for a worker and removes them from the cache.
+   *
+   * @param workerInfo the address of the worker who owns the data
+   *                   partitions that are receiving the edges
+   * @return List of pairs (partitionId, ByteArrayVertexIdEdges),
+   *         where all partition ids belong to workerInfo
+   */
+  public PairList<Integer, ByteArrayVertexIdEdges<I, E>>
+  removeWorkerEdges(WorkerInfo workerInfo) {
+    return removeWorkerData(workerInfo);
+  }
+
+  /**
+   * Gets all the edges and removes them from the cache.
+   *
+   * @return All vertex edges for all partitions
+   */
+  public PairList<WorkerInfo, PairList<Integer, ByteArrayVertexIdEdges<I, E>>>
+  removeAllEdges() {
+    return removeAllData();
+  }
+}
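
The initial per-partition buffer size used by both caches comes from the parent SendCache constructor: the maximum request size is scaled up by the additionalRequestSize ratio and then divided evenly across the worker's partitions. A small worked example with purely hypothetical numbers (the real defaults live in GiraphConstants and are not shown in this diff):

    public class BufferSizingExample {
      public static void main(String[] args) {
        // Hypothetical values for illustration only.
        int maxRequestSize = 512 * 1024;     // assume a 512 KB maximum request
        float additionalRequestSize = 0.2f;  // assume 20% headroom over the average
        int partitionsOnWorker = 4;          // assume this worker owns 4 partitions

        int initialRequestSize = (int) (maxRequestSize * (1 + additionalRequestSize));
        int initialBufferSize = initialRequestSize / partitionsOnWorker;
        // Prints 629145 and 157286: each partition buffer starts at roughly 154 KB.
        System.out.println(initialRequestSize + " " + initialBufferSize);
      }
    }
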

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 3cbf0eb..07dc380 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -21,56 +21,21 @@ package org.apache.giraph.comm;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
 /**
- * Aggregates the messages to be send to workers so they can be sent
+ * Aggregates the messages to be sent to workers so they can be sent
  * in bulk.  Not thread-safe.
  *
  * @param <I> Vertex id
  * @param <M> Message data
  */
-@SuppressWarnings("rawtypes")
-public class SendMessageCache<I extends WritableComparable,
-    M extends Writable> {
-  /**
-   * How much bigger than the average per partition size to make initial per
-   * partition buffers.
-   * If this value is A, message request size is M,
-   * and a worker has P partitions, than its initial partition buffer size
-   * will be (M / P) * (1 + A).
-   */
-  public static final String ADDITIONAL_MSG_REQUEST_SIZE =
-      "giraph.additionalMsgRequestSize";
-  /**
-   * Default factor for how bigger should initial per partition buffers be
-   * of 20%.
-   */
-  public static final float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
-
-  /** Internal cache */
-  private final ByteArrayVertexIdMessages<I, M>[] messageCache;
-  /** Size of messages (in bytes) for each worker */
-  private final int[] messageSizes;
-  /** How big to initially make output streams for each worker's partitions */
-  private final int[] initialBufferSizes;
-  /** List of partition ids belonging to a worker */
-  private final Map<WorkerInfo, List<Integer>> workerPartitions =
-      Maps.newHashMap();
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration conf;
-
+public class SendMessageCache<I extends WritableComparable, M extends Writable>
+    extends SendCache<I, M, ByteArrayVertexIdMessages<I, M>> {
   /**
    * Constructor
    *
@@ -79,39 +44,17 @@ public class SendMessageCache<I extends WritableComparable,
    */
   public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
       CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
-    this.conf = conf;
-
-    int maxPartition = 0;
-    for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
-      List<Integer> workerPartitionIds =
-          workerPartitions.get(partitionOwner.getWorkerInfo());
-      if (workerPartitionIds == null) {
-        workerPartitionIds = Lists.newArrayList();
-        workerPartitions.put(partitionOwner.getWorkerInfo(),
-            workerPartitionIds);
-      }
-      workerPartitionIds.add(partitionOwner.getPartitionId());
-      maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
-    }
-    messageCache = new ByteArrayVertexIdMessages[maxPartition + 1];
-
-    int maxWorker = 0;
-    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
-      maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
-    }
-    messageSizes = new int[maxWorker + 1];
+    super(conf,
+        serviceWorker,
+        conf.getInt(GiraphConstants.MAX_MSG_REQUEST_SIZE,
+            GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT),
+        conf.getFloat(GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE,
+            GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT));
+  }
 
-    float additionalRequestSize =
-        conf.getFloat(ADDITIONAL_MSG_REQUEST_SIZE,
-            ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT);
-    int requestSize = conf.getInt(GiraphConstants.MAX_MSG_REQUEST_SIZE,
-        GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT);
-    int initialRequestSize = (int) (requestSize * (1 + additionalRequestSize));
-    initialBufferSizes = new int[maxWorker + 1];
-    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
-      initialBufferSizes[workerInfo.getTaskId()] =
-          initialRequestSize / workerPartitions.get(workerInfo).size();
-    }
+  @Override
+  public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
+    return new ByteArrayVertexIdMessages<I, M>();
   }
 
   /**
@@ -120,30 +63,12 @@ public class SendMessageCache<I extends WritableComparable,
    * @param workerInfo the remote worker destination
    * @param partitionId the remote Partition this message belongs to
    * @param destVertexId vertex id that is ultimate destination
-   * @param message Message to be send to remote
-   *                <b>host => partition => vertex</b>
+   * @param message Message to send to remote worker
    * @return Size of messages for the worker.
    */
   public int addMessage(WorkerInfo workerInfo,
-    final int partitionId, I destVertexId, M message) {
-    // Get the message collection
-    ByteArrayVertexIdMessages<I, M> partitionMessages =
-        messageCache[partitionId];
-    int originalSize = 0;
-    if (partitionMessages == null) {
-      partitionMessages = new ByteArrayVertexIdMessages<I, M>();
-      partitionMessages.setConf(conf);
-      partitionMessages.initialize(initialBufferSizes[workerInfo.getTaskId()]);
-      messageCache[partitionId] = partitionMessages;
-    } else {
-      originalSize = partitionMessages.getSize();
-    }
-    partitionMessages.add(destVertexId, message);
-
-    // Update the size of cached, outgoing messages per worker
-    messageSizes[workerInfo.getTaskId()] +=
-        partitionMessages.getSize() - originalSize;
-    return messageSizes[workerInfo.getTaskId()];
+                        int partitionId, I destVertexId, M message) {
+    return addData(workerInfo, partitionId, destVertexId, message);
   }
 
   /**
@@ -156,18 +81,7 @@ public class SendMessageCache<I extends WritableComparable,
    */
   public PairList<Integer, ByteArrayVertexIdMessages<I, M>>
   removeWorkerMessages(WorkerInfo workerInfo) {
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>> workerMessages =
-        new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
-    List<Integer> partitions = workerPartitions.get(workerInfo);
-    workerMessages.initialize(partitions.size());
-    for (Integer partitionId : partitions) {
-      if (messageCache[partitionId] != null) {
-        workerMessages.add(partitionId, messageCache[partitionId]);
-        messageCache[partitionId] = null;
-      }
-    }
-    messageSizes[workerInfo.getTaskId()] = 0;
-    return workerMessages;
+    return removeWorkerData(workerInfo);
   }
 
   /**
@@ -177,20 +91,6 @@ public class SendMessageCache<I extends WritableComparable,
    */
   public PairList<WorkerInfo, PairList<
       Integer, ByteArrayVertexIdMessages<I, M>>> removeAllMessages() {
-    PairList<WorkerInfo, PairList<Integer,
-        ByteArrayVertexIdMessages<I, M>>>
-        allMessages = new PairList<WorkerInfo,
-        PairList<Integer, ByteArrayVertexIdMessages<I, M>>>();
-    allMessages.initialize(messageSizes.length);
-    for (WorkerInfo workerInfo : workerPartitions.keySet()) {
-      PairList<Integer, ByteArrayVertexIdMessages<I,
-                M>> workerMessages =
-          removeWorkerMessages(workerInfo);
-      if (!workerMessages.isEmpty()) {
-        allMessages.add(workerInfo, workerMessages);
-      }
-      messageSizes[workerInfo.getTaskId()] = 0;
-    }
-    return allMessages;
+    return removeAllData();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 3655d79..7b4baa1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -18,12 +18,14 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.EdgeStore;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.partition.DiskBackedPartitionStore;
 import org.apache.giraph.partition.PartitionStore;
@@ -48,6 +50,8 @@ public class ServerData<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
   /** Partition store for this worker. */
   private volatile PartitionStore<I, V, E, M> partitionStore;
+  /** Edge store for this worker. */
+  private final EdgeStore<I, V, E, M> edgeStore;
   /** Message store factory */
   private final
   MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> messageStoreFactory;
@@ -79,11 +83,13 @@ public class ServerData<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param service Service worker
    * @param configuration Configuration
    * @param messageStoreFactory Factory for message stores
    * @param context Mapper context
    */
   public ServerData(
+      CentralizedServiceWorker<I, V, E, M> service,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
           messageStoreFactory,
@@ -100,10 +106,15 @@ public class ServerData<I extends WritableComparable,
       partitionStore =
           new SimplePartitionStore<I, V, E, M>(configuration, context);
     }
+    edgeStore = new EdgeStore<I, V, E, M>(service, configuration, context);
     ownerAggregatorData = new OwnerAggregatorServerData(context);
     allAggregatorData = new AllAggregatorServerData(context);
   }
 
+  public EdgeStore<I, V, E, M> getEdgeStore() {
+    return edgeStore;
+  }
+
   /**
    * Return the partition store for this worker.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
index 0c043e2..43311f4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
@@ -74,6 +74,19 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
   void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException;
 
   /**
+   * Sends a request to the source vertex owner to add an edge.
+   * Note: this request follows an optimized code path used by edge-based
+   * input, and doesn't coordinate with mutations.
+   *
+   * @param sourceVertexId Source vertex id.
+   * @param edge Edge to be added.
+   * @return Returns true iff any network I/O occurred.
+   * @throws IOException
+   */
+  boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+    throws IOException;
+
+  /**
    * Sends a request to the appropriate vertex range owner to remove all edges
    * pointing to a given vertex.
    *
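
The new sendEdgeRequest path is what edge-based input uses to stream edges to their owners without going through the mutation machinery. A hedged sketch of a loading loop built on this interface follows; the Map input is a stand-in for whatever an EdgeInputFormat reader produces (the real driver is EdgeInputSplitsCallable), and only sendEdgeRequest itself comes from the interface above.

    import org.apache.giraph.comm.WorkerClientRequestProcessor;
    import org.apache.giraph.graph.Edge;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Writable;

    import java.io.IOException;
    import java.util.Map;

    /** Illustrative only: feed pre-parsed edges through the optimized edge-input path. */
    public class EdgeLoadingSketch {
      public static <V extends Writable, M extends Writable> void loadEdges(
          WorkerClientRequestProcessor<LongWritable, V, DoubleWritable, M> processor,
          Map<LongWritable, Edge<LongWritable, DoubleWritable>> edgesBySource)
          throws IOException {
        for (Map.Entry<LongWritable, Edge<LongWritable, DoubleWritable>> entry :
            edgesBySource.entrySet()) {
          // Each call only buffers the edge in SendEdgeCache; a network request
          // goes out once the per-worker byte threshold (MAX_EDGE_REQUEST_SIZE)
          // is reached, as shown in NettyWorkerClientRequestProcessor below.
          processor.sendEdgeRequest(entry.getKey(), entry.getValue());
        }
        // Any edges still cached are sent when the processor's remaining caches
        // are flushed (the removeAllEdges() loop added further down in this patch).
      }
    }
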

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
index e60db55..e373b2c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
@@ -52,13 +52,6 @@ public interface WorkerServer<I extends WritableComparable,
   void prepareSuperstep(GraphState<I, V, E, M> graphState);
 
   /**
-   * Only resolve mutations requests (used for edge-oriented input).
-   *
-   * @param graphState Current graph state
-   */
-  void resolveMutations(GraphState<I, V, E, M> graphState);
-
-  /**
    * Get server data
    *
    * @return Server data

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 65caa5d..3cd1175 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -19,20 +19,22 @@
 package org.apache.giraph.comm.messages;
 
 import com.google.common.collect.Iterators;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.RepresentativeByteArrayIterable;
 import org.apache.giraph.utils.RepresentativeByteArrayIterator;
+import org.apache.giraph.utils.VertexIdIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
 /**
  * Implementation of {@link SimpleMessageStore} where multiple messages are
  * stored per vertex as byte arrays.  Used when there is no combiner provided.
@@ -65,7 +67,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    */
   private ExtendedDataOutput getExtendedDataOutput(
       ConcurrentMap<I, ExtendedDataOutput> partitionMap,
-      ByteArrayVertexIdMessages<I, M>.VertexIdIterator iterator) {
+      VertexIdIterator<I> iterator) {
     ExtendedDataOutput extendedDataOutput =
         partitionMap.get(iterator.getCurrentVertexId());
     if (extendedDataOutput == null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index d4e919e..0fc1858 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -17,8 +17,12 @@
  */
 package org.apache.giraph.comm.netty;
 
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.PercentGauge;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.SendEdgeCache;
 import org.apache.giraph.comm.SendMessageCache;
 import org.apache.giraph.comm.SendMutationsCache;
 import org.apache.giraph.comm.SendPartitionCache;
@@ -29,6 +33,7 @@ import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
+import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.WorkerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
@@ -41,6 +46,7 @@ import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.vertex.Vertex;
@@ -50,10 +56,6 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.util.PercentGauge;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -67,6 +69,7 @@ import java.util.Map;
  * @param <E> Edge data
  * @param <M> Message data
  */
+@SuppressWarnings("unchecked")
 public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> implements
     WorkerClientRequestProcessor<I, V, E, M> {
@@ -77,6 +80,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   private final SendPartitionCache<I, V, E, M> sendPartitionCache;
   /** Cached map of partitions to vertex indices to messages */
   private final SendMessageCache<I, M> sendMessageCache;
+  /** Cache of edges to be sent. */
+  private final SendEdgeCache<I, E> sendEdgeCache;
   /** Cached map of partitions to vertex indices to mutations */
   private final SendMutationsCache<I, V, E, M> sendMutationsCache =
       new SendMutationsCache<I, V, E, M>();
@@ -86,6 +91,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   private long totalMsgsSentInSuperstep = 0;
   /** Maximum size of messages per remote worker to cache before sending */
   private final int maxMessagesSizePerWorker;
+  /** Maximum size of edges per remote worker to cache before sending. */
+  private final int maxEdgesSizePerWorker;
   /** Maximum number of mutations per partition before sending */
   private final int maxMutationsPerPartition;
   /** Giraph configuration */
@@ -119,9 +126,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         configuration);
     sendMessageCache =
         new SendMessageCache<I, M>(configuration, serviceWorker);
+    sendEdgeCache = new SendEdgeCache<I, E>(configuration, serviceWorker);
     maxMessagesSizePerWorker = configuration.getInt(
         GiraphConstants.MAX_MSG_REQUEST_SIZE,
         GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT);
+    maxEdgesSizePerWorker = configuration.getInt(
+        GiraphConstants.MAX_EDGE_REQUEST_SIZE,
+        GiraphConstants.MAX_EDGE_REQUEST_SIZE_DEFAULT);
     maxMutationsPerPartition = configuration.getInt(
         GiraphConstants.MAX_MUTATIONS_PER_REQUEST,
         GiraphConstants.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
@@ -135,7 +146,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS);
     final Gauge<Long> totalRequests = smr.getGauge(MetricNames.TOTAL_REQUESTS,
         new Gauge<Long>() {
-          @Override public Long value() {
+          @Override
+          public Long value() {
             return localRequests.count() + remoteRequests.count();
           }
         }
@@ -174,7 +186,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
           workerMessages =
           sendMessageCache.removeWorkerMessages(workerInfo);
       WritableRequest writableRequest =
-          new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
+          new SendWorkerMessagesRequest<I, M>(workerMessages);
       doRequest(workerInfo, writableRequest);
       return true;
     }
@@ -278,6 +290,36 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         partitionId, partitionOwner, partitionMutationCount);
   }
 
+  @Override
+  public boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+    throws IOException {
+    PartitionOwner owner =
+        serviceWorker.getVertexPartitionOwner(sourceVertexId);
+    WorkerInfo workerInfo = owner.getWorkerInfo();
+    final int partitionId = owner.getPartitionId();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("sendEdgeRequest: Send bytes (" + edge.toString() +
+          ") to " + sourceVertexId + " on worker " + workerInfo);
+    }
+
+    // Add the message to the cache
+    int workerEdgesSize = sendEdgeCache.addEdge(
+        workerInfo, partitionId, sourceVertexId, edge);
+
+    // Send a request if the cache of outgoing edges to the remote worker is
+    // full
+    if (workerEdgesSize >= maxEdgesSizePerWorker) {
+      PairList<Integer, ByteArrayVertexIdEdges<I, E>> workerEdges =
+          sendEdgeCache.removeWorkerEdges(workerInfo);
+      WritableRequest writableRequest =
+          new SendWorkerEdgesRequest<I, E>(workerEdges);
+      doRequest(workerInfo, writableRequest);
+      return true;
+    }
+
+    return false;
+  }
+
   /**
    * Send a mutations request if the maximum number of mutations per partition
    * was met.
@@ -376,11 +418,26 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     while (iterator.hasNext()) {
       iterator.next();
       WritableRequest writableRequest =
-          new SendWorkerMessagesRequest<I, V, E, M>(
+          new SendWorkerMessagesRequest<I, M>(
               iterator.getCurrentSecond());
       doRequest(iterator.getCurrentFirst(), writableRequest);
     }
 
+    // Execute the remaining edge sends (if any)
+    PairList<WorkerInfo, PairList<Integer,
+        ByteArrayVertexIdEdges<I, E>>>
+        remainingEdgeCache = sendEdgeCache.removeAllEdges();
+    PairList<WorkerInfo,
+        PairList<Integer, ByteArrayVertexIdEdges<I, E>>>.Iterator
+        edgeIterator = remainingEdgeCache.getIterator();
+    while (edgeIterator.hasNext()) {
+      edgeIterator.next();
+      WritableRequest writableRequest =
+          new SendWorkerEdgesRequest<I, E>(
+              edgeIterator.getCurrentSecond());
+      doRequest(edgeIterator.getCurrentFirst(), writableRequest);
+    }
+
     // Execute the remaining sends mutations (if any)
     Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
         sendMutationsCache.removeAllPartitionMutations();

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 1b7cc54..697b6ce 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -90,7 +90,8 @@ public class NettyWorkerServer<I extends WritableComparable,
     this.service = service;
 
     serverData =
-        new ServerData<I, V, E, M>(conf, createMessageStoreFactory(), context);
+        new ServerData<I, V, E, M>(service, conf, createMessageStoreFactory(),
+            context);
 
     nettyServer = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData),
@@ -153,8 +154,12 @@ public class NettyWorkerServer<I extends WritableComparable,
     resolveMutations(graphState);
   }
 
-  @Override
-  public void resolveMutations(GraphState<I, V, E, M> graphState) {
+  /**
+   * Resolve mutation requests.
+   *
+   * @param graphState Graph state
+   */
+  private void resolveMutations(GraphState<I, V, E, M> graphState) {
     Multimap<Integer, I> resolveVertexIndices = HashMultimap.create(
         service.getPartitionStore().getNumPartitions(), 100);
       // Add any mutated vertex indices to be resolved

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index aac0028..4129fb8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -42,6 +42,8 @@ public enum RequestType {
    */
   SEND_PARTITION_CURRENT_MESSAGES_REQUEST
       (SendPartitionCurrentMessagesRequest.class),
+  /** Send a partition of edges */
+  SEND_WORKER_EDGES_REQUEST(SendWorkerEdgesRequest.class),
   /** Send a partition of mutations */
   SEND_PARTITION_MUTATIONS_REQUEST(SendPartitionMutationsRequest.class),
   /** Send aggregated values from one worker's vertices */

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
new file mode 100644
index 0000000..4f80224
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.utils.PairList;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Abstract request to send a collection of data, indexed by vertex id,
+ * for a partition.
+ *
+ * @param <I> Vertex id
+ * @param <T> Data
+ * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
+ */
+public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
+    B extends ByteArrayVertexIdData<I, T>>
+    extends WritableRequest implements WorkerRequest {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SendWorkerDataRequest.class);
+  /**
+   * All data for a group of vertices, organized by partition, which
+   * are owned by a single (destination) worker. This data is all
+   * destined for this worker.
+   * */
+  protected PairList<Integer, B> partitionVertexData;
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendWorkerDataRequest() { }
+
+  /**
+   * Constructor used to send request.
+   *
+   * @param partVertData Map of remote partitions =>
+   *                     ByteArrayVertexIdData
+   */
+  public SendWorkerDataRequest(
+      PairList<Integer, B> partVertData) {
+    this.partitionVertexData = partVertData;
+  }
+
+  /**
+   * Create a new {@link ByteArrayVertexIdData} specialized for the use case.
+   *
+   * @return A new instance of {@link ByteArrayVertexIdData}
+   */
+  public abstract B createByteArrayVertexIdData();
+
+  @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    int numPartitions = input.readInt();
+    partitionVertexData = new PairList<Integer, B>();
+    partitionVertexData.initialize(numPartitions);
+    while (numPartitions-- > 0) {
+      final int partitionId = input.readInt();
+      B vertexIdData = createByteArrayVertexIdData();
+      vertexIdData.setConf(getConf());
+      vertexIdData.readFields(input);
+      partitionVertexData.add(partitionId, vertexIdData);
+    }
+  }
+
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    output.writeInt(partitionVertexData.getSize());
+    PairList<Integer, B>.Iterator
+        iterator = partitionVertexData.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      output.writeInt(iterator.getCurrentFirst());
+      iterator.getCurrentSecond().write(output);
+    }
+  }
+
+  @Override
+  public int getSerializedSize() {
+    int size = super.getSerializedSize() + 4;
+    PairList<Integer, B>.Iterator iterator = partitionVertexData.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      size += 4 + iterator.getCurrentSecond().getSerializedSize();
+    }
+    return size;
+  }
+}
+
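
The wire format implied by writeRequest() and readFieldsRequest() above is a simple count-prefixed sequence (a sketch derived directly from the code in this file):

    int32  numPartitions
    repeated numPartitions times:
      int32  partitionId
      bytes  ByteArrayVertexIdData<I, T> payload, written and read by its own write()/readFields()

getSerializedSize() mirrors this layout: the base request size plus 4 bytes for the count, then 4 bytes for the partition id plus the payload's own serialized size per entry.
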

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
new file mode 100644
index 0000000..f301bbf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.PairList;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Send a collection of edges for a partition.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("unchecked")
+public class SendWorkerEdgesRequest<I extends WritableComparable,
+    E extends Writable>
+    extends SendWorkerDataRequest<I, Edge<I, E>,
+    ByteArrayVertexIdEdges<I, E>> {
+  /**
+   * Constructor used for reflection only
+   */
+  public SendWorkerEdgesRequest() { }
+
+  /**
+   * Constructor used to send request.
+   *
+   * @param partVertEdges Map of remote partitions =>
+   *                     ByteArrayVertexIdEdges
+   */
+  public SendWorkerEdgesRequest(
+      PairList<Integer, ByteArrayVertexIdEdges<I, E>> partVertEdges) {
+    this.partitionVertexData = partVertEdges;
+  }
+
+  @Override
+  public ByteArrayVertexIdEdges<I, E> createByteArrayVertexIdData() {
+    return new ByteArrayVertexIdEdges<I, E>();
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_WORKER_EDGES_REQUEST;
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    PairList<Integer, ByteArrayVertexIdEdges<I, E>>.Iterator
+        iterator = partitionVertexData.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      serverData.getEdgeStore().
+          addPartitionEdges(iterator.getCurrentFirst(),
+              iterator.getCurrentSecond());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index 641c795..04b633b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -18,39 +18,24 @@
 
 package org.apache.giraph.comm.requests;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
+
+import java.io.IOException;
 
 /**
  * Send a collection of vertex messages for a partition.
  *
  * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
  * @param <M> Message data
  */
-@SuppressWarnings("rawtypes")
+@SuppressWarnings("unchecked")
 public class SendWorkerMessagesRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SendWorkerMessagesRequest.class);
-  /**
-   * All messages for a group of vertices, organized by partition, which
-   * are owned by a single (destination) worker. These messages are all
-   * destined for this worker.
-   * */
-  private PairList<Integer, ByteArrayVertexIdMessages<I, M>>
-  partitionVertexMessages;
-
+    M extends Writable>
+    extends SendWorkerDataRequest<I, M, ByteArrayVertexIdMessages<I, M>> {
   /**
    * Constructor used for reflection only
    */
@@ -63,37 +48,13 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
    *                     ByteArrayVertexIdMessages
    */
   public SendWorkerMessagesRequest(
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>> partVertMsgs) {
-    super();
-    this.partitionVertexMessages = partVertMsgs;
+      PairList<Integer, ByteArrayVertexIdMessages<I, M>> partVertMsgs) {
+    this.partitionVertexData = partVertMsgs;
   }
 
   @Override
-  public void readFieldsRequest(DataInput input) throws IOException {
-    int numPartitions = input.readInt();
-    partitionVertexMessages =
-        new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
-    partitionVertexMessages.initialize(numPartitions);
-    while (numPartitions-- > 0) {
-      final int partitionId = input.readInt();
-      ByteArrayVertexIdMessages<I, M> vertexIdMessages =
-          new ByteArrayVertexIdMessages<I, M>();
-      vertexIdMessages.setConf(getConf());
-      vertexIdMessages.readFields(input);
-      partitionVertexMessages.add(partitionId, vertexIdMessages);
-    }
-  }
-
-  @Override
-  public void writeRequest(DataOutput output) throws IOException {
-    output.writeInt(partitionVertexMessages.getSize());
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
-        iterator = partitionVertexMessages.getIterator();
-    while (iterator.hasNext()) {
-      iterator.next();
-      output.writeInt(iterator.getCurrentFirst());
-      iterator.getCurrentSecond().write(output);
-    }
+  public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
+    return new ByteArrayVertexIdMessages<I, M>();
   }
 
   @Override
@@ -102,9 +63,9 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
   }
 
   @Override
-  public void doRequest(ServerData<I, V, E, M> serverData) {
+  public void doRequest(ServerData serverData) {
     PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
-        iterator = partitionVertexMessages.getIterator();
+        iterator = partitionVertexData.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
       try {
@@ -116,16 +77,4 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
       }
     }
   }
-
-  @Override
-  public int getSerializedSize() {
-    int size = super.getSerializedSize() + 4;
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
-        iterator = partitionVertexMessages.getIterator();
-    while (iterator.hasNext()) {
-      iterator.next();
-      size += 4 + iterator.getCurrentSecond().getSerializedSize();
-    }
-    return size;
-  }
 }
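
The serialization boilerplate deleted above (readFieldsRequest, writeRequest, getSerializedSize) now lives once in the new SendWorkerDataRequest base class introduced by this patch. A rough sketch of its shape, inferred from the two subclasses shown here rather than copied from the actual file:

// Inferred shape only; see SendWorkerDataRequest.java in this commit for
// the real supertypes (it also extends WritableRequest and implements
// WorkerRequest) and for the shared serialization code.
public abstract class SendWorkerDataRequest<I extends WritableComparable,
    T, B extends ByteArrayVertexIdData<I, T>> {
  /** Data for each partition owned by the destination worker. */
  protected PairList<Integer, B> partitionVertexData;

  /** Subclasses supply the concrete byte-array container to fill. */
  public abstract B createByteArrayVertexIdData();

  // readFieldsRequest(), writeRequest() and getSerializedSize() are
  // implemented here once, looping over partitionVertexData just as the
  // removed per-subclass code above did.
}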

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 9e129ef..96fada4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -18,16 +18,14 @@
 
 package org.apache.giraph.conf;
 
-import java.net.UnknownHostException;
-
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.job.DefaultJobObserver;
-import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.job.DefaultJobObserver;
+import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
@@ -39,6 +37,8 @@ import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.DNS;
 
+import java.net.UnknownHostException;
+
 /**
  * Adds user methods specific to Giraph.  This will be put into an
  * ImmutableClassesGiraphConfiguration that provides the configuration plus
@@ -701,6 +701,16 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Check if we can reuse incoming edge objects.
+   *
+   * @return True iff we can reuse incoming edge objects.
+   */
+  public boolean reuseIncomingEdgeObjects() {
+    return getBoolean(GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS,
+        GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS_DEFAULT);
+  }
+
+  /**
    * Get the local hostname on the given interface.
    *
    * @return The local hostname
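
A minimal usage sketch of the new flag (illustrative only; whether it is safe to enable depends on the configured Vertex implementation, see the REUSE_INCOMING_EDGE_OBJECTS constant below):

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;

public class ReuseEdgeObjectsExample {
  public static void main(String[] args) {
    GiraphConfiguration conf = new GiraphConfiguration();
    // Only safe when the vertex implementation copies edges in setEdges()
    // instead of keeping references to the passed-in Edge objects
    // (e.g. a byte-array based vertex).
    conf.setBoolean(GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS, true);
    // EdgeStore caches this value at construction time.
    System.out.println("Reuse incoming edge objects: " +
        conf.reuseIncomingEdgeObjects());
  }
}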

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 44d09c9..20d6df7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -337,10 +337,36 @@ public interface GiraphConstants {
   /** Default maximum size of messages per peer before flush of 0.5MB */
   int MAX_MSG_REQUEST_SIZE_DEFAULT = 512 * 1024;
 
-  /** Maximum number of messages per peer before flush */
-  String MSG_SIZE = "giraph.msgSize";
-  /** Default maximum number of messages per peer before flush */
-  int MSG_SIZE_DEFAULT = 2000;
+  /**
+   * How much bigger than the average per-partition size to make the
+   * initial per-partition buffers.
+   * If this value is A, the message request size is M,
+   * and a worker has P partitions, then its initial partition buffer size
+   * will be (M / P) * (1 + A).
+   */
+  String ADDITIONAL_MSG_REQUEST_SIZE =
+      "giraph.additionalMsgRequestSize";
+  /**
+   * Default factor for how bigger should initial per partition buffers be
+   * of 20%.
+   */
+  float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
+
+  /** Maximum size of edges (in bytes) per peer before flush */
+  String MAX_EDGE_REQUEST_SIZE = "giraph.edgeRequestSize";
+  /** Default maximum size of edges per peer before flush of 0.5MB */
+  int MAX_EDGE_REQUEST_SIZE_DEFAULT = 512 * 1024;
+
+  /**
+   * Additional size (expressed as a ratio) of each per-partition buffer on
+   * top of the average size.
+   */
+  String ADDITIONAL_EDGE_REQUEST_SIZE =
+      "giraph.additionalEdgeRequestSize";
+  /**
+   * Default additional per-partition edge buffer size factor of 20%.
+   */
+  float ADDITIONAL_EDGE_REQUEST_SIZE_DEFAULT = 0.2f;
 
   /** Maximum number of mutations per partition before flush */
   String MAX_MUTATIONS_PER_REQUEST = "giraph.maxMutationsPerRequest";
@@ -348,6 +374,19 @@ public interface GiraphConstants {
   int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
 
   /**
+   * Whether we should reuse the same Edge object when adding edges from
+   * requests.
+   * This only works with edge storage implementations that do not keep
+   * references to the input Edge objects (e.g., ByteArrayVertex).
+   */
+  String REUSE_INCOMING_EDGE_OBJECTS = "giraph.reuseIncomingEdgeObjects";
+  /**
+   * Default is to not reuse edge objects (since it's not compatible with
+   * all storage implementations).
+   */
+  boolean REUSE_INCOMING_EDGE_OBJECTS_DEFAULT = false;
+
+  /**
    * Use message size encoding (typically better for complex objects,
    * not meant for primitive wrapped messages)
    */
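
As a worked example of the ADDITIONAL_MSG_REQUEST_SIZE formula documented above (the partition count is hypothetical, the other values are the shipped defaults):

// M = 512 KB request size, A = 0.2 additional factor, P = 8 partitions.
int maxMsgRequestSize = 512 * 1024;    // MAX_MSG_REQUEST_SIZE_DEFAULT
float additionalFraction = 0.2f;       // ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT
int partitionsOnWorker = 8;            // hypothetical
int initialPartitionBufferSize = (int)
    ((maxMsgRequestSize / partitionsOnWorker) * (1 + additionalFraction));
// (524288 / 8) * 1.2 = 78643 bytes, i.e. roughly 77 KB per partition buffer.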

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 3e158af..18fd9ef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,21 +20,23 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.job.GiraphJobObserver;
-import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeNoValue;
 import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.master.MasterCompute;
-import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.MutableEdge;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.graph.VertexResolver;
-import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.MasterGraphPartitioner;
 import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.partition.PartitionStats;
-import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.utils.ExtendedByteArrayDataInput;
 import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
 import org.apache.giraph.utils.ExtendedDataInput;
@@ -42,6 +44,8 @@ import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
@@ -499,6 +503,32 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create a user edge.
+   *
+   * @return Instantiated user edge.
+   */
+  public Edge<I, E> createEdge() {
+    if (isEdgeValueNullWritable()) {
+      return (Edge<I, E>) new EdgeNoValue<I>(createVertexId());
+    } else {
+      return new DefaultEdge<I, E>(createVertexId(), createEdgeValue());
+    }
+  }
+
+  /**
+   * Create a mutable user edge.
+   *
+   * @return Instantiated mutable user edge.
+   */
+  public MutableEdge<I, E> createMutableEdge() {
+    if (isEdgeValueNullWritable()) {
+      return (MutableEdge<I, E>) new EdgeNoValue<I>(createVertexId());
+    } else {
+      return new DefaultEdge<I, E>(createVertexId(), createEdgeValue());
+    }
+  }
+
+  /**
    * Get the user's subclassed vertex message value class.
    *
    * @return User's vertex message value class
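
A short sketch of how the new factory methods are meant to be used (illustrative; 'conf' stands for an ImmutableClassesGiraphConfiguration whose vertex id class is IntWritable and edge value class is NullWritable, both hypothetical choices):

// With a NullWritable edge value, createEdge() returns an EdgeNoValue, so
// no per-edge value object is allocated; otherwise it is a DefaultEdge
// wrapping a freshly instantiated value.
Edge<IntWritable, NullWritable> edge = conf.createEdge();
// A mutable edge can be filled in over and over while deserializing,
// which is what makes reusing incoming edge objects worthwhile.
MutableEdge<IntWritable, NullWritable> reusableEdge = conf.createMutableEdge();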

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
new file mode 100644
index 0000000..6210367
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.MapMaker;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayEdges;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Collects incoming edges for vertices owned by this worker.
+ * Note: the current implementation is simply a bridge between
+ * incoming requests and vertices. In the future, EdgeStore may become an
+ * interface allowing for alternative, pluggable implementations of edge
+ * storage without having to extend Vertex.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(EdgeStore.class);
+  /** Service worker. */
+  private CentralizedServiceWorker<I, V, E, M> service;
+  /** Giraph configuration. */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** Progressable to report progress. */
+  private Progressable progressable;
+  /** Map used to temporarily store incoming edges. */
+  private ConcurrentMap<Integer,
+      ConcurrentMap<I, ByteArrayEdges<I, E>>> transientEdges;
+  /**
+   * Whether we should reuse edge objects (cached to avoid expensive calls
+   * to the configuration).
+   */
+  private boolean reuseIncomingEdgeObjects;
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public EdgeStore(
+      CentralizedServiceWorker<I, V, E, M> service,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      Progressable progressable) {
+    this.service = service;
+    this.configuration = configuration;
+    this.progressable = progressable;
+    reuseIncomingEdgeObjects = configuration.reuseIncomingEdgeObjects();
+    transientEdges = new MapMaker().concurrencyLevel(
+        configuration.getNettyServerExecutionConcurrency()).makeMap();
+  }
+
+  /**
+   * Add edges belonging to a given partition on this worker.
+   * Note: This method is thread-safe.
+   *
+   * @param partitionId Partition id for the incoming edges.
+   * @param edges Incoming edges
+   */
+  public void addPartitionEdges(
+      int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
+    ConcurrentMap<I, ByteArrayEdges<I, E>> partitionEdges =
+        transientEdges.get(partitionId);
+    if (partitionEdges == null) {
+      ConcurrentMap<I, ByteArrayEdges<I, E>> newPartitionEdges =
+          new MapMaker().concurrencyLevel(
+              configuration.getNettyServerExecutionConcurrency()).makeMap();
+      partitionEdges = transientEdges.putIfAbsent(partitionId,
+          newPartitionEdges);
+      if (partitionEdges == null) {
+        partitionEdges = newPartitionEdges;
+      }
+    }
+    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
+        edges.getVertexIdEdgeIterator();
+    while (vertexIdEdgeIterator.hasNext()) {
+      vertexIdEdgeIterator.next();
+      I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
+      Edge<I, E> edge = vertexIdEdgeIterator.getCurrentEdge();
+      ByteArrayEdges<I, E> vertexEdges = partitionEdges.get(vertexId);
+      if (vertexEdges == null) {
+        ByteArrayEdges<I, E> newVertexEdges =
+            new ByteArrayEdges<I, E>(configuration);
+        vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges);
+        if (vertexEdges == null) {
+          vertexEdges = newVertexEdges;
+          // The vertex id object is now stored as a key in the map, so we
+          // must release it and let the iterator allocate a new one.
+          vertexIdEdgeIterator.releaseCurrentVertexId();
+        }
+      }
+      synchronized (vertexEdges) {
+        vertexEdges.appendEdge(edge);
+      }
+    }
+  }
+
+  /**
+   * Move all edges from temporary storage to their source vertices.
+   * Note: this method is not thread-safe.
+   */
+  public void moveEdgesToVertices() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
+    }
+    for (Map.Entry<Integer, ConcurrentMap<I,
+        ByteArrayEdges<I, E>>> partitionEdges : transientEdges.entrySet()) {
+      Partition<I, V, E, M> partition =
+          service.getPartitionStore().getPartition(partitionEdges.getKey());
+      for (I vertexId : partitionEdges.getValue().keySet()) {
+        // Depending on whether the vertex implementation keeps references to
+        // the Edge objects or not, we may be able to reuse objects when
+        // iterating.
+        Iterable<Edge<I, E>> edgesIterable = reuseIncomingEdgeObjects ?
+            partitionEdges.getValue().remove(vertexId) :
+            partitionEdges.getValue().remove(vertexId).copyEdgeIterable();
+        Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
+        // If the source vertex doesn't exist, create it. Otherwise,
+        // just set the edges.
+        if (vertex == null) {
+          vertex = configuration.createVertex();
+          vertex.initialize(vertexId, configuration.createVertexValue(),
+              edgesIterable);
+          partition.putVertex(vertex);
+        } else {
+          vertex.setEdges(edgesIterable);
+          // Some Partition implementations (e.g. ByteArrayPartition) require
+          // us to put back the vertex after modifying it.
+          partition.saveVertex(vertex);
+        }
+        progressable.progress();
+      }
+      // Some PartitionStore implementations (e.g. DiskBackedPartitionStore)
+      // require us to put back the partition after modifying it.
+      service.getPartitionStore().putPartition(partition);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
+          "vertices.");
+    }
+    transientEdges.clear();
+  }
+}
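
A sketch of the EdgeStore lifecycle (illustrative only; serviceWorker, conf, context, partitionId and vertexIdEdges are hypothetical stand-ins, and in the patch these calls are issued by the server-side request handling and by the worker after edge input, not by user code):

EdgeStore<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
    edgeStore = new EdgeStore<LongWritable, DoubleWritable, FloatWritable,
        DoubleWritable>(serviceWorker, conf, context);
// 1. During edge input, every incoming SendWorkerEdgesRequest appends its
//    payload; addPartitionEdges() is thread-safe, so Netty handler threads
//    may call it concurrently.
edgeStore.addPartitionEdges(partitionId, vertexIdEdges);
// 2. Once all edge input splits have been read, buffered edges are moved
//    into their source vertices, creating any vertices that do not exist
//    yet (not thread-safe, called once).
edgeStore.moveEdgesToVertices();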

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index 1298918..2260837 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -18,8 +18,14 @@
 package org.apache.giraph.partition;
 
 import com.google.common.collect.MapMaker;
-
 import com.google.common.primitives.Ints;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -27,14 +33,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.utils.UnsafeByteArrayInputStream;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
 /**
  * Byte array based partition.  Should reduce the amount of memory used since
  * the entire graph is compressed into byte arrays.  Must guarantee, however,
@@ -48,8 +46,6 @@ import org.apache.log4j.Logger;
 public class ByteArrayPartition<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends BasicPartition<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
   /**
    * Vertex map for this range (keyed by index).  Note that the byte[] is a
    * serialized vertex with the first four bytes as the length of the vertex

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 725de39..585ab85 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -18,6 +18,20 @@
 
 package org.apache.giraph.partition;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.log4j.Logger;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -41,21 +55,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
 /**
  * Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
  * Thread-safe, but expects the caller to synchronized between deletes, adds,