You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2013/10/09 08:35:17 UTC

[1/4] git commit: updated refs/heads/trunk to a95066c

Updated Branches:
  refs/heads/trunk c6c86aa65 -> a95066cd0


Refactored SendCache into SendDataCache and SendVertexIdDataCache.


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e26b51e0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e26b51e0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e26b51e0

Branch: refs/heads/trunk
Commit: e26b51e0dc416fe026734b7079441baeefc33247
Parents: c6c86aa
Author: Avery Ching <ac...@fb.com>
Authored: Mon Jul 15 22:57:16 2013 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Mon Oct 7 17:05:04 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/giraph/comm/SendCache.java  | 252 -------------------
 .../org/apache/giraph/comm/SendDataCache.java   | 227 +++++++++++++++++
 .../org/apache/giraph/comm/SendEdgeCache.java   |   2 +-
 .../apache/giraph/comm/SendMessageCache.java    |   8 +-
 .../giraph/comm/SendVertexIdDataCache.java      |  92 +++++++
 5 files changed, 324 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
deleted file mode 100644
index 30c07ee..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayVertexIdData;
-import org.apache.giraph.utils.PairList;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * An abstract structure for caching data indexed by vertex id,
- * to be sent to workers in bulk. Not thread-safe.
- *
- * @param <I> Vertex id
- * @param <T> Data
- * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
- */
-@SuppressWarnings("unchecked")
-public abstract class SendCache<I extends WritableComparable, T,
-    B extends ByteArrayVertexIdData<I, T>> {
-  /** How big to initially make output streams for each worker's partitions */
-  private final int[] initialBufferSizes;
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration conf;
-  /** Service worker */
-  private final CentralizedServiceWorker serviceWorker;
-  /** Internal cache */
-  private final ByteArrayVertexIdData<I, T>[] dataCache;
-  /** Size of data (in bytes) for each worker */
-  private final int[] dataSizes;
-  /** Total number of workers */
-  private final int numWorkers;
-  /** List of partition ids belonging to a worker */
-  private final Map<WorkerInfo, List<Integer>> workerPartitions =
-      Maps.newHashMap();
-
-  /**
-   * Constructor.
-   *
-   * @param conf Giraph configuration
-   * @param serviceWorker Service worker
-   * @param maxRequestSize Maximum request size (in bytes)
-   * @param additionalRequestSize Additional request size (expressed as a
-   *                              ratio of the average request size)
-   */
-  public SendCache(ImmutableClassesGiraphConfiguration conf,
-                   CentralizedServiceWorker<?, ?, ?> serviceWorker,
-                   int maxRequestSize,
-                   float additionalRequestSize) {
-    this.conf = conf;
-    this.serviceWorker = serviceWorker;
-    int maxPartition = 0;
-    for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
-      List<Integer> workerPartitionIds =
-          workerPartitions.get(partitionOwner.getWorkerInfo());
-      if (workerPartitionIds == null) {
-        workerPartitionIds = Lists.newArrayList();
-        workerPartitions.put(partitionOwner.getWorkerInfo(),
-            workerPartitionIds);
-      }
-      workerPartitionIds.add(partitionOwner.getPartitionId());
-      maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
-    }
-    dataCache = new ByteArrayVertexIdData[maxPartition + 1];
-
-    int maxWorker = 0;
-    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
-      maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
-    }
-    dataSizes = new int[maxWorker + 1];
-
-    int initialRequestSize =
-        (int) (maxRequestSize * (1 + additionalRequestSize));
-    initialBufferSizes = new int[maxWorker + 1];
-    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
-      initialBufferSizes[workerInfo.getTaskId()] =
-          initialRequestSize / workerPartitions.get(workerInfo).size();
-    }
-    numWorkers = maxWorker + 1;
-  }
-
-  /**
-   * Create a new {@link ByteArrayVertexIdData} specialized for the use case.
-   *
-   * @return A new instance of {@link ByteArrayVertexIdData}
-   */
-  public abstract B createByteArrayVertexIdData();
-
-  /**
-   * Add data to the cache.
-   *
-   * @param workerInfo the remote worker destination
-   * @param partitionId the remote Partition this message belongs to
-   * @param destVertexId vertex id that is ultimate destination
-   * @param data Data to send to remote worker
-   * @return Size of messages for the worker.
-   */
-  public int addData(WorkerInfo workerInfo,
-                     int partitionId, I destVertexId, T data) {
-    // Get the data collection
-    ByteArrayVertexIdData<I, T> partitionData =
-      getPartitionData(workerInfo, partitionId);
-    int originalSize = partitionData.getSize();
-    partitionData.add(destVertexId, data);
-    // Update the size of cached, outgoing data per worker
-    dataSizes[workerInfo.getTaskId()] +=
-      partitionData.getSize() - originalSize;
-    return dataSizes[workerInfo.getTaskId()];
-  }
-
-  /**
-   * This method is similar to the method above,
-   * but use a serialized id to replace original I type
-   * destVertexId.
-   *
-   * @param workerInfo The remote worker destination
-   * @param partitionId The remote Partition this message belongs to
-   * @param serializedId The byte array to store the serialized target vertex id
-   * @param idPos The length of bytes of serialized id in the byte array above
-   * @param data Data to send to remote worker
-   * @return The number of bytes added to the target worker
-   */
-  public int addData(WorkerInfo workerInfo, int partitionId,
-    byte[] serializedId, int idPos, T data) {
-    // Get the data collection
-    ByteArrayVertexIdData<I, T> partitionData =
-      getPartitionData(workerInfo, partitionId);
-    int originalSize = partitionData.getSize();
-    partitionData.add(serializedId, idPos, data);
-    // Update the size of cached, outgoing data per worker
-    dataSizes[workerInfo.getTaskId()] +=
-      partitionData.getSize() - originalSize;
-    return dataSizes[workerInfo.getTaskId()];
-  }
-
-  /**
-   * This method tries to get a partition data from the data cache.
-   * If null, it will create one.
-   *
-   * @param workerInfo The remote worker destination
-   * @param partitionId The remote Partition this message belongs to
-   * @return The partition data in data cache
-   */
-  private ByteArrayVertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
-    int partitionId) {
-    ByteArrayVertexIdData<I, T> partitionData = dataCache[partitionId];
-    if (partitionData == null) {
-      partitionData = createByteArrayVertexIdData();
-      partitionData.setConf(conf);
-      partitionData.initialize(initialBufferSizes[workerInfo.getTaskId()]);
-      dataCache[partitionId] = partitionData;
-    }
-    return partitionData;
-  }
-
-  /**
-   * Gets the data for a worker and removes it from the cache.
-   *
-   * @param workerInfo The address of the worker who owns the data
-   *                   partitions that are receiving the data
-   * @return List of pairs (partitionId, ByteArrayVertexIdData),
-   *         where all partition ids belong to workerInfo
-   */
-  public PairList<Integer, B>
-  removeWorkerData(WorkerInfo workerInfo) {
-    PairList<Integer, B> workerData = new PairList<Integer, B>();
-    List<Integer> partitions = workerPartitions.get(workerInfo);
-    workerData.initialize(partitions.size());
-    for (Integer partitionId : partitions) {
-      if (dataCache[partitionId] != null) {
-        workerData.add(partitionId, (B) dataCache[partitionId]);
-        dataCache[partitionId] = null;
-      }
-    }
-    dataSizes[workerInfo.getTaskId()] = 0;
-    return workerData;
-  }
-
-  /**
-   * Gets all the data and removes it from the cache.
-   *
-   * @return All data for all vertices for all partitions
-   */
-  public PairList<WorkerInfo, PairList<Integer, B>> removeAllData() {
-    PairList<WorkerInfo, PairList<Integer, B>> allData =
-        new PairList<WorkerInfo, PairList<Integer, B>>();
-    allData.initialize(dataSizes.length);
-    for (WorkerInfo workerInfo : workerPartitions.keySet()) {
-      PairList<Integer, B> workerData = removeWorkerData(workerInfo);
-      if (!workerData.isEmpty()) {
-        allData.add(workerInfo, workerData);
-      }
-      dataSizes[workerInfo.getTaskId()] = 0;
-    }
-    return allData;
-  }
-
-  protected ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  /**
-   * Get the service worker.
-   *
-   * @return CentralizedServiceWorker
-   */
-  protected CentralizedServiceWorker getServiceWorker() {
-    return serviceWorker;
-  }
-
-  /**
-   * Get the initial buffer size for the messages sent to a worker.
-   *
-   * @param taskId The task ID of a worker.
-   * @return The initial buffer size for a worker.
-   */
-  protected int getSendWorkerInitialBufferSize(int taskId) {
-    return initialBufferSizes[taskId];
-  }
-
-  protected int getNumWorkers() {
-    return this.numWorkers;
-  }
-
-  protected Map<WorkerInfo, List<Integer>> getWorkerPartitions() {
-    return workerPartitions;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
new file mode 100644
index 0000000..6973785
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract structure for caching data by partitions
+ * to be sent to workers in bulk. Not thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <D> Data type
+ */
+@NotThreadSafe
+@SuppressWarnings("unchecked")
+public abstract class SendDataCache<I extends WritableComparable,
+    D extends Writable> {
+  /**
+   * Internal cache of partitions (index) to their partition caches of
+   * type D.
+   */
+  private final D[] dataCache;
+  /** How big to initially make output streams for each worker's partitions */
+  private final int[] initialBufferSizes;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** Service worker */
+  private final CentralizedServiceWorker serviceWorker;
+  /** Size of data (in bytes) for each worker */
+  private final int[] dataSizes;
+  /** Total number of workers */
+  private final int numWorkers;
+  /** List of partition ids belonging to a worker */
+  private final Map<WorkerInfo, List<Integer>> workerPartitions =
+      Maps.newHashMap();
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
+   * @param maxRequestSize Maximum request size (in bytes)
+   * @param additionalRequestSize Additional request size (expressed as a
+   *                              ratio of the average request size)
+   */
+  public SendDataCache(ImmutableClassesGiraphConfiguration conf,
+                       CentralizedServiceWorker<?, ?, ?> serviceWorker,
+                       int maxRequestSize,
+                       float additionalRequestSize) {
+    this.conf = conf;
+    this.serviceWorker = serviceWorker;
+    int maxPartition = 0;
+    for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
+      List<Integer> workerPartitionIds =
+          workerPartitions.get(partitionOwner.getWorkerInfo());
+      if (workerPartitionIds == null) {
+        workerPartitionIds = Lists.newArrayList();
+        workerPartitions.put(partitionOwner.getWorkerInfo(),
+            workerPartitionIds);
+      }
+      workerPartitionIds.add(partitionOwner.getPartitionId());
+      maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
+    }
+    dataCache = (D[]) new Writable[maxPartition + 1];
+
+    int maxWorker = 0;
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
+    }
+    dataSizes = new int[maxWorker + 1];
+
+    int initialRequestSize =
+        (int) (maxRequestSize * (1 + additionalRequestSize));
+    initialBufferSizes = new int[maxWorker + 1];
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      initialBufferSizes[workerInfo.getTaskId()] =
+          initialRequestSize / workerPartitions.get(workerInfo).size();
+    }
+    numWorkers = maxWorker + 1;
+  }
+
+  /**
+   * Gets the data for a worker and removes it from the cache.
+   *
+   * @param workerInfo the address of the worker who owns the data
+   *                   partitions that are receiving the data
+   * @return List of pairs (partitionId, ByteArrayVertexIdData),
+   *         where all partition ids belong to workerInfo
+   */
+  public PairList<Integer, D>
+  removeWorkerData(WorkerInfo workerInfo) {
+    PairList<Integer, D> workerData = new PairList<Integer, D>();
+    List<Integer> partitions = workerPartitions.get(workerInfo);
+    workerData.initialize(partitions.size());
+    for (Integer partitionId : partitions) {
+      if (dataCache[partitionId] != null) {
+        workerData.add(partitionId, (D) dataCache[partitionId]);
+        dataCache[partitionId] = null;
+      }
+    }
+    dataSizes[workerInfo.getTaskId()] = 0;
+    return workerData;
+  }
+
+  /**
+   * Gets all the data and removes it from the cache.
+   *
+   * @return All data for all vertices for all partitions
+   */
+  public PairList<WorkerInfo, PairList<Integer, D>> removeAllData() {
+    PairList<WorkerInfo, PairList<Integer, D>> allData =
+        new PairList<WorkerInfo, PairList<Integer, D>>();
+    allData.initialize(dataSizes.length);
+    for (WorkerInfo workerInfo : workerPartitions.keySet()) {
+      PairList<Integer, D> workerData = removeWorkerData(workerInfo);
+      if (!workerData.isEmpty()) {
+        allData.add(workerInfo, workerData);
+      }
+      dataSizes[workerInfo.getTaskId()] = 0;
+    }
+    return allData;
+  }
+
+  /**
+   * Get the data cache for a partition id
+   *
+   * @param partitionId Partition id
+   * @return Data cache for a partition
+   */
+  public D getData(int partitionId) {
+    return dataCache[partitionId];
+  }
+
+  /**
+   * Set the data cache for a partition id
+   *
+   * @param partitionId Partition id
+   * @param data Data to be set for a partition id
+   */
+  public void setData(int partitionId, D data) {
+    dataCache[partitionId] = data;
+  }
+
+  /**
+   * Get initial buffer size of a partition.
+   *
+   * @param partitionId Partition id
+   * @return Initial buffer size of a partition
+   */
+  public int getInitialBufferSize(int partitionId) {
+    return initialBufferSizes[partitionId];
+  }
+
+  /**
+   * Increment the data size
+   *
+   * @param partitionId Partition id
+   * @param size Size to increment by
+   * @return new data size
+   */
+  public int incrDataSize(int partitionId, int size) {
+    dataSizes[partitionId] += size;
+    return dataSizes[partitionId];
+  }
+
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  /**
+   * Get the service worker.
+   *
+   * @return CentralizedServiceWorker
+   */
+  protected CentralizedServiceWorker getServiceWorker() {
+    return serviceWorker;
+  }
+
+  /**
+   * Get the initial buffer size for the messages sent to a worker.
+   *
+   * @param taskId The task ID of a worker.
+   * @return The initial buffer size for a worker.
+   */
+  protected int getSendWorkerInitialBufferSize(int taskId) {
+    return initialBufferSizes[taskId];
+  }
+
+  protected int getNumWorkers() {
+    return this.numWorkers;
+  }
+
+  protected Map<WorkerInfo, List<Integer>> getWorkerPartitions() {
+    return workerPartitions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
index 5513da2..8350a55 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
@@ -38,7 +38,7 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
  * @param <E> Edge value
  */
 public class SendEdgeCache<I extends WritableComparable, E extends Writable>
-    extends SendCache<I, Edge<I, E>, ByteArrayVertexIdEdges<I, E>> {
+    extends SendVertexIdDataCache<I, Edge<I, E>, ByteArrayVertexIdEdges<I, E>> {
   /**
    * Constructor
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 8df0dda..24848db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -46,7 +46,7 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
  * @param <M> Message data
  */
 public class SendMessageCache<I extends WritableComparable, M extends Writable>
-    extends SendCache<I, M, ByteArrayVertexIdMessages<I, M>> {
+    extends SendVertexIdDataCache<I, M, ByteArrayVertexIdMessages<I, M>> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendMessageCache.class);
@@ -80,7 +80,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
   @Override
   public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
     return new ByteArrayVertexIdMessages<I, M>(
-       getConf().getOutgoingMessageValueFactory());
+        getConf().getOutgoingMessageValueFactory());
   }
 
   /**
@@ -92,8 +92,8 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    * @param message Message to send to remote worker
    * @return Size of messages for the worker.
    */
-  private int addMessage(WorkerInfo workerInfo,
-      int partitionId, I destVertexId, M message) {
+  public int addMessage(WorkerInfo workerInfo,
+                        int partitionId, I destVertexId, M message) {
     return addData(workerInfo, partitionId, destVertexId, message);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
new file mode 100644
index 0000000..2623812
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.WritableComparable;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * An abstract structure for caching data indexed by vertex id,
+ * to be sent to workers in bulk. Not thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <T> Data
+ * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
+ */
+@NotThreadSafe
+@SuppressWarnings("unchecked")
+public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
+    B extends ByteArrayVertexIdData<I, T>> extends SendDataCache<I, B> {
+  /**
+   * Constructor.
+   *
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
+   * @param maxRequestSize Maximum request size (in bytes)
+   * @param additionalRequestSize Additional request size (expressed as a
+   *                              ratio of the average request size)
+   */
+  public SendVertexIdDataCache(ImmutableClassesGiraphConfiguration conf,
+                               CentralizedServiceWorker<?, ?, ?> serviceWorker,
+                               int maxRequestSize,
+                               float additionalRequestSize) {
+    super(conf, serviceWorker, maxRequestSize, additionalRequestSize);
+  }
+
+  /**
+   * Create a new {@link ByteArrayVertexIdData} specialized for the use case.
+   *
+   * @return A new instance of {@link ByteArrayVertexIdData}
+   */
+  public abstract B createByteArrayVertexIdData();
+
+  /**
+   * Add data to the cache.
+   *
+   * @param workerInfo the remote worker destination
+   * @param partitionId the remote Partition this message belongs to
+   * @param destVertexId vertex id that is ultimate destination
+   * @param data Data to send to remote worker
+   * @return Size of messages for the worker.
+   */
+  public int addData(WorkerInfo workerInfo,
+                     int partitionId, I destVertexId, T data) {
+    // Get the data collection
+    B partitionData = getData(partitionId);
+    int originalSize = 0;
+    if (partitionData == null) {
+      partitionData = createByteArrayVertexIdData();
+      partitionData.setConf(getConf());
+      partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId()));
+      setData(partitionId, partitionData);
+    } else {
+      originalSize = partitionData.getSize();
+    }
+    partitionData.add(destVertexId, data);
+
+    // Update the size of cached, outgoing data per worker
+    return incrDataSize(workerInfo.getTaskId(),
+        partitionData.getSize() - originalSize);
+  }
+}


[4/4] git commit: updated refs/heads/trunk to a95066c

Posted by ac...@apache.org.
Everything compiles.
All tests should run.
Next step is to add a test for the vertex combiner.
Should have fixed.
Fixed one bug for byte array partition.
Fixed another bug caused by a message buffer that was too small.
Rebased.
Rebased.
Passes tests.  Need to add two more tests.
1)  Test VertexInputFormat and edge input format.
Done
2) Add a test to check that the vertex value combiner works.
Done
3)  Run an experiment to see if it is faster with my changes.
Passed 'mvn clean verify'
4) Now the edges are added for combined vertices.
5) Clean up.


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a95066cd
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a95066cd
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a95066cd

Branch: refs/heads/trunk
Commit: a95066cd06bdf4ed238e0bef304f28856817d9c9
Parents: e26b51e
Author: Avery Ching <ac...@fb.com>
Authored: Fri Jul 19 00:39:13 2013 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Tue Oct 8 22:33:52 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   4 +-
 .../giraph/benchmark/PageRankBenchmark.java     |   4 +-
 .../benchmark/ShortestPathsBenchmark.java       |   4 +-
 .../benchmark/WeightedPageRankBenchmark.java    |  16 +-
 .../org/apache/giraph/combiner/Combiner.java    |  53 ---
 .../giraph/combiner/DoubleSumCombiner.java      |  39 --
 .../combiner/DoubleSumMessageCombiner.java      |  40 ++
 .../giraph/combiner/FloatSumCombiner.java       |  39 --
 .../combiner/FloatSumMessageCombiner.java       |  40 ++
 .../apache/giraph/combiner/MessageCombiner.java |  57 +++
 .../giraph/combiner/MinimumDoubleCombiner.java  |  41 --
 .../combiner/MinimumDoubleMessageCombiner.java  |  42 ++
 .../giraph/combiner/MinimumIntCombiner.java     |  40 --
 .../combiner/MinimumIntMessageCombiner.java     |  40 ++
 .../giraph/combiner/SimpleSumCombiner.java      |  40 --
 .../combiner/SimpleSumMessageCombiner.java      |  40 ++
 .../org/apache/giraph/comm/SendDataCache.java   |  13 +-
 .../apache/giraph/comm/SendPartitionCache.java  | 108 ++----
 .../giraph/comm/SendVertexIdDataCache.java      |  54 ++-
 .../comm/WorkerClientRequestProcessor.java      |   5 +-
 .../messages/InMemoryMessageStoreFactory.java   |  18 +-
 .../comm/messages/MessageStoreFactory.java      |   2 +-
 .../comm/messages/OneMessagePerVertexStore.java |  25 +-
 .../primitives/IntByteArrayMessageStore.java    |   2 +-
 .../primitives/IntFloatMessageStore.java        |  19 +-
 .../primitives/LongByteArrayMessageStore.java   |   2 +-
 .../primitives/LongDoubleMessageStore.java      |  19 +-
 .../giraph/comm/netty/NettyWorkerClient.java    |   2 +-
 .../NettyWorkerClientRequestProcessor.java      |  79 ++--
 .../giraph/comm/netty/NettyWorkerServer.java    |   4 +-
 .../comm/netty/handler/RequestEncoder.java      |   2 +-
 .../giraph/comm/requests/RequestType.java       |   2 +
 .../giraph/comm/requests/SendVertexRequest.java |   3 +-
 .../requests/SendWorkerVerticesRequest.java     | 129 +++++++
 .../org/apache/giraph/conf/GiraphClasses.java   |  56 ++-
 .../apache/giraph/conf/GiraphConfiguration.java |  35 +-
 .../org/apache/giraph/conf/GiraphConstants.java |  45 ++-
 .../ImmutableClassesGiraphConfiguration.java    |  48 ++-
 .../java/org/apache/giraph/edge/EdgeStore.java  |  12 +-
 .../apache/giraph/graph/ComputeCallable.java    |   2 +-
 .../graph/DefaultVertexValueCombiner.java       |  40 ++
 .../giraph/graph/VertexValueCombiner.java       |  39 ++
 .../IntIntNullTextVertexInputFormat.java        |  94 +++++
 .../io/formats/TextVertexValueInputFormat.java  |  57 +--
 .../job/GiraphConfigurationValidator.java       |  44 ++-
 .../org/apache/giraph/jython/JythonJob.java     |  15 +-
 .../org/apache/giraph/master/MasterCompute.java |  26 +-
 .../apache/giraph/master/SuperstepClasses.java  |  46 ++-
 .../apache/giraph/partition/BasicPartition.java |  20 +
 .../giraph/partition/ByteArrayPartition.java    |  66 +++-
 .../partition/DiskBackedPartitionStore.java     |  23 +-
 .../org/apache/giraph/partition/Partition.java  |  22 +-
 .../apache/giraph/partition/PartitionStore.java |  12 +-
 .../giraph/partition/SimplePartition.java       |  33 +-
 .../giraph/partition/SimplePartitionStore.java  |  17 +-
 .../giraph/utils/ByteArrayVertexIdMessages.java |   2 +-
 .../apache/giraph/utils/ConfigurationUtils.java |  20 +-
 .../org/apache/giraph/utils/VertexIterator.java | 143 +++++++
 .../org/apache/giraph/utils/WritableUtils.java  |   2 +-
 .../apache/giraph/worker/BspServiceWorker.java  |   8 +-
 .../org/apache/giraph/comm/RequestTest.java     |   2 +-
 .../TestIntFloatPrimitiveMessageStores.java     |   8 +-
 .../TestLongDoublePrimitiveMessageStores.java   |   8 +-
 .../org/apache/giraph/io/TestEdgeInput.java     | 261 -------------
 .../apache/giraph/io/TestVertexEdgeInput.java   | 385 +++++++++++++++++++
 .../master/TestComputationCombinerTypes.java    |  27 +-
 .../apache/giraph/master/TestSwitchClasses.java |  81 ++--
 .../giraph/partition/TestPartitionStores.java   |  14 +-
 .../java/org/apache/giraph/TestBspBasic.java    |   7 +-
 .../ConnectedComponentsComputationTest.java     |   4 +-
 ...nectedComponentsComputationTestInMemory.java |   4 +-
 .../giraph/examples/MinimumIntCombinerTest.java |  23 +-
 .../examples/TryMultiIpcBindingPortsTest.java   |   4 +-
 .../giraph/vertex/TestComputationTypes.java     |  20 +-
 .../vertex/examples/HiveIntIntNullVertex.java   |  58 +++
 .../giraph/hive/jython/HiveJythonUtils.java     |   6 +-
 .../giraph/hive/input/HiveVertexInputTest.java  |  11 +-
 .../giraph/jython/count-edges-launcher.py       |   2 +-
 src/site/xdoc/quick_start.xml                   |   2 +-
 79 files changed, 1910 insertions(+), 971 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index f960d09..8f046a5 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -710,7 +710,7 @@ Release 1.0.0 - 2013-04-15
   GIRAPH-298: Reduce timeout for TestAutoCheckpoint. (majakabiljo via
   aching)
 	
-  GIRAPH-324: Add option to use combiner in benchmarks. (apresta via
+  GIRAPH-324: Add option to use messageCombiner in benchmarks. (apresta via
   aching)
 
   GIRAPH-191: Random walks on graphs (Gianmarco De Francisci Morales
@@ -843,7 +843,7 @@ Release 1.0.0 - 2013-04-15
   GIRAPH-236: Add FindBugs to maven build (Jan van der Lugt via
   aching).
 
-  GIRAPH-224: Netty server-side combiner (apresta via aching).
+  GIRAPH-224: Netty server-side messageCombiner (apresta via aching).
 
   GIRAPH-251: Allow to access the distributed cache from Vertexes and
   WorkerContext (Gianmarco De Francisci Morales via aching).

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index acc1c46..8fd529d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.giraph.combiner.FloatSumCombiner;
+import org.apache.giraph.combiner.FloatSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.edge.IntNullArrayEdges;
@@ -76,7 +76,7 @@ public class PageRankBenchmark extends GiraphBenchmark {
       conf.setComputationClass(PageRankComputation.class);
     }
     conf.setOutEdgesClass(IntNullArrayEdges.class);
-    conf.setCombinerClass(FloatSumCombiner.class);
+    conf.setMessageCombinerClass(FloatSumMessageCombiner.class);
     conf.setVertexInputFormatClass(
         PseudoRandomIntNullVertexInputFormat.class);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 0dd4529..33fc7f2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.giraph.combiner.MinimumDoubleCombiner;
+import org.apache.giraph.combiner.MinimumDoubleMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.edge.ArrayListEdges;
@@ -67,7 +67,7 @@ public class ShortestPathsBenchmark extends GiraphBenchmark {
     LOG.info("Using class " + GiraphConstants.COMPUTATION_CLASS.get(conf));
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     if (!NO_COMBINER.optionTurnedOn(cmd)) {
-      conf.setCombinerClass(MinimumDoubleCombiner.class);
+      conf.setMessageCombinerClass(MinimumDoubleMessageCombiner.class);
     }
     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
         BenchmarkOption.VERTICES.getOptionLongValue(cmd));

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
index 2077674..8a796ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
@@ -18,9 +18,9 @@
 package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.giraph.combiner.DoubleSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.combiner.DoubleSumCombiner;
 import org.apache.giraph.edge.ArrayListEdges;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.HashMapEdges;
@@ -63,9 +63,10 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
       "Partitioning algorithm (0 for hash partitioning (default), " +
           "1 for range partitioning)");
   /** Option for type of combiner */
-  private static final BenchmarkOption COMBINER_TYPE = new BenchmarkOption(
-      "t", "combinerType", true,
-      "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)");
+  private static final BenchmarkOption MESSAGE_COMBINER_TYPE =
+      new BenchmarkOption("t", "combinerType", true,
+          "MessageCombiner type (0 for no combiner," +
+              " 1 for DoubleSumMessageCombiner (default)");
   /** Option for output format */
   private static final BenchmarkOption OUTPUT_FORMAT = new BenchmarkOption(
       "o", "vertexOutputFormat", true,
@@ -76,7 +77,8 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
     return Sets.newHashSet(
         BenchmarkOption.SUPERSTEPS, BenchmarkOption.VERTICES,
         BenchmarkOption.EDGES_PER_VERTEX, BenchmarkOption.LOCAL_EDGES_MIN_RATIO,
-        EDGES_CLASS, EDGE_INPUT, PARTITIONER, COMBINER_TYPE, OUTPUT_FORMAT);
+        EDGES_CLASS, EDGE_INPUT, PARTITIONER,
+        MESSAGE_COMBINER_TYPE, OUTPUT_FORMAT);
   }
 
   /**
@@ -115,8 +117,8 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
 
     LOG.info("Using edges class " +
         GiraphConstants.VERTEX_EDGES_CLASS.get(configuration));
-    if (COMBINER_TYPE.getOptionIntValue(cmd, 1) == 1) {
-      configuration.setCombinerClass(DoubleSumCombiner.class);
+    if (MESSAGE_COMBINER_TYPE.getOptionIntValue(cmd, 1) == 1) {
+      configuration.setMessageCombinerClass(DoubleSumMessageCombiner.class);
     }
 
     if (EDGE_INPUT.optionTurnedOn(cmd)) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/Combiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/Combiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/Combiner.java
deleted file mode 100644
index 7830fff..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/Combiner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Abstract class to extend for combining messages sent to the same vertex.
- * Combiner for applications where each two messages for one vertex can be
- * combined into one.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public abstract class Combiner<I extends WritableComparable,
-    M extends Writable> {
-  /**
-   * Combine messageToCombine with originalMessage,
-   * by modifying originalMessage.
-   *
-   * @param vertexIndex Index of the vertex getting these messages
-   * @param originalMessage The first message which we want to combine;
-   *                        put the result of combining in this message
-   * @param messageToCombine The second message which we want to combine
-   */
-  public abstract void combine(I vertexIndex, M originalMessage,
-      M messageToCombine);
-
-  /**
-   * Get the initial message. When combined with any other message M,
-   * the result should be M.
-   *
-   * @return Initial message
-   */
-  public abstract M createInitialMessage();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumCombiner.java
deleted file mode 100644
index 8da4ba7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumCombiner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * A combiner that sums double-valued messages
- */
-public class DoubleSumCombiner extends
-    Combiner<LongWritable, DoubleWritable> {
-  @Override
-  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
-      DoubleWritable messageToCombine) {
-    originalMessage.set(originalMessage.get() + messageToCombine.get());
-  }
-
-  @Override
-  public DoubleWritable createInitialMessage() {
-    return new DoubleWritable(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
new file mode 100644
index 0000000..163e0d8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A combiner that sums double-valued messages
+ */
+public class DoubleSumMessageCombiner
+    extends
+    MessageCombiner<LongWritable, DoubleWritable> {
+  @Override
+  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
+      DoubleWritable messageToCombine) {
+    originalMessage.set(originalMessage.get() + messageToCombine.get());
+  }
+
+  @Override
+  public DoubleWritable createInitialMessage() {
+    return new DoubleWritable(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java
deleted file mode 100644
index d898791..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumCombiner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-
-/**
- * A combiner that sums float-valued messages
- */
-public class FloatSumCombiner extends
-    Combiner<IntWritable, FloatWritable> {
-  @Override
-  public void combine(IntWritable vertexIndex, FloatWritable originalMessage,
-      FloatWritable messageToCombine) {
-    originalMessage.set(originalMessage.get() + messageToCombine.get());
-  }
-
-  @Override
-  public FloatWritable createInitialMessage() {
-    return new FloatWritable(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
new file mode 100644
index 0000000..b13a7f7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * A combiner that sums float-valued messages
+ */
+public class FloatSumMessageCombiner
+    extends
+    MessageCombiner<IntWritable, FloatWritable> {
+  @Override
+  public void combine(IntWritable vertexIndex, FloatWritable originalMessage,
+      FloatWritable messageToCombine) {
+    originalMessage.set(originalMessage.get() + messageToCombine.get());
+  }
+
+  @Override
+  public FloatWritable createInitialMessage() {
+    return new FloatWritable(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
new file mode 100644
index 0000000..e53ab3f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Abstract class to extend for combining messages sent to the same vertex.
+ * MessageCombiner for applications where any two messages for one
+ * vertex can be combined into one.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public abstract class MessageCombiner<I extends WritableComparable,
+    M extends Writable> {
+  /**
+   * Combine messageToCombine with originalMessage, by modifying
+   * originalMessage.  Note that the messageToCombine object
+   * may be reused by the infrastructure; therefore, do not store it or any
+   * of its member objects in originalMessage.
+   *
+   * @param vertexIndex Index of the vertex getting these messages
+   * @param originalMessage The first message which we want to combine;
+   *                        put the result of combining in this message
+   * @param messageToCombine The second message which we want to combine
+   *                         (object may be reused - do not reference it or its
+   *                         member objects)
+   */
+  public abstract void combine(I vertexIndex, M originalMessage,
+      M messageToCombine);
+
+  /**
+   * Get the initial message. When combined with any other message M,
+   * the result should be M.
+   *
+   * @return Initial message
+   */
+  public abstract M createInitialMessage();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleCombiner.java
deleted file mode 100644
index 0a85d64..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleCombiner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * Combiner which finds the minimum of {@link DoubleWritable}.
- */
-public class MinimumDoubleCombiner extends
-    Combiner<LongWritable, DoubleWritable> {
-  @Override
-  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
-      DoubleWritable messageToCombine) {
-    if (originalMessage.get() > messageToCombine.get()) {
-      originalMessage.set(messageToCombine.get());
-    }
-  }
-
-  @Override
-  public DoubleWritable createInitialMessage() {
-    return new DoubleWritable(Double.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
new file mode 100644
index 0000000..a1f4bd7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MessageCombiner which finds the minimum of {@link DoubleWritable}.
+ */
+public class MinimumDoubleMessageCombiner
+    extends
+    MessageCombiner<LongWritable, DoubleWritable> {
+  @Override
+  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
+      DoubleWritable messageToCombine) {
+    if (originalMessage.get() > messageToCombine.get()) {
+      originalMessage.set(messageToCombine.get());
+    }
+  }
+
+  @Override
+  public DoubleWritable createInitialMessage() {
+    return new DoubleWritable(Double.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntCombiner.java
deleted file mode 100644
index fcef58e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntCombiner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.IntWritable;
-
-/**
- * {@link Combiner} that finds the minimum {@link IntWritable}
- */
-public class MinimumIntCombiner
-    extends Combiner<IntWritable, IntWritable> {
-  @Override
-  public void combine(IntWritable vertexIndex, IntWritable originalMessage,
-      IntWritable messageToCombine) {
-    if (originalMessage.get() > messageToCombine.get()) {
-      originalMessage.set(messageToCombine.get());
-    }
-  }
-
-  @Override
-  public IntWritable createInitialMessage() {
-    return new IntWritable(Integer.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
new file mode 100644
index 0000000..227c6e6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * {@link MessageCombiner} that finds the minimum {@link IntWritable}
+ */
+public class MinimumIntMessageCombiner
+    extends MessageCombiner<IntWritable, IntWritable> {
+  @Override
+  public void combine(IntWritable vertexIndex, IntWritable originalMessage,
+      IntWritable messageToCombine) {
+    if (originalMessage.get() > messageToCombine.get()) {
+      originalMessage.set(messageToCombine.get());
+    }
+  }
+
+  @Override
+  public IntWritable createInitialMessage() {
+    return new IntWritable(Integer.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumCombiner.java
deleted file mode 100644
index 2a11d2f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumCombiner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.combiner;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * Combiner which sums up {@link IntWritable} message values.
- */
-public class SimpleSumCombiner
-    extends Combiner<LongWritable, IntWritable> {
-
-  @Override
-  public void combine(LongWritable vertexIndex, IntWritable originalMessage,
-      IntWritable messageToCombine) {
-    originalMessage.set(originalMessage.get() + messageToCombine.get());
-  }
-
-  @Override
-  public IntWritable createInitialMessage() {
-    return new IntWritable(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
new file mode 100644
index 0000000..1b4f5ef
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.combiner;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MessageCombiner which sums up {@link IntWritable} message values.
+ */
+public class SimpleSumMessageCombiner
+    extends MessageCombiner<LongWritable, IntWritable> {
+
+  @Override
+  public void combine(LongWritable vertexIndex, IntWritable originalMessage,
+      IntWritable messageToCombine) {
+    originalMessage.set(originalMessage.get() + messageToCombine.get());
+  }
+
+  @Override
+  public IntWritable createInitialMessage() {
+    return new IntWritable(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
index 6973785..4eb57fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java
@@ -23,11 +23,8 @@ import com.google.common.collect.Maps;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayVertexIdData;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 import javax.annotation.concurrent.NotThreadSafe;
 import java.util.List;
@@ -37,13 +34,11 @@ import java.util.Map;
  * An abstract structure for caching data by partitions
  * to be sent to workers in bulk. Not thread-safe.
  *
- * @param <I> Vertex id
- * @param <D> Data type
+ * @param <D> Data type of partition cache
  */
 @NotThreadSafe
 @SuppressWarnings("unchecked")
-public abstract class SendDataCache<I extends WritableComparable,
-    D extends Writable> {
+public abstract class SendDataCache<D> {
   /**
    * Internal cache of partitions (index) to their partition caches of
    * type D.
@@ -51,8 +46,6 @@ public abstract class SendDataCache<I extends WritableComparable,
   private final D[] dataCache;
   /** How big to initially make output streams for each worker's partitions */
   private final int[] initialBufferSizes;
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration conf;
   /** Service worker */
   private final CentralizedServiceWorker serviceWorker;
   /** Size of data (in bytes) for each worker */
@@ -92,7 +85,7 @@ public abstract class SendDataCache<I extends WritableComparable,
       workerPartitionIds.add(partitionOwner.getPartitionId());
       maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
     }
-    dataCache = (D[]) new Writable[maxPartition + 1];
+    dataCache = (D[]) new Object[maxPartition + 1];
 
     int maxWorker = 0;
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
index 524c9f1..8ec3164 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
@@ -17,19 +17,20 @@
  */
 package org.apache.giraph.comm;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GiraphTransferRegulator;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Maps;
+import java.io.IOException;
 
-import java.util.Map;
+import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_VERTEX_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_VERTEX_REQUEST_SIZE;
 
 /**
  * Caches partition vertices prior to sending.  Aggregating these requests
@@ -40,93 +41,54 @@ import java.util.Map;
  * @param <E> Edge value
  */
 public class SendPartitionCache<I extends WritableComparable,
-    V extends Writable, E extends Writable> {
+    V extends Writable, E extends Writable> extends
+    SendDataCache<ExtendedDataOutput> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendPartitionCache.class);
-  /** Input split vertex cache (only used when loading from input split) */
-  private final Map<PartitionOwner, Partition<I, V, E>>
-  ownerPartitionMap = Maps.newHashMap();
-  /** Context */
-  private final Mapper<?, ?, ?, ?>.Context context;
-  /** Configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
-  /**
-   *  Regulates the size of outgoing Collections of vertices read
-   * by the local worker during INPUT_SUPERSTEP that are to be
-   * transfered from <code>inputSplitCache</code> to the owner
-   * of their initial, master-assigned Partition.*
-   */
-  private final GiraphTransferRegulator transferRegulator;
 
   /**
    * Constructor.
    *
-   * @param context Context
-   * @param configuration Configuration
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
    */
-  public SendPartitionCache(
-      Mapper<?, ?, ?, ?>.Context context,
-      ImmutableClassesGiraphConfiguration<I, V, E> configuration) {
-    this.context = context;
-    this.configuration = configuration;
-    transferRegulator =
-        new GiraphTransferRegulator(configuration);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("SendPartitionCache: maxVerticesPerTransfer = " +
-          transferRegulator.getMaxVerticesPerTransfer());
-      LOG.info("SendPartitionCache: maxEdgesPerTransfer = " +
-          transferRegulator.getMaxEdgesPerTransfer());
-    }
+  public SendPartitionCache(ImmutableClassesGiraphConfiguration<I, V, E> conf,
+                            CentralizedServiceWorker<?, ?, ?> serviceWorker) {
+    super(conf, serviceWorker, MAX_VERTEX_REQUEST_SIZE.get(conf),
+        ADDITIONAL_VERTEX_REQUEST_SIZE.get(conf));
   }
 
   /**
-   * Add a vertex to the cache, returning the partition if full
+   * Add a vertex to the cache.
    *
    * @param partitionOwner Partition owner of the vertex
    * @param vertex Vertex to add
-   * @return A partition to send or null, if requirements are not met
+   * @return Size of partitions for this worker
    */
-  public Partition<I, V, E> addVertex(PartitionOwner partitionOwner,
+  public int addVertex(PartitionOwner partitionOwner,
       Vertex<I, V, E> vertex) {
-    Partition<I, V, E> partition =
-        ownerPartitionMap.get(partitionOwner);
-    if (partition == null) {
-      partition = configuration.createPartition(
-          partitionOwner.getPartitionId(),
-          context);
-      ownerPartitionMap.put(partitionOwner, partition);
-    }
-    transferRegulator.incrementCounters(partitionOwner, vertex);
-
-    Vertex<I, V, E> oldVertex = partition.putVertex(vertex);
-    if (oldVertex != null) {
-      LOG.warn("addVertex: Replacing vertex " + oldVertex +
-          " with " + vertex);
+    // Get the data collection
+    ExtendedDataOutput partitionData =
+        getData(partitionOwner.getPartitionId());
+    int taskId = partitionOwner.getWorkerInfo().getTaskId();
+    int originalSize = 0;
+    if (partitionData == null) {
+      partitionData = getConf().createExtendedDataOutput(
+          getInitialBufferSize(taskId));
+      setData(partitionOwner.getPartitionId(), partitionData);
+    } else {
+      originalSize = partitionData.getPos();
     }
-
-    // Requirements met to transfer?
-    if (transferRegulator.transferThisPartition(partitionOwner)) {
-      return ownerPartitionMap.remove(partitionOwner);
+    try {
+      WritableUtils.<I, V, E>writeVertexToDataOutput(
+          partitionData, vertex, getConf());
+    } catch (IOException e) {
+      throw new IllegalStateException("addVertex: Failed to serialize", e);
     }
 
-    return null;
-  }
-
-  /**
-   * Get the owner partition map (for flushing)
-   *
-   * @return Owner partition map
-   */
-  public Map<PartitionOwner, Partition<I, V, E>> getOwnerPartitionMap() {
-    return ownerPartitionMap;
-  }
-
-  /**
-   * Clear the cache.
-   */
-  public void clear() {
-    ownerPartitionMap.clear();
+    // Update the size of cached, outgoing data per worker
+    return incrDataSize(taskId, partitionData.getPos() - originalSize);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
index 2623812..afce3ba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
@@ -37,7 +37,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 @NotThreadSafe
 @SuppressWarnings("unchecked")
 public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
-    B extends ByteArrayVertexIdData<I, T>> extends SendDataCache<I, B> {
+    B extends ByteArrayVertexIdData<I, T>> extends SendDataCache<B> {
   /**
    * Constructor.
    *
@@ -73,20 +73,58 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
   public int addData(WorkerInfo workerInfo,
                      int partitionId, I destVertexId, T data) {
     // Get the data collection
+    ByteArrayVertexIdData<I, T> partitionData =
+        getPartitionData(workerInfo, partitionId);
+    int originalSize = partitionData.getSize();
+    partitionData.add(destVertexId, data);
+    // Update the size of cached, outgoing data per worker
+    return incrDataSize(workerInfo.getTaskId(),
+        partitionData.getSize() - originalSize);
+  }
+
+  /**
+   * This method is similar to the method above,
+   * but uses a serialized id to replace the original I type
+   * destVertexId.
+   *
+   * @param workerInfo The remote worker destination
+   * @param partitionId The remote Partition this message belongs to
+   * @param serializedId The byte array to store the serialized target vertex id
+   * @param idPos The length of bytes of serialized id in the byte array above
+   * @param data Data to send to remote worker
+   * @return The number of bytes added to the target worker
+   */
+  public int addData(WorkerInfo workerInfo, int partitionId,
+                     byte[] serializedId, int idPos, T data) {
+    // Get the data collection
+    ByteArrayVertexIdData<I, T> partitionData =
+        getPartitionData(workerInfo, partitionId);
+    int originalSize = partitionData.getSize();
+    partitionData.add(serializedId, idPos, data);
+    // Update the size of cached, outgoing data per worker
+    return incrDataSize(workerInfo.getTaskId(),
+        partitionData.getSize() - originalSize);
+  }
+
+  /**
+   * This method tries to get a partition data from the data cache.
+   * If null, it will create one.
+   *
+   * @param workerInfo The remote worker destination
+   * @param partitionId The remote Partition this message belongs to
+   * @return The partition data in data cache
+   */
+  private ByteArrayVertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
+                                                       int partitionId) {
+    // Get the data collection
     B partitionData = getData(partitionId);
-    int originalSize = 0;
     if (partitionData == null) {
       partitionData = createByteArrayVertexIdData();
       partitionData.setConf(getConf());
       partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId()));
       setData(partitionId, partitionData);
-    } else {
-      originalSize = partitionData.getSize();
     }
-    partitionData.add(destVertexId, data);
 
-    // Update the size of cached, outgoing data per worker
-    return incrDataSize(workerInfo.getTaskId(),
-        partitionData.getSize() - originalSize);
+    return partitionData;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
index 9bdf9ca..f788051 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
@@ -66,9 +66,10 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
    *
    * @param partitionOwner Owner of the vertex
    * @param vertex Vertex to send
+   * @return True iff any network I/O occurred.
    */
-  void sendVertexRequest(PartitionOwner partitionOwner,
-                         Vertex<I, V, E> vertex);
+  boolean sendVertexRequest(PartitionOwner partitionOwner,
+                            Vertex<I, V, E> vertex);
 
   /**
    * Send a partition request (no batching).

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 0cdfb73..5f0e929 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
@@ -70,23 +70,23 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
       MessageValueFactory<M> messageValueFactory) {
     Class<M> messageClass = messageValueFactory.getValueClass();
     MessageStore messageStore;
-    if (conf.useCombiner()) {
+    if (conf.useMessageCombiner()) {
       Class<I> vertexIdClass = conf.getVertexIdClass();
       if (vertexIdClass.equals(IntWritable.class) &&
           messageClass.equals(FloatWritable.class)) {
         messageStore = new IntFloatMessageStore(
             (CentralizedServiceWorker<IntWritable, ?, ?>) service,
-            (Combiner<IntWritable, FloatWritable>)
-                conf.<FloatWritable>createCombiner());
+            (MessageCombiner<IntWritable, FloatWritable>)
+                conf.<FloatWritable>createMessageCombiner());
       } else if (vertexIdClass.equals(LongWritable.class) &&
           messageClass.equals(DoubleWritable.class)) {
         messageStore = new LongDoubleMessageStore(
             (CentralizedServiceWorker<LongWritable, ?, ?>) service,
-            (Combiner<LongWritable, DoubleWritable>)
-                conf.<DoubleWritable>createCombiner());
+            (MessageCombiner<LongWritable, DoubleWritable>)
+                conf.<DoubleWritable>createMessageCombiner());
       } else {
         messageStore = new OneMessagePerVertexStore<I, M>(messageValueFactory,
-          service, conf.<M>createCombiner(), conf);
+          service, conf.<M>createMessageCombiner(), conf);
       }
     } else {
       Class<I> vertexIdClass = conf.getVertexIdClass();
@@ -108,8 +108,8 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
       LOG.info("newStore: Created " + messageStore.getClass() +
           " for vertex id " + conf.getVertexIdClass() +
           " and message value " + messageClass + " and" +
-          (conf.useCombiner() ? " combiner " + conf.getCombinerClass() :
-              " no combiner"));
+          (conf.useMessageCombiner() ? " message combiner " +
+              conf.getMessageCombinerClass() : " no combiner"));
     }
     return (MessageStore<I, M>) messageStore;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index 254afd4..f582ea2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -34,7 +34,7 @@ public interface MessageStoreFactory<I extends WritableComparable,
   /**
    * Creates new message store.
    *
-   * Note: Combiner class in Configuration can be changed,
+   * Note: MessageCombiner class in Configuration can be changed,
    * this method should return MessageStore which uses current combiner
    *
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 4f150da..acf68ea 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -35,29 +35,30 @@ import java.util.concurrent.ConcurrentMap;
 /**
  * Implementation of {@link SimpleMessageStore} where we have a single
  * message per vertex.
- * Used when {@link Combiner} is provided.
+ * Used when {@link org.apache.giraph.combiner.MessageCombiner} is provided.
  *
  * @param <I> Vertex id
  * @param <M> Message data
  */
 public class OneMessagePerVertexStore<I extends WritableComparable,
     M extends Writable> extends SimpleMessageStore<I, M, M> {
-  /** Combiner for messages */
-  private final Combiner<I, M> combiner;
+  /** MessageCombiner for messages */
+  private final MessageCombiner<I, M> messageCombiner;
 
   /**
    * @param messageValueFactory Message class held in the store
-   * @param service  Service worker
-   * @param combiner Combiner for messages
-   * @param config   Hadoop configuration
+   * @param service Service worker
+   * @param messageCombiner MessageCombiner for messages
+   * @param config Giraph configuration
    */
   OneMessagePerVertexStore(
       MessageValueFactory<M> messageValueFactory,
       CentralizedServiceWorker<I, ?, ?> service,
-      Combiner<I, M> combiner,
+      MessageCombiner<I, M> messageCombiner,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     super(messageValueFactory, service, config);
-    this.combiner = combiner;
+    this.messageCombiner =
+        messageCombiner;
   }
 
   @Override
@@ -76,7 +77,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
       M currentMessage =
           partitionMap.get(vertexIdMessageIterator.getCurrentVertexId());
       if (currentMessage == null) {
-        M newMessage = combiner.createInitialMessage();
+        M newMessage = messageCombiner.createInitialMessage();
         currentMessage = partitionMap.putIfAbsent(
             vertexIdMessageIterator.releaseCurrentVertexId(), newMessage);
         if (currentMessage == null) {
@@ -84,7 +85,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
         }
       }
       synchronized (currentMessage) {
-        combiner.combine(vertexId, currentMessage,
+        messageCombiner.combine(vertexId, currentMessage,
             vertexIdMessageIterator.getCurrentMessage());
       }
     }
@@ -157,7 +158,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
     public MessageStore<I, M> newStore(
         MessageValueFactory<M> messageValueFactory) {
       return new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
-          config.<M>createCombiner(), config);
+          config.<M>createMessageCombiner(), config);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index cdab2e0..c58868a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -81,7 +81,7 @@ public class IntByteArrayMessageStore<M extends Writable>
         new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<IntWritable, ?, ?> partition =
-          service.getPartitionStore().getPartition(partitionId);
+          service.getPartitionStore().getOrCreatePartition(partitionId);
       Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
           new Int2ObjectOpenHashMap<DataInputOutput>(
               (int) partition.getVertexCount());

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index a193fb9..d75c758 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages.primitives;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -43,7 +43,7 @@ import java.util.List;
 
 /**
  * Special message store to be used when ids are IntWritable and messages
- * are FloatWritable and combiner is used.
+ * are FloatWritable and a message combiner is used.
  * Uses fastutil primitive maps in order to decrease number of objects and
  * get better performance.
  */
@@ -51,8 +51,8 @@ public class IntFloatMessageStore
     implements MessageStore<IntWritable, FloatWritable> {
   /** Map from partition id to map from vertex id to message */
   private final Int2ObjectOpenHashMap<Int2FloatOpenHashMap> map;
-  /** Message combiner */
-  private final Combiner<IntWritable, FloatWritable> combiner;
+  /** Message combiner */
+  private final MessageCombiner<IntWritable, FloatWritable> messageCombiner;
   /** Service worker */
   private final CentralizedServiceWorker<IntWritable, ?, ?> service;
 
@@ -60,18 +60,19 @@ public class IntFloatMessageStore
    * Constructor
    *
    * @param service Service worker
-   * @param combiner Message combiner
+   * @param messageCombiner Message combiner
    */
   public IntFloatMessageStore(
       CentralizedServiceWorker<IntWritable, ?, ?> service,
-      Combiner<IntWritable, FloatWritable> combiner) {
+      MessageCombiner<IntWritable, FloatWritable> messageCombiner) {
     this.service = service;
-    this.combiner = combiner;
+    this.messageCombiner =
+        messageCombiner;
 
     map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<IntWritable, ?, ?> partition =
-          service.getPartitionStore().getPartition(partitionId);
+          service.getPartitionStore().getOrCreatePartition(partitionId);
       Int2FloatOpenHashMap partitionMap =
           new Int2FloatOpenHashMap((int) partition.getVertexCount());
       map.put(partitionId, partitionMap);
@@ -109,7 +110,7 @@ public class IntFloatMessageStore
           reusableVertexId.set(vertexId);
           reusableMessage.set(message);
           reusableCurrentMessage.set(partitionMap.get(vertexId));
-          combiner.combine(reusableVertexId, reusableCurrentMessage,
+          messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
               reusableMessage);
           message = reusableCurrentMessage.get();
         }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
index 3272ced..b0a613b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
@@ -82,7 +82,7 @@ public class LongByteArrayMessageStore<M extends Writable>
         new Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<LongWritable, ?, ?> partition =
-          service.getPartitionStore().getPartition(partitionId);
+          service.getPartitionStore().getOrCreatePartition(partitionId);
       Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
           new Long2ObjectOpenHashMap<DataInputOutput>(
               (int) partition.getVertexCount());

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 96ed5b4..b1f32d4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages.primitives;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -43,7 +43,7 @@ import java.util.List;
 
 /**
  * Special message store to be used when ids are LongWritable and messages
- * are DoubleWritable and combiner is used.
+ * are DoubleWritable and a message combiner is used.
  * Uses fastutil primitive maps in order to decrease number of objects and
  * get better performance.
  */
@@ -51,8 +51,8 @@ public class LongDoubleMessageStore
     implements MessageStore<LongWritable, DoubleWritable> {
   /** Map from partition id to map from vertex id to message */
   private final Int2ObjectOpenHashMap<Long2DoubleOpenHashMap> map;
-  /** Message combiner */
-  private final Combiner<LongWritable, DoubleWritable> combiner;
+  /** Message combiner */
+  private final MessageCombiner<LongWritable, DoubleWritable> messageCombiner;
   /** Service worker */
   private final CentralizedServiceWorker<LongWritable, ?, ?> service;
 
@@ -60,18 +60,19 @@ public class LongDoubleMessageStore
    * Constructor
    *
    * @param service Service worker
-   * @param combiner Message combiner
+   * @param messageCombiner Message combiner
    */
   public LongDoubleMessageStore(
       CentralizedServiceWorker<LongWritable, ?, ?> service,
-      Combiner<LongWritable, DoubleWritable> combiner) {
+      MessageCombiner<LongWritable, DoubleWritable> messageCombiner) {
     this.service = service;
-    this.combiner = combiner;
+    this.messageCombiner =
+        messageCombiner;
 
     map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<LongWritable, ?, ?> partition =
-          service.getPartitionStore().getPartition(partitionId);
+          service.getPartitionStore().getOrCreatePartition(partitionId);
       Long2DoubleOpenHashMap partitionMap =
           new Long2DoubleOpenHashMap((int) partition.getVertexCount());
       map.put(partitionId, partitionMap);
@@ -109,7 +110,7 @@ public class LongDoubleMessageStore
           reusableVertexId.set(vertexId);
           reusableMessage.set(message);
           reusableCurrentMessage.set(partitionMap.get(vertexId));
-          combiner.combine(reusableVertexId, reusableCurrentMessage,
+          messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
               reusableMessage);
           message = reusableCurrentMessage.get();
         }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index 28f3656..7541418 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -90,7 +90,7 @@ public class NettyWorkerClient<I extends WritableComparable,
   @Override
   public void newSuperstep(SuperstepMetricsRegistry metrics) {
     superstepRequestCounters.clear();
-    superstepRequestCounters.put(RequestType.SEND_VERTEX_REQUEST,
+    superstepRequestCounters.put(RequestType.SEND_WORKER_VERTICES_REQUEST,
         metrics.getCounter(MetricNames.SEND_VERTEX_REQUESTS));
     superstepRequestCounters.put(RequestType.SEND_WORKER_MESSAGES_REQUEST,
         metrics.getCounter(MetricNames.SEND_WORKER_MESSAGES_REQUESTS));

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 34a3d1f..0166713 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.comm.netty;
 
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.PercentGauge;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.SendEdgeCache;
@@ -32,8 +35,10 @@ import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
 import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
+import org.apache.giraph.comm.requests.SendWorkerVerticesRequest;
 import org.apache.giraph.comm.requests.WorkerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
@@ -45,6 +50,7 @@ import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
@@ -52,18 +58,10 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.util.PercentGauge;
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
-import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
-import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
-import static org.apache.giraph.conf.GiraphConstants.MAX_MUTATIONS_PER_REQUEST;
-
 /**
  * Aggregate requests and sends them to the thread-safe NettyClient.  This
  * class is not thread-safe and expected to be used and then thrown away after
@@ -91,8 +89,12 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
       new SendMutationsCache<I, V, E>();
   /** NettyClient that could be shared among one or more instances */
   private final WorkerClient<I, V, E> workerClient;
+  /** Messages sent during the last superstep */
+  private long totalMsgsSentInSuperstep = 0;
   /** Maximum size of messages per remote worker to cache before sending */
   private final int maxMessagesSizePerWorker;
+  /** Maximum size of vertices per remote worker to cache before sending. */
+  private final int maxVerticesSizePerWorker;
   /** Maximum size of edges per remote worker to cache before sending. */
   private final int maxEdgesSizePerWorker;
   /** Maximum number of mutations per partition before sending */
@@ -124,9 +126,14 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     this.workerClient = serviceWorker.getWorkerClient();
     this.configuration = conf;
 
-    sendPartitionCache = new SendPartitionCache<I, V, E>(context, conf);
+
+    sendPartitionCache =
+        new SendPartitionCache<I, V, E>(conf, serviceWorker);
     sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
-    maxMessagesSizePerWorker = MAX_MSG_REQUEST_SIZE.get(conf);
+    maxMessagesSizePerWorker =
+        GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
+    maxVerticesSizePerWorker =
+        GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
     if (this.configuration.isOneToAllMsgSendingEnabled()) {
       sendMessageCache =
         new SendMessageToAllCache<I, Writable>(conf, serviceWorker,
@@ -136,8 +143,10 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         new SendMessageCache<I, Writable>(conf, serviceWorker,
           this, maxMessagesSizePerWorker);
     }
-    maxEdgesSizePerWorker = MAX_EDGE_REQUEST_SIZE.get(conf);
-    maxMutationsPerPartition = MAX_MUTATIONS_PER_REQUEST.get(conf);
+    maxEdgesSizePerWorker =
+        GiraphConfiguration.MAX_EDGE_REQUEST_SIZE.get(conf);
+    maxMutationsPerPartition =
+        GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST.get(conf);
     this.serviceWorker = serviceWorker;
     this.serverData = serviceWorker.getServerData();
 
@@ -249,15 +258,26 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   }
 
   @Override
-  public void sendVertexRequest(PartitionOwner partitionOwner,
+  public boolean sendVertexRequest(PartitionOwner partitionOwner,
       Vertex<I, V, E> vertex) {
-    Partition<I, V, E> partition =
-        sendPartitionCache.addVertex(partitionOwner, vertex);
-    if (partition == null) {
-      return;
+    // Add the vertex to the cache
+    int workerMessageSize = sendPartitionCache.addVertex(
+        partitionOwner, vertex);
+
+    // Send a request if the cache of outgoing vertices to
+    // the remote worker is full enough to be flushed
+    if (workerMessageSize >= maxVerticesSizePerWorker) {
+      PairList<Integer, ExtendedDataOutput>
+          workerPartitionVertices =
+          sendPartitionCache.removeWorkerData(partitionOwner.getWorkerInfo());
+      WritableRequest writableRequest =
+          new SendWorkerVerticesRequest<I, V, E>(
+              configuration, workerPartitionVertices);
+      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
+      return true;
     }
 
-    sendPartitionRequest(partitionOwner.getWorkerInfo(), partition);
+    return false;
   }
 
   @Override
@@ -390,17 +410,24 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
 
   @Override
   public void flush() throws IOException {
-    // Execute the remaining send partitions (if any)
-    for (Map.Entry<PartitionOwner, Partition<I, V, E>> entry :
-        sendPartitionCache.getOwnerPartitionMap().entrySet()) {
-      sendPartitionRequest(entry.getKey().getWorkerInfo(), entry.getValue());
-    }
-    sendPartitionCache.clear();
-
     // Execute the remaining sends messages (if any)
     // including one-to-one and one-to-all messages.
     sendMessageCache.flush();
 
+    // Send the remaining cached vertices (if any)
+    PairList<WorkerInfo, PairList<Integer, ExtendedDataOutput>>
+        remainingVertexCache = sendPartitionCache.removeAllData();
+    PairList<WorkerInfo,
+        PairList<Integer, ExtendedDataOutput>>.Iterator
+        vertexIterator = remainingVertexCache.getIterator();
+    while (vertexIterator.hasNext()) {
+      vertexIterator.next();
+      WritableRequest writableRequest =
+          new SendWorkerVerticesRequest(
+              configuration, vertexIterator.getCurrentSecond());
+      doRequest(vertexIterator.getCurrentFirst(), writableRequest);
+    }
+
     // Execute the remaining sends edges (if any)
     PairList<WorkerInfo, PairList<Integer,
         ByteArrayVertexIdEdges<I, E>>>
@@ -448,7 +475,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
    * @param writableRequest Request to either submit or run locally
    */
   public void doRequest(WorkerInfo workerInfo,
-    WritableRequest writableRequest) {
+                         WritableRequest writableRequest) {
     // If this is local, execute locally
     if (serviceWorker.getWorkerInfo().getTaskId() ==
         workerInfo.getTaskId()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 3473de1..4f6c17b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -161,7 +161,7 @@ public class NettyWorkerServer<I extends WritableComparable,
           getPartitionDestinationVertices(partitionId);
       if (!Iterables.isEmpty(destinations)) {
         Partition<I, V, E> partition =
-            service.getPartitionStore().getPartition(partitionId);
+            service.getPartitionStore().getOrCreatePartition(partitionId);
         for (I vertexId : destinations) {
           if (partition.getVertex(vertexId) == null) {
             if (!resolveVertexIndices.put(partitionId, vertexId)) {
@@ -179,7 +179,7 @@ public class NettyWorkerServer<I extends WritableComparable,
     for (Entry<Integer, Collection<I>> e :
         resolveVertexIndices.asMap().entrySet()) {
       Partition<I, V, E> partition =
-          service.getPartitionStore().getPartition(e.getKey());
+          service.getPartitionStore().getOrCreatePartition(e.getKey());
       for (I vertexIndex : e.getValue()) {
         Vertex<I, V, E> originalVertex =
             partition.getVertex(vertexIndex);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
index 83b408e..f49a2b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
@@ -94,7 +94,7 @@ public class RequestEncoder extends OneToOneEncoder {
       writableRequest.write(outputStream);
     } catch (IndexOutOfBoundsException e) {
       LOG.error("encode: Most likely the size of request was not properly " +
-          "specified - see getSerializedSize() in " +
+          "specified (this buffer is too small) - see getSerializedSize() in " +
           writableRequest.getType().getRequestClass());
       throw new IllegalStateException(e);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index a1dcece..7fe2ae7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -34,6 +34,8 @@ public enum RequestType {
   /*end[HADOOP_NON_SECURE]*/
   /** Sending vertices request */
   SEND_VERTEX_REQUEST(SendVertexRequest.class),
+  /** Sending one or more partitions of vertices to a worker */
+  SEND_WORKER_VERTICES_REQUEST(SendWorkerVerticesRequest.class),
   /** Sending a partition of messages for next superstep */
   SEND_WORKER_MESSAGES_REQUEST(SendWorkerMessagesRequest.class),
   /** Sending one-to-all messages to a worker for next superstep */

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
index e0cb916..863d7ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
@@ -29,7 +29,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 /**
- * Send a collection of vertices for a partition.
+ * Send a collection of vertices for a partition.  Note that this doesn't
+ * use a cache - might want to optimize in the future.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
new file mode 100644
index 0000000..386e0ee
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIterator;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Send to a worker one or more partitions of vertices
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public class SendWorkerVerticesRequest<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SendWorkerVerticesRequest.class);
+  /** Worker partitions to be sent */
+  private PairList<Integer, ExtendedDataOutput> workerPartitions;
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendWorkerVerticesRequest() { }
+
+  /**
+   * Constructor for sending a request.
+   *
+   * @param conf Configuration
+   * @param workerPartitions Partitions to be sent in this request
+   */
+  public SendWorkerVerticesRequest(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      PairList<Integer, ExtendedDataOutput> workerPartitions) {
+    this.workerPartitions = workerPartitions;
+    setConf(conf);
+  }
+
+  @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    int numPartitions = input.readInt();
+    workerPartitions = new PairList<Integer, ExtendedDataOutput>();
+    workerPartitions.initialize(numPartitions);
+    while (numPartitions-- > 0) {
+      final int partitionId = input.readInt();
+      ExtendedDataOutput partitionData =
+          WritableUtils.readExtendedDataOutput(input, getConf());
+      workerPartitions.add(partitionId, partitionData);
+    }
+  }
+
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    output.writeInt(workerPartitions.getSize());
+    PairList<Integer, ExtendedDataOutput>.Iterator
+        iterator = workerPartitions.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      output.writeInt(iterator.getCurrentFirst());
+      WritableUtils.writeExtendedDataOutput(
+          iterator.getCurrentSecond(), output);
+    }
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_WORKER_VERTICES_REQUEST;
+  }
+
+  @Override
+  public void doRequest(ServerData<I, V, E> serverData) {
+    PairList<Integer, ExtendedDataOutput>.Iterator
+        iterator = workerPartitions.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      VertexIterator<I, V, E> vertexIterator =
+          new VertexIterator<I, V, E>(
+          iterator.getCurrentSecond(), getConf());
+      serverData.getPartitionStore().getOrCreatePartition(
+          iterator.getCurrentFirst()).addPartitionVertices(
+          vertexIterator);
+    }
+  }
+
+  @Override
+  public int getSerializedSize() {
+    int size = 0;
+    PairList<Integer, ExtendedDataOutput>.Iterator iterator =
+        workerPartitions.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      // 4 bytes for the partition id and 4 bytes for the size
+      size += 8 + iterator.getCurrentSecond().getPos();
+    }
+    return size;
+  }
+}
+


[2/4] Everything compiles. All tests should run. Next step is to add a test for the vertex combiner. Should have fixed. Fixed one bug for byte array partition. Fixed another bug for too small of a message buffer. Rebased. Rebased. Passes tests. Need to

Posted by ac...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
deleted file mode 100644
index 45d946f..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.BspCase;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.ByteArrayEdges;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.factories.VertexValueFactory;
-import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
-import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
-import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat;
-import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
-import org.apache.giraph.utils.ComputationCountEdges;
-import org.apache.giraph.utils.IntIntNullNoOpComputation;
-import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * A test case to ensure that loading a graph from a list of edges works as
- * expected.
- */
-public class TestEdgeInput extends BspCase {
-  public TestEdgeInput() {
-    super(TestEdgeInput.class.getName());
-  }
-
-  // It should be able to build a graph starting from the edges only.
-  // Vertices should be implicitly created with default values.
-  @Test
-  public void testEdgesOnly() throws Exception {
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1"
-    };
-
-    GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(ComputationCountEdges.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with outgoing edges have been created
-    assertEquals(3, values.size());
-    // Check the number of edges for each vertex
-    assertEquals(1, (int) values.get(1));
-    assertEquals(2, (int) values.get(2));
-    assertEquals(1, (int) values.get(4));
-  }
-
-  // It should be able to build a graph starting from the edges only.
-  // Using ReverseEdgeDuplicator it should also create the reverse edges.
-  // Vertices should be implicitly created with default values.
-  @Test
-  public void testEdgesOnlyWithReverse() throws Exception {
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1"
-    };
-
-    GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(ComputationCountEdges.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with outgoing edges have been created
-    assertEquals(4, values.size());
-    // Check the number of edges for each vertex
-    assertEquals(2, (int) values.get(1));
-    assertEquals(3, (int) values.get(2));
-    assertEquals(1, (int) values.get(3));
-    assertEquals(2, (int) values.get(4));
-  }
-
-  // It should be able to build a graph by specifying vertex data and edges
-  // as separate input formats.
-  @Test
-  public void testMixedFormat() throws Exception {
-    String[] vertices = new String[] {
-        "1 75",
-        "2 34",
-        "3 13",
-        "4 32"
-    };
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1",
-        "5 3"
-    };
-
-    GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(IntIntNullNoOpComputation.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
-    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-
-    // Run a job with a vertex that does nothing
-    Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with either initial values or outgoing edges
-    // have been created
-    assertEquals(5, values.size());
-    // Check that the vertices have been created with correct values
-    assertEquals(75, (int) values.get(1));
-    assertEquals(34, (int) values.get(2));
-    assertEquals(13, (int) values.get(3));
-    assertEquals(32, (int) values.get(4));
-    // A vertex with edges but no initial value should have the default value
-    assertEquals(0, (int) values.get(5));
-
-    // Run a job with a custom VertexValueFactory
-    conf.setVertexValueFactoryClass(TestVertexValueFactory.class);
-    results = InternalVertexRunner.run(conf, vertices, edges);
-    values = parseResults(results);
-    // A vertex with edges but no initial value should have been constructed
-    // by the custom factory
-    assertEquals(3, (int) values.get(5));
-
-    conf = new GiraphConfiguration();
-    conf.setComputationClass(ComputationCountEdges.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
-    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-
-    // Run a job with a vertex that counts outgoing edges
-    results = InternalVertexRunner.run(conf, vertices, edges);
-
-    values = parseResults(results);
-
-    // Check the number of edges for each vertex
-    assertEquals(1, (int) values.get(1));
-    assertEquals(2, (int) values.get(2));
-    assertEquals(0, (int) values.get(3));
-    assertEquals(1, (int) values.get(4));
-    assertEquals(1, (int) values.get(5));
-  }
-
-  // It should use the specified input OutEdges class.
-  @Test
-  public void testDifferentInputEdgesClass() throws Exception {
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1"
-    };
-
-    GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(TestComputationCheckEdgesType.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setInputOutEdgesClass(TestOutEdgesFilterEven.class);
-    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with outgoing edges in the input have been
-    // created
-    assertEquals(3, values.size());
-    // Check the number of edges for each vertex (edges with odd target id
-    // should have been removed)
-    assertEquals(1, (int) values.get(1));
-    assertEquals(1, (int) values.get(2));
-    assertEquals(0, (int) values.get(4));
-  }
-
-  public static class TestComputationCheckEdgesType extends
-      ComputationCountEdges {
-    @Override
-    public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
-        Iterable<NullWritable> messages) throws IOException {
-      assertFalse(vertex.getEdges() instanceof TestOutEdgesFilterEven);
-      assertTrue(vertex.getEdges() instanceof ByteArrayEdges);
-      super.compute(vertex, messages);
-    }
-  }
-
-  public static class TestVertexValueFactory
-      implements VertexValueFactory<IntWritable> {
-    @Override
-    public void initialize(ImmutableClassesGiraphConfiguration conf) { }
-
-    @Override
-    public Class<IntWritable> getValueClass() {
-      return IntWritable.class;
-    }
-
-    @Override
-    public IntWritable newInstance() {
-      return new IntWritable(3);
-    }
-  }
-
-  public static class TestOutEdgesFilterEven
-      extends ByteArrayEdges<IntWritable, NullWritable> {
-    @Override
-    public void add(Edge<IntWritable, NullWritable> edge) {
-      if (edge.getTargetVertexId().get() % 2 == 0) {
-        super.add(edge);
-      }
-    }
-  }
-
-  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
-    Map<Integer, Integer> values = Maps.newHashMap();
-    for (String line : results) {
-      String[] tokens = line.split("\\s+");
-      int id = Integer.valueOf(tokens[0]);
-      int value = Integer.valueOf(tokens[1]);
-      values.put(id, value);
-    }
-    return values;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
new file mode 100644
index 0000000..721a74c
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.factories.VertexValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
+import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
+import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat;
+import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.utils.ComputationCountEdges;
+import org.apache.giraph.utils.IntIntNullNoOpComputation;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * A test case to ensure that loading a graph from vertices and edges works as
+ * expected.
+ */
+public class TestVertexEdgeInput extends BspCase {
+  // Passes the name of this class (Class#getName) to the BspCase
+  // constructor.
+  public TestVertexEdgeInput() { super(TestVertexEdgeInput.class.getName()); }
+
+  /**
+   * Loads a graph from an edge list alone (no vertex input) and counts the
+   * out-edges of every vertex.  Only the source vertices of input edges are
+   * expected in the output, each created implicitly with a default value.
+   *
+   * @throws Exception if running the job fails
+   */
+  @Test
+  public void testEdgesOnly() throws Exception {
+    final String[] edgeLines = {"1 2", "2 3", "2 4", "4 1"};
+
+    final GiraphConfiguration config = new GiraphConfiguration();
+    config.setComputationClass(ComputationCountEdges.class);
+    config.setOutEdgesClass(ByteArrayEdges.class);
+    config.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    config.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    final Map<Integer, Integer> edgeCounts =
+        parseResults(InternalVertexRunner.run(config, null, edgeLines));
+
+    // Vertex 3 has no outgoing edge, so only 1, 2 and 4 are reported
+    assertEquals(3, edgeCounts.size());
+    // Out-degree of each reported vertex
+    assertEquals(1, edgeCounts.get(1).intValue());
+    assertEquals(2, edgeCounts.get(2).intValue());
+    assertEquals(1, edgeCounts.get(4).intValue());
+  }
+
+  /**
+   * Loads a graph from an edge list alone, using an edge input format that
+   * also creates the reverse of every edge it reads, and counts the
+   * out-edges of every vertex.  Vertices are expected to be created
+   * implicitly with default values.
+   *
+   * @throws Exception if running the job fails
+   */
+  @Test
+  public void testEdgesOnlyWithReverse() throws Exception {
+    final String[] edgeLines = {"1 2", "2 3", "2 4", "4 1"};
+
+    final GiraphConfiguration config = new GiraphConfiguration();
+    config.setComputationClass(ComputationCountEdges.class);
+    config.setOutEdgesClass(ByteArrayEdges.class);
+    config.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
+    config.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    final Map<Integer, Integer> edgeCounts =
+        parseResults(InternalVertexRunner.run(config, null, edgeLines));
+
+    // With reverse edges added, vertex 3 gains an out-edge and is reported
+    assertEquals(4, edgeCounts.size());
+    // Forward plus reverse out-edges of each vertex
+    assertEquals(2, edgeCounts.get(1).intValue());
+    assertEquals(3, edgeCounts.get(2).intValue());
+    assertEquals(1, edgeCounts.get(3).intValue());
+    assertEquals(2, edgeCounts.get(4).intValue());
+  }
+
+  /** Vertex value combiner that folds a duplicate value in by addition. */
+  public static class IntSumVertexValueCombiner
+      implements VertexValueCombiner<IntWritable> {
+    @Override
+    public void combine(IntWritable originalVertexValue, IntWritable vertexValue) {
+      final int sum = originalVertexValue.get() + vertexValue.get();
+      originalVertexValue.set(sum);
+    }
+  }
+
+  /**
+   * Feeds vertex input in which some vertex ids occur on several lines and
+   * verifies that {@link IntSumVertexValueCombiner} merges the duplicate
+   * values by summation, and that the edges given on those lines are added
+   * as well.
+   *
+   * @throws Exception if running a job fails
+   */
+  @Test
+  public void testVertexValueCombiner() throws Exception {
+    // Each line is "id value [target ...]"; ids 1 and 2 occur more than once
+    final String[] vertexLines = {
+        "1 75 2", "2 34 3", "3 13", "4 32", "1 11", "2 23 1", "2 3"
+    };
+
+    final GiraphConfiguration config = new GiraphConfiguration();
+    config.setComputationClass(IntIntNullNoOpComputation.class);
+    config.setOutEdgesClass(ByteArrayEdges.class);
+    config.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
+    config.setVertexValueCombinerClass(IntSumVertexValueCombiner.class);
+    config.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    // First pass: a no-op computation leaves the combined values untouched
+    final Map<Integer, Integer> vertexValues =
+        parseResults(InternalVertexRunner.run(config, vertexLines));
+
+    // One vertex per distinct id
+    assertEquals(4, vertexValues.size());
+    // 75 + 11, 34 + 23 + 3, and the two ids that appear only once
+    assertEquals(86, vertexValues.get(1).intValue());
+    assertEquals(60, vertexValues.get(2).intValue());
+    assertEquals(13, vertexValues.get(3).intValue());
+    assertEquals(32, vertexValues.get(4).intValue());
+
+    // Second pass: same input, but the computation counts outgoing edges
+    config.setComputationClass(ComputationCountEdges.class);
+    final Map<Integer, Integer> edgeCounts =
+        parseResults(InternalVertexRunner.run(config, vertexLines));
+
+    // Edges from every duplicate line must have been added as well
+    assertEquals(1, edgeCounts.get(1).intValue());
+    assertEquals(2, edgeCounts.get(2).intValue());
+    assertEquals(0, edgeCounts.get(3).intValue());
+    assertEquals(0, edgeCounts.get(4).intValue());
+  }
+
+  /**
+   * Builds a graph from two separate inputs, one supplying vertex values and
+   * one supplying edges, and runs three jobs over them: a no-op job with
+   * default vertex values, the same job with {@link TestVertexValueFactory}
+   * configured, and an edge-counting job on a fresh configuration.  Vertex 5
+   * appears only in the edge input.
+   *
+   * @throws Exception if running a job fails
+   */
+  @Test
+  public void testMixedVertexValueEdgeFormat() throws Exception {
+    // "id value" lines; there is no line for vertex 5
+    final String[] vertexLines = {
+        "1 75", "2 34", "3 13", "4 32"
+    };
+
+    // "source target" lines; vertex 5 only occurs as an edge source
+    final String[] edgeLines = {
+        "1 2", "2 3", "2 4", "4 1", "5 3"
+    };
+
+    GiraphConfiguration config = new GiraphConfiguration();
+    config.setComputationClass(IntIntNullNoOpComputation.class);
+    config.setOutEdgesClass(ByteArrayEdges.class);
+    config.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
+    config.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    config.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    // Job 1: no-op computation, default vertex value creation
+    final Map<Integer, Integer> defaultValues = parseResults(
+        InternalVertexRunner.run(config, vertexLines, edgeLines));
+
+    // Every vertex with an initial value or an outgoing edge must exist
+    assertEquals(5, defaultValues.size());
+    // Values supplied by the vertex input are preserved
+    assertEquals(75, defaultValues.get(1).intValue());
+    assertEquals(34, defaultValues.get(2).intValue());
+    assertEquals(13, defaultValues.get(3).intValue());
+    assertEquals(32, defaultValues.get(4).intValue());
+    // Vertex 5 has edges but no initial value, so it gets the default
+    assertEquals(0, defaultValues.get(5).intValue());
+
+    // Job 2: same configuration plus a custom VertexValueFactory
+    config.setVertexValueFactoryClass(TestVertexValueFactory.class);
+    final Map<Integer, Integer> factoryValues = parseResults(
+        InternalVertexRunner.run(config, vertexLines, edgeLines));
+
+    // Vertex 5 must now carry the value produced by the custom factory
+    assertEquals(3, factoryValues.get(5).intValue());
+
+    // Job 3: fresh configuration whose computation counts outgoing edges
+    config = new GiraphConfiguration();
+    config.setComputationClass(ComputationCountEdges.class);
+    config.setOutEdgesClass(ByteArrayEdges.class);
+    config.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
+    config.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    config.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    final Map<Integer, Integer> edgeCounts = parseResults(
+        InternalVertexRunner.run(config, vertexLines, edgeLines));
+
+    // Out-degree of each vertex as given by the edge input
+    assertEquals(1, edgeCounts.get(1).intValue());
+    assertEquals(2, edgeCounts.get(2).intValue());
+    assertEquals(0, edgeCounts.get(3).intValue());
+    assertEquals(1, edgeCounts.get(4).intValue());
+    assertEquals(1, edgeCounts.get(5).intValue());
+  }
+
+  /**
+   * Builds a graph from a vertex input that carries values and edges plus a
+   * separate edge input, and runs three jobs over them: a no-op job with
+   * default vertex values, the same job with {@link TestVertexValueFactory}
+   * configured, and an edge-counting job on a fresh configuration.  Edges
+   * from both inputs are expected to accumulate on a vertex.
+   *
+   * @throws Exception if running a job fails
+   */
+  @Test
+  public void testMixedVertexEdgeFormat() throws Exception {
+    // "id value [target ...]" lines; there is no line for vertex 5
+    final String[] vertexLines = {
+        "1 75 2 3", "2 34 1 5", "3 13", "4 32"
+    };
+
+    // "source target" lines; vertex 5 only occurs as an edge source
+    final String[] edgeLines = {
+        "1 2", "2 3", "2 4", "4 1", "5 3"
+    };
+
+    GiraphConfiguration config = new GiraphConfiguration();
+    config.setComputationClass(IntIntNullNoOpComputation.class);
+    config.setOutEdgesClass(ByteArrayEdges.class);
+    config.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
+    config.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    config.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    // Job 1: no-op computation, default vertex value creation
+    final Map<Integer, Integer> defaultValues = parseResults(
+        InternalVertexRunner.run(config, vertexLines, edgeLines));
+
+    // Every vertex with an initial value or an outgoing edge must exist
+    assertEquals(5, defaultValues.size());
+    // Values supplied by the vertex input are preserved
+    assertEquals(75, defaultValues.get(1).intValue());
+    assertEquals(34, defaultValues.get(2).intValue());
+    assertEquals(13, defaultValues.get(3).intValue());
+    assertEquals(32, defaultValues.get(4).intValue());
+    // Vertex 5 has edges but no initial value, so it gets the default
+    assertEquals(0, defaultValues.get(5).intValue());
+
+    // Job 2: same configuration plus a custom VertexValueFactory
+    config.setVertexValueFactoryClass(TestVertexValueFactory.class);
+    final Map<Integer, Integer> factoryValues = parseResults(
+        InternalVertexRunner.run(config, vertexLines, edgeLines));
+
+    // Vertex 5 must now carry the value produced by the custom factory
+    assertEquals(3, factoryValues.get(5).intValue());
+
+    // Job 3: fresh configuration whose computation counts outgoing edges
+    config = new GiraphConfiguration();
+    config.setComputationClass(ComputationCountEdges.class);
+    config.setOutEdgesClass(ByteArrayEdges.class);
+    config.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
+    config.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    config.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    final Map<Integer, Integer> edgeCounts = parseResults(
+        InternalVertexRunner.run(config, vertexLines, edgeLines));
+
+    // Edges from the vertex input and from the edge input add up
+    assertEquals(3, edgeCounts.get(1).intValue());
+    assertEquals(4, edgeCounts.get(2).intValue());
+    assertEquals(0, edgeCounts.get(3).intValue());
+    assertEquals(1, edgeCounts.get(4).intValue());
+    assertEquals(1, edgeCounts.get(5).intValue());
+  }
+
+  /**
+   * Verifies that the OutEdges class configured for input is applied while
+   * the graph is loaded: {@link TestOutEdgesFilterEven} discards edges with
+   * an odd target id.  {@link TestComputationCheckEdgesType} additionally
+   * asserts that the edges seen during computation are ByteArrayEdges.
+   *
+   * @throws Exception if running the job fails
+   */
+  @Test
+  public void testDifferentInputEdgesClass() throws Exception {
+    final String[] edgeLines = {"1 2", "2 3", "2 4", "4 1"};
+
+    final GiraphConfiguration config = new GiraphConfiguration();
+    config.setComputationClass(TestComputationCheckEdgesType.class);
+    config.setOutEdgesClass(ByteArrayEdges.class);
+    config.setInputOutEdgesClass(TestOutEdgesFilterEven.class);
+    config.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    config.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    final Map<Integer, Integer> edgeCounts =
+        parseResults(InternalVertexRunner.run(config, null, edgeLines));
+
+    // Every vertex with an outgoing edge in the input is still created
+    assertEquals(3, edgeCounts.size());
+    // "2 3" and "4 1" point at odd ids and must have been filtered out
+    assertEquals(1, edgeCounts.get(1).intValue());
+    assertEquals(1, edgeCounts.get(2).intValue());
+    assertEquals(0, edgeCounts.get(4).intValue());
+  }
+
+  public static class TestComputationCheckEdgesType extends
+      ComputationCountEdges {  // asserts the edges type, then delegates
+    @Override
+    public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
+        Iterable<NullWritable> messages) throws IOException {
+      assertFalse(vertex.getEdges() instanceof TestOutEdgesFilterEven);
+      assertTrue(vertex.getEdges() instanceof ByteArrayEdges);
+      super.compute(vertex, messages);  // inherited behavior is unchanged
+    }
+  }
+
+  /**
+   * Vertex value factory that creates every value as IntWritable(3), which
+   * lets the tests tell factory-made values apart from the default of 0.
+   */
+  public static class TestVertexValueFactory
+      implements VertexValueFactory<IntWritable> {
+    @Override
+    public void initialize(ImmutableClassesGiraphConfiguration conf) { }
+
+    @Override
+    public Class<IntWritable> getValueClass() { return IntWritable.class; }
+
+    @Override
+    public IntWritable newInstance() { return new IntWritable(3); }
+  }
+
+  /** ByteArrayEdges variant that only stores edges with an even target id. */
+  public static class TestOutEdgesFilterEven
+      extends ByteArrayEdges<IntWritable, NullWritable> {
+    @Override
+    public void add(Edge<IntWritable, NullWritable> edge) {
+      final boolean evenTarget = edge.getTargetVertexId().get() % 2 == 0;
+      if (evenTarget) { super.add(edge); }
+    }
+  }
+
+  /** Parses whitespace-separated "id value" lines into an id-to-value map. */
+  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
+    Map<Integer, Integer> parsed = Maps.newHashMap();
+    for (String line : results) {
+      // parseInt yields the primitive directly; valueOf boxed, then unboxed
+      String[] tokens = line.split("\\s+");
+      parsed.put(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1]));
+    }
+    return parsed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
index b62775f..4a8caaa 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.AbstractComputation;
@@ -47,7 +47,7 @@ public class TestComputationCombinerTypes {
   public void testAllMatchWithCombiner() {
     SuperstepClasses classes =
         new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            IntDoubleCombiner.class);
+            IntDoubleMessageCombiner.class);
     classes.verifyTypesMatch(
         createConfiguration(IntIntIntIntLongComputation.class), true);
   }
@@ -88,7 +88,7 @@ public class TestComputationCombinerTypes {
   public void testDifferentCombinerIdType() {
     SuperstepClasses classes =
         new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            DoubleDoubleCombiner.class);
+            DoubleDoubleMessageCombiner.class);
     classes.verifyTypesMatch(
         createConfiguration(IntIntIntIntLongComputation.class), true);
   }
@@ -97,7 +97,7 @@ public class TestComputationCombinerTypes {
   public void testDifferentCombinerMessageType() {
     SuperstepClasses classes =
         new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            IntLongCombiner.class);
+            IntLongMessageCombiner.class);
     classes.verifyTypesMatch(
         createConfiguration(IntIntIntIntLongComputation.class), true);
   }
@@ -138,8 +138,8 @@ public class TestComputationCombinerTypes {
       NoOpComputation<IntWritable, IntWritable, LongWritable, LongWritable,
           IntWritable> { }
 
-  private static class NoOpCombiner<I extends WritableComparable,
-      M extends Writable> extends Combiner<I, M> {
+  private static class NoOpMessageCombiner<I extends WritableComparable,
+      M extends Writable> extends MessageCombiner<I, M> {
     @Override
     public void combine(I vertexIndex, M originalMessage, M messageToCombine) {
     }
@@ -150,12 +150,15 @@ public class TestComputationCombinerTypes {
     }
   }
 
-  private static class IntDoubleCombiner extends NoOpCombiner<IntWritable,
-      DoubleWritable> { }
+  private static class IntDoubleMessageCombiner
+      extends NoOpMessageCombiner<IntWritable,
+                  DoubleWritable> { }
 
-  private static class DoubleDoubleCombiner extends NoOpCombiner<DoubleWritable,
-      DoubleWritable> { }
+  private static class DoubleDoubleMessageCombiner
+      extends NoOpMessageCombiner<DoubleWritable,
+                  DoubleWritable> { }
 
-  private static class IntLongCombiner extends NoOpCombiner<IntWritable,
-      LongWritable> { }
+  private static class IntLongMessageCombiner
+      extends NoOpMessageCombiner<IntWritable,
+                  LongWritable> { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index 6b0ed35..e96fd12 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.graph.AbstractComputation;
 import org.apache.giraph.graph.Vertex;
@@ -40,7 +40,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 
-/** Test switching Computation and Combiner class during application */
+/** Test switching Computation and MessageCombiner class during application */
 public class TestSwitchClasses {
   @Test
   public void testSwitchingClasses() throws Exception {
@@ -57,32 +57,41 @@ public class TestSwitchClasses {
     graph = InternalVertexRunner.run(conf, graph);
 
     Assert.assertEquals(2, graph.getVertices().size());
-    StatusValue value1 = graph.getVertex(id1).getValue();
-    StatusValue value2 = graph.getVertex(id2).getValue();
+  }
 
+  private static void checkVerticesOnFinalSuperstep(
+      Vertex<IntWritable, StatusValue, IntWritable> vertex) {
     // Check that computations were performed in expected order
-    ArrayList<Integer> expectedComputations = Lists.newArrayList(1, 1, 2, 3, 1);
-    checkComputations(expectedComputations, value1.computations);
-    checkComputations(expectedComputations, value2.computations);
-
+    final ArrayList<Integer> expectedComputations =
+        Lists.newArrayList(1, 1, 2, 3, 1);
+    checkComputations(expectedComputations, vertex.getValue().computations);
     // Check that messages were sent in the correct superstep,
     // and combined when needed
-    ArrayList<HashSet<Double>> messages1 =
-        Lists.newArrayList(
-            Sets.<Double>newHashSet(),
-            Sets.<Double>newHashSet(11d),
-            Sets.<Double>newHashSet(11d),
-            Sets.<Double>newHashSet(101.5, 201.5),
-            Sets.<Double>newHashSet(3002d));
-    checkMessages(messages1, value1.messagesReceived);
-    ArrayList<HashSet<Double>> messages2 =
-        Lists.newArrayList(
-            Sets.<Double>newHashSet(),
-            Sets.<Double>newHashSet(12d),
-            Sets.<Double>newHashSet(12d),
-            Sets.<Double>newHashSet(102.5, 202.5),
-            Sets.<Double>newHashSet(3004d));
-    checkMessages(messages2, value2.messagesReceived);
+    switch (vertex.getId().get()) {
+      case 1:
+        ArrayList<HashSet<Double>> messages1 =
+            Lists.newArrayList(
+                Sets.<Double>newHashSet(),
+                Sets.<Double>newHashSet(11d),
+                Sets.<Double>newHashSet(11d),
+                Sets.<Double>newHashSet(101.5, 201.5),
+                Sets.<Double>newHashSet(3002d));
+        checkMessages(messages1, vertex.getValue().messagesReceived);
+        break;
+      case 2:
+        ArrayList<HashSet<Double>> messages2 =
+            Lists.newArrayList(
+                Sets.<Double>newHashSet(),
+                Sets.<Double>newHashSet(12d),
+                Sets.<Double>newHashSet(12d),
+                Sets.<Double>newHashSet(102.5, 202.5),
+                Sets.<Double>newHashSet(3004d));
+        checkMessages(messages2, vertex.getValue().messagesReceived);
+        break;
+      default:
+        throw new IllegalStateException("checkVertices: Illegal vertex " +
+            vertex);
+    }
   }
 
   private static void checkComputations(ArrayList<Integer> expected,
@@ -113,7 +122,7 @@ public class TestSwitchClasses {
       switch ((int) getSuperstep()) {
         case 0:
           setComputation(Computation1.class);
-          setCombiner(MinimumCombiner.class);
+          setMessageCombiner(MinimumMessageCombiner.class);
           break;
         case 1:
           // test classes don't change
@@ -121,11 +130,11 @@ public class TestSwitchClasses {
         case 2:
           setComputation(Computation2.class);
           // test combiner removed
-          setCombiner(null);
+          setMessageCombiner(null);
           break;
         case 3:
           setComputation(Computation3.class);
-          setCombiner(SumCombiner.class);
+          setMessageCombiner(SumMessageCombiner.class);
           break;
         case 4:
           setComputation(Computation1.class);
@@ -147,6 +156,10 @@ public class TestSwitchClasses {
       IntWritable otherId = new IntWritable(3 - vertex.getId().get());
       sendMessage(otherId, new IntWritable(otherId.get() + 10));
       sendMessage(otherId, new IntWritable(otherId.get() + 20));
+      // Check the vertices on the final superstep
+      if (getSuperstep() == 4) {
+        checkVerticesOnFinalSuperstep(vertex);
+      }
     }
   }
 
@@ -179,8 +192,9 @@ public class TestSwitchClasses {
     }
   }
 
-  public static class MinimumCombiner extends Combiner<IntWritable,
-      IntWritable> {
+  public static class MinimumMessageCombiner
+      extends MessageCombiner<IntWritable,
+                  IntWritable> {
     @Override
     public void combine(IntWritable vertexIndex, IntWritable originalMessage,
         IntWritable messageToCombine) {
@@ -194,7 +208,8 @@ public class TestSwitchClasses {
     }
   }
 
-  public static class SumCombiner extends Combiner<IntWritable, IntWritable> {
+  public static class SumMessageCombiner
+      extends MessageCombiner<IntWritable, IntWritable> {
     @Override
     public void combine(IntWritable vertexIndex, IntWritable originalMessage,
         IntWritable messageToCombine) {
@@ -232,6 +247,12 @@ public class TestSwitchClasses {
     }
 
     @Override
+    public String toString() {
+      return "(computations=" + computations +
+          ",messagesReceived=" + messagesReceived + ")";
+    }
+
+    @Override
     public void write(DataOutput dataOutput) throws IOException {
       dataOutput.writeInt(computations.size());
       for (Integer computation : computations) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 0e68b56..0dd9b9c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -199,15 +199,15 @@ public class TestPartitionStores {
     partitionStore.addPartition(createPartition(conf, 4, v7));
 
     Partition<IntWritable, IntWritable, NullWritable> partition1 =
-        partitionStore.getPartition(1);
+        partitionStore.getOrCreatePartition(1);
     partitionStore.putPartition(partition1);
     Partition<IntWritable, IntWritable, NullWritable> partition2 =
-        partitionStore.getPartition(2);
+        partitionStore.getOrCreatePartition(2);
     partitionStore.putPartition(partition2);
     Partition<IntWritable, IntWritable, NullWritable> partition3 =
         partitionStore.removePartition(3);
     Partition<IntWritable, IntWritable, NullWritable> partition4 =
-        partitionStore.getPartition(4);
+        partitionStore.getOrCreatePartition(4);
     partitionStore.putPartition(partition4);
 
     assertEquals(3, partitionStore.getNumPartitions());
@@ -215,7 +215,7 @@ public class TestPartitionStores {
     int partitionsNumber = 0;
     for (Integer partitionId : partitionStore.getPartitionIds()) {
       Partition<IntWritable, IntWritable, NullWritable> p =
-          partitionStore.getPartition(partitionId);
+          partitionStore.getOrCreatePartition(partitionId);
       partitionStore.putPartition(p);
       partitionsNumber++;
     }
@@ -225,13 +225,13 @@ public class TestPartitionStores {
     assertTrue(partitionStore.hasPartition(2));
     assertFalse(partitionStore.hasPartition(3));
     assertTrue(partitionStore.hasPartition(4));
-    partition = partitionStore.getPartition(1);
+    partition = partitionStore.getOrCreatePartition(1);
     assertEquals(3, partition.getVertexCount());
     partitionStore.putPartition(partition);
-    partition = partitionStore.getPartition(2);
+    partition = partitionStore.getOrCreatePartition(2);
     assertEquals(2, partition.getVertexCount());
     partitionStore.putPartition(partition);
-    partition = partitionStore.getPartition(4);
+    partition = partitionStore.getOrCreatePartition(4);
     assertEquals(1, partition.getVertexCount());
     assertEquals(2, partition.getEdgeCount());
     partitionStore.putPartition(partition);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 115da7e..5612e5f 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -19,7 +19,7 @@
 package org.apache.giraph;
 
 import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.SimpleSumCombiner;
+import org.apache.giraph.combiner.SimpleSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -290,7 +290,8 @@ public class
   }
 
   /**
-   * Run a sample BSP job locally with combiner and checkout output value.
+   * Run a sample BSP job locally with a message combiner and
+   * check the output value.
    *
    * @throws IOException
    * @throws ClassNotFoundException
@@ -302,7 +303,7 @@ public class
     GiraphConfiguration conf = new GiraphConfiguration();
     conf.setComputationClass(SimpleCombinerComputation.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-    conf.setCombinerClass(SimpleSumCombiner.class);
+    conf.setMessageCombinerClass(SimpleSumMessageCombiner.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
     assertTrue(job.run(true));
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java
index 7d326da..8883d6f 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.combiner.MinimumIntMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
@@ -68,7 +68,7 @@ public class ConnectedComponentsComputationTest {
         GiraphConfiguration conf = new GiraphConfiguration();
         conf.setComputationClass(ConnectedComponentsComputation.class);
         conf.setOutEdgesClass(ByteArrayEdges.class);
-        conf.setCombinerClass(MinimumIntCombiner.class);
+        conf.setMessageCombinerClass(MinimumIntMessageCombiner.class);
         conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
         conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java
index dbcd569..1bb8e94 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.combiner.MinimumIntMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.graph.Vertex;
@@ -59,7 +59,7 @@ public class ConnectedComponentsComputationTestInMemory {
     GiraphConfiguration conf = new GiraphConfiguration();
     conf.setComputationClass(ConnectedComponentsComputation.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setCombinerClass(MinimumIntCombiner.class);
+    conf.setMessageCombinerClass(MinimumIntMessageCombiner.class);
 
     TestGraph<IntWritable, IntWritable, NullWritable> graph =
       new TestGraph<IntWritable, IntWritable, NullWritable>(conf);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
index 434c756..aa6cd8a 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
@@ -18,26 +18,27 @@
 
 package org.apache.giraph.examples;
 
-import static org.junit.Assert.assertEquals;
-
-import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.combiner.MinimumIntMessageCombiner;
 import org.apache.hadoop.io.IntWritable;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class MinimumIntCombinerTest {
 
   @Test
   public void testCombiner() throws Exception {
-    Combiner<IntWritable, IntWritable> combiner =
-        new MinimumIntCombiner();
+    MessageCombiner<IntWritable, IntWritable>
+        messageCombiner =
+        new MinimumIntMessageCombiner();
 
     IntWritable vertexId = new IntWritable(1);
-    IntWritable result = combiner.createInitialMessage();
-    combiner.combine(vertexId, result, new IntWritable(39947466));
-    combiner.combine(vertexId, result, new IntWritable(199));
-    combiner.combine(vertexId, result, new IntWritable(42));
-    combiner.combine(vertexId, result, new IntWritable(19998888));
+    IntWritable result = messageCombiner.createInitialMessage();
+    messageCombiner.combine(vertexId, result, new IntWritable(39947466));
+    messageCombiner.combine(vertexId, result, new IntWritable(199));
+    messageCombiner.combine(vertexId, result, new IntWritable(42));
+    messageCombiner.combine(vertexId, result, new IntWritable(19998888));
     assertEquals(42, result.get());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
index 1323ff6..77bf3f3 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.combiner.MinimumIntMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.edge.ByteArrayEdges;
@@ -72,7 +72,7 @@ public class TryMultiIpcBindingPortsTest {
         GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.set(conf, true);
         conf.setComputationClass(ConnectedComponentsComputation.class);
         conf.setOutEdgesClass(ByteArrayEdges.class);
-        conf.setCombinerClass(MinimumIntCombiner.class);
+        conf.setMessageCombinerClass(MinimumIntMessageCombiner.class);
         conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
         conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
index 4f74fcb..a481db3 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.vertex;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
@@ -63,8 +63,9 @@ public class TestComputationTypes {
     /**
      * Matches the {@link GeneratedComputationMatch}
      */
-    private static class GeneratedVertexMatchCombiner extends
-        Combiner<LongWritable, FloatWritable> {
+    private static class GeneratedVertexMatchMessageCombiner
+        extends
+        MessageCombiner<LongWritable, FloatWritable> {
       @Override
       public void combine(LongWritable vertexIndex,
           FloatWritable originalMessage,
@@ -80,8 +81,9 @@ public class TestComputationTypes {
     /**
      * Mismatches the {@link GeneratedComputationMatch}
      */
-    private static class GeneratedVertexMismatchCombiner extends
-        Combiner<LongWritable, DoubleWritable> {
+    private static class GeneratedVertexMismatchMessageCombiner
+        extends
+        MessageCombiner<LongWritable, DoubleWritable> {
       @Override
       public void combine(LongWritable vertexIndex,
           DoubleWritable originalMessage,
@@ -136,8 +138,8 @@ public class TestComputationTypes {
         GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
         GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
             SimpleSuperstepVertexInputFormat.class);
-        GiraphConstants.VERTEX_COMBINER_CLASS.set(conf,
-            GeneratedVertexMatchCombiner.class);
+        GiraphConstants.MESSAGE_COMBINER_CLASS.set(conf,
+            GeneratedVertexMatchMessageCombiner.class);
       @SuppressWarnings("rawtypes")
       GiraphConfigurationValidator<?, ?, ?, ?, ?> validator =
         new GiraphConfigurationValidator(conf);
@@ -203,8 +205,8 @@ public class TestComputationTypes {
       GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
       GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
         SimpleSuperstepVertexInputFormat.class);
-      GiraphConstants.VERTEX_COMBINER_CLASS.set(conf,
-        GeneratedVertexMismatchCombiner.class);
+      GiraphConstants.MESSAGE_COMBINER_CLASS.set(conf,
+        GeneratedVertexMismatchMessageCombiner.class);
       @SuppressWarnings("rawtypes")
       GiraphConfigurationValidator<?, ?, ?, ?, ?> validator =
         new GiraphConfigurationValidator(conf);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
new file mode 100644
index 0000000..cc61441
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input.vertex.examples;
+
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.hive.common.HiveParsing;
+import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * Simple HiveToVertex that reads vertices with integer IDs, integer vertex
+ * values (default-initialized, not read from Hive), and edges with no values.
+ */
+public class HiveIntIntNullVertex
+    extends SimpleHiveToVertex<IntWritable, IntWritable, NullWritable> {
+  @Override public void checkInput(HiveInputDescription inputDesc,
+      HiveTableSchema schema) {
+    Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+    Preconditions.checkArgument(schema.columnType(1) == HiveType.LIST);
+  }
+
+  @Override
+  public Iterable<Edge<IntWritable, NullWritable>> getEdges(
+      HiveReadableRecord record) {
+    return HiveParsing.parseIntNullEdges(record, 1);
+  }
+
+  @Override
+  public IntWritable getVertexId(HiveReadableRecord record) {
+    return HiveParsing.parseIntID(record, 0);
+  }
+
+  @Override
+  public IntWritable getVertexValue(HiveReadableRecord record) {
+    return new IntWritable();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
index 334f382..7ae8bc3 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
@@ -80,7 +80,7 @@ import static org.apache.giraph.conf.GiraphConstants.EDGE_INPUT_FORMAT_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.GRAPH_TYPE_LANGUAGES;
 import static org.apache.giraph.conf.GiraphConstants.MAX_WORKERS;
 import static org.apache.giraph.conf.GiraphConstants.MIN_WORKERS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_COMBINER_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_COMBINER_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_INPUT_FORMAT_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
@@ -266,8 +266,8 @@ public class HiveJythonUtils {
 
     JythonUtils.init(conf, jythonJob.getComputation_name());
 
-    if (jythonJob.getCombiner() != null) {
-      VERTEX_COMBINER_CLASS.set(conf, jythonJob.getCombiner());
+    if (jythonJob.getMessageCombiner() != null) {
+      MESSAGE_COMBINER_CLASS.set(conf, jythonJob.getMessageCombiner());
     }
 
     conf.setInt(MIN_WORKERS, jythonJob.getWorkers());

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
index c75652c..98ba014 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.giraph.hive.input;
 
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.testing.LocalHiveServer;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.hive.GiraphHiveTestBase;
 import org.apache.giraph.hive.Helpers;
@@ -24,16 +26,13 @@ import org.apache.giraph.hive.computations.ComputationCountEdges;
 import org.apache.giraph.hive.computations.ComputationSumEdges;
 import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
 import org.apache.giraph.hive.input.vertex.examples.HiveIntDoubleDoubleVertex;
-import org.apache.giraph.hive.input.vertex.examples.HiveIntNullNullVertex;
+import org.apache.giraph.hive.input.vertex.examples.HiveIntIntNullVertex;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.thrift.TException;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.facebook.hiveio.common.HiveMetastores;
-import com.facebook.hiveio.testing.LocalHiveServer;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -66,7 +65,7 @@ public class HiveVertexInputTest extends GiraphHiveTestBase {
 
     GiraphConfiguration conf = new GiraphConfiguration();
     HIVE_VERTEX_INPUT.setTable(conf, tableName);
-    HIVE_VERTEX_INPUT.setClass(conf, HiveIntNullNullVertex.class);
+    HIVE_VERTEX_INPUT.setClass(conf, HiveIntIntNullVertex.class);
     conf.setComputationClass(ComputationCountEdges.class);
     conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
@@ -99,7 +98,7 @@ public class HiveVertexInputTest extends GiraphHiveTestBase {
     GiraphConfiguration conf = new GiraphConfiguration();
     HIVE_VERTEX_INPUT.setTable(conf, tableName);
     HIVE_VERTEX_INPUT.setPartition(conf, partition);
-    HIVE_VERTEX_INPUT.setClass(conf, HiveIntNullNullVertex.class);
+    HIVE_VERTEX_INPUT.setClass(conf, HiveIntIntNullVertex.class);
     conf.setComputationClass(ComputationCountEdges.class);
     conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
index a87f030..f5cf685 100644
--- a/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
+++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from org.apache.giraph.combiner import DoubleSumCombiner
+from org.apache.giraph.combiner import DoubleSumMessageCombiner
 from org.apache.giraph.edge import ByteArrayEdges
 from org.apache.giraph.jython import JythonJob
 from org.apache.hadoop.io import IntWritable

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/src/site/xdoc/quick_start.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/quick_start.xml b/src/site/xdoc/quick_start.xml
index 5df6555..4c14c75 100644
--- a/src/site/xdoc/quick_start.xml
+++ b/src/site/xdoc/quick_start.xml
@@ -224,7 +224,7 @@ usage: org.apache.giraph.utils.ConfigurationUtils [-aw &lt;arg&gt;] [-c &lt;arg&
        &lt;arg&gt;] [-vip &lt;arg&gt;] [-vvf &lt;arg&gt;] [-w &lt;arg&gt;] [-wc &lt;arg&gt;] [-yh &lt;arg&gt;]
        [-yj &lt;arg&gt;]
  -aw,--aggregatorWriter &lt;arg&gt;           AggregatorWriter class
- -c,--combiner &lt;arg&gt;                    Combiner class
+ -c,--messageCombiner &lt;arg&gt;             Message combiner class
  -ca,--customArguments &lt;arg&gt;            provide custom arguments for the
                                         job configuration in the form: -ca
                                         &lt;param1&gt;=&lt;value1&gt;,&lt;param2&gt;=&lt;value2&gt;


[3/4] Everything compiles. All tests should run. Next step is to add a test for the vertex combiner. Should have fixed. Fixed one bug for byte array partition. Fixed another bug for too small of a message buffer. Rebased. Rebased. Passes tests. Need to

Posted by ac...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index f97446f..3337621 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -19,13 +19,15 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
 import org.apache.giraph.factories.DefaultComputationFactory;
 import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.DefaultVertexValueCombiner;
 import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
@@ -92,11 +94,14 @@ public class GiraphClasses<I extends WritableComparable,
 
   /** Aggregator writer class - cached for fast access */
   protected Class<? extends AggregatorWriter> aggregatorWriterClass;
-  /** Combiner class - cached for fast access */
-  protected Class<? extends Combiner<I, ? extends Writable>> combinerClass;
+  /** Message combiner class - cached for fast access */
+  protected Class<? extends MessageCombiner<I, ? extends Writable>>
+  messageCombinerClass;
 
   /** Vertex resolver class - cached for fast access */
   protected Class<? extends VertexResolver<I, V, E>> vertexResolverClass;
+  /** Vertex value combiner class - cached for fast access */
+  protected Class<? extends VertexValueCombiner<V>> vertexValueCombinerClass;
   /** Worker context class - cached for fast access */
   protected Class<? extends WorkerContext> workerContextClass;
   /** Master compute class - cached for fast access */
@@ -131,6 +136,8 @@ public class GiraphClasses<I extends WritableComparable,
     aggregatorWriterClass = TextAggregatorWriter.class;
     vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
         (Object) DefaultVertexResolver.class;
+    vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
+        (Object) DefaultVertexValueCombiner.class;
     workerContextClass = DefaultWorkerContext.class;
     masterComputeClass = DefaultMasterCompute.class;
     partitionClass = (Class<? extends Partition<I, V, E>>) (Object)
@@ -176,10 +183,13 @@ public class GiraphClasses<I extends WritableComparable,
         EDGE_OUTPUT_FORMAT_CLASS.get(conf);
 
     aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
-    combinerClass = (Class<? extends Combiner<I, ? extends Writable>>)
-        VERTEX_COMBINER_CLASS.get(conf);
+    messageCombinerClass =
+        (Class<? extends MessageCombiner<I, ? extends Writable>>)
+        MESSAGE_COMBINER_CLASS.get(conf);
     vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
         VERTEX_RESOLVER_CLASS.get(conf);
+    vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
+        VERTEX_VALUE_COMBINER_CLASS.get(conf);
     workerContextClass = WORKER_CONTEXT_CLASS.get(conf);
     masterComputeClass =  MASTER_COMPUTE_CLASS.get(conf);
     partitionClass = (Class<? extends Partition<I, V, E>>)
@@ -390,21 +400,22 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Check if Combiner is set
+   * Check if MessageCombiner is set
    *
-   * @return true if Combiner is set
+   * @return true if MessageCombiner is set
    */
-  public boolean hasCombinerClass() {
-    return combinerClass != null;
+  public boolean hasMessageCombinerClass() {
+    return messageCombinerClass != null;
   }
 
   /**
-   * Get Combiner used
+   * Get MessageCombiner used
    *
-   * @return Combiner
+   * @return MessageCombiner
    */
-  public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
-    return combinerClass;
+  public Class<? extends MessageCombiner<I, ? extends Writable>>
+  getMessageCombinerClass() {
+    return messageCombinerClass;
   }
 
   /**
@@ -426,6 +437,15 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Get VertexValueCombiner used
+   *
+   * @return VertexValueCombiner
+   */
+  public Class<? extends VertexValueCombiner<V>> getVertexValueCombinerClass() {
+    return vertexValueCombinerClass;
+  }
+
+  /**
    * Check if WorkerContext is set
    *
    * @return true if WorkerContext is set
@@ -639,14 +659,14 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Set Combiner class used
+   * Set MessageCombiner class used
    *
-   * @param combinerClass Combiner class to set
+   * @param combinerClass MessageCombiner class to set
    * @return this
    */
-  public GiraphClasses setCombinerClass(
-      Class<? extends Combiner<I, ? extends Writable>> combinerClass) {
-    this.combinerClass = combinerClass;
+  public GiraphClasses setMessageCombiner(
+      Class<? extends MessageCombiner<I, ? extends Writable>> combinerClass) {
+    this.messageCombinerClass = combinerClass;
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 15ff861..4dee396 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -19,13 +19,14 @@
 package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReuseObjectsOutEdges;
 import org.apache.giraph.factories.ComputationFactory;
+import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
-import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -96,7 +97,7 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Get the user's subclassed {@link org.apache.giraph.graph.Computation}
+   * Get the user's subclassed {@link Computation}
    *
    * @return User's computation class
    */
@@ -467,22 +468,22 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Get the vertex combiner class (optional)
+   * Get the message combiner class (optional)
    *
-   * @return vertexCombinerClass Determines how vertex messages are combined
+   * @return messageCombinerClass Determines how vertex messages are combined
    */
-  public Class<? extends Combiner> getCombinerClass() {
-    return VERTEX_COMBINER_CLASS.get(this);
+  public Class<? extends MessageCombiner> getMessageCombinerClass() {
+    return MESSAGE_COMBINER_CLASS.get(this);
   }
 
   /**
-   * Set the vertex combiner class (optional)
+   * Set the message combiner class (optional)
    *
-   * @param vertexCombinerClass Determines how vertex messages are combined
+   * @param messageCombinerClass Determines how vertex messages are combined
    */
-  public void setCombinerClass(
-      Class<? extends Combiner> vertexCombinerClass) {
-    VERTEX_COMBINER_CLASS.set(this, vertexCombinerClass);
+  public void setMessageCombinerClass(
+      Class<? extends MessageCombiner> messageCombinerClass) {
+    MESSAGE_COMBINER_CLASS.set(this, messageCombinerClass);
   }
 
   /**
@@ -525,6 +526,16 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Set the vertex value combiner class (optional)
+   *
+   * @param vertexValueCombinerClass Determines how vertex values are combined
+   */
+  public final void setVertexValueCombinerClass(
+      Class<? extends VertexValueCombiner> vertexValueCombinerClass) {
+    VERTEX_VALUE_COMBINER_CLASS.set(this, vertexValueCombinerClass);
+  }
+
+  /**
    * Set the worker context class (optional)
    *
    * @param workerContextClass Determines what code is executed on a each

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 4dadd29..89fce61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -19,7 +19,7 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
@@ -34,8 +34,10 @@ import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.factories.VertexIdFactory;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.DefaultVertexValueCombiner;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.Language;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
@@ -150,15 +152,20 @@ public interface GiraphConstants {
   ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES =
       ClassConfOption.create("giraph.worker.observers", null,
           WorkerObserver.class, "Classes for Worker Observer - optional");
-  /** Vertex combiner class - optional */
-  ClassConfOption<Combiner> VERTEX_COMBINER_CLASS =
-      ClassConfOption.create("giraph.combinerClass", null, Combiner.class,
-          "Vertex combiner class - optional");
+  /** Message combiner class - optional */
+  ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS =
+      ClassConfOption.create("giraph.messageCombinerClass", null,
+          MessageCombiner.class, "Message combiner class - optional");
   /** Vertex resolver class - optional */
   ClassConfOption<VertexResolver> VERTEX_RESOLVER_CLASS =
       ClassConfOption.create("giraph.vertexResolverClass",
           DefaultVertexResolver.class, VertexResolver.class,
           "Vertex resolver class - optional");
+  /** Vertex value combiner class - optional */
+  ClassConfOption<VertexValueCombiner> VERTEX_VALUE_COMBINER_CLASS =
+      ClassConfOption.create("giraph.vertexValueCombinerClass",
+          DefaultVertexValueCombiner.class, VertexValueCombiner.class,
+          "Vertex value combiner class - optional");
 
   /** Which language computation is implemented in */
   EnumConfOption<Language> COMPUTATION_LANGUAGE =
@@ -588,6 +595,20 @@ public interface GiraphConstants {
           "request size is M, and a worker has P partitions, than its " +
           "initial partition buffer size will be (M / P) * (1 + A).");
 
+  /** Maximum size of vertices (in bytes) per peer before flush */
+  IntConfOption MAX_VERTEX_REQUEST_SIZE =
+      new IntConfOption("giraph.vertexRequestSize", 512 * ONE_KB,
+          "Maximum size of vertices (in bytes) per peer before flush");
+
+  /**
+   * Additional size (expressed as a ratio) of each per-partition buffer on
+   * top of the average size for vertices.
+   */
+  FloatConfOption ADDITIONAL_VERTEX_REQUEST_SIZE =
+      new FloatConfOption("giraph.additionalVertexRequestSize", 0.2f,
+          "Additional size (expressed as a ratio) of each per-partition " +
+              "buffer on top of the average size.");
+
   /** Maximum size of edges (in bytes) per peer before flush */
   IntConfOption MAX_EDGE_REQUEST_SIZE =
       new IntConfOption("giraph.edgeRequestSize", 512 * ONE_KB,
@@ -595,7 +616,7 @@ public interface GiraphConstants {
 
   /**
    * Additional size (expressed as a ratio) of each per-partition buffer on
-   * top of the average size.
+   * top of the average size for edges.
    */
   FloatConfOption ADDITIONAL_EDGE_REQUEST_SIZE =
       new FloatConfOption("giraph.additionalEdgeRequestSize", 0.2f,
@@ -665,9 +686,9 @@ public interface GiraphConstants {
   LongConfOption INPUT_SPLIT_MAX_VERTICES =
       new LongConfOption("giraph.InputSplitMaxVertices", -1,
           "To limit outlier vertex input splits from producing too many " +
-          "vertices or to help with testing, the number of vertices loaded " +
-          "from an input split can be limited. By default, everything is " +
-          "loaded.");
+              "vertices or to help with testing, the number of vertices " +
+              "loaded from an input split can be limited. By default, " +
+              "everything is loaded.");
 
   /**
    * To limit outlier vertex input splits from producing too many vertices or
@@ -677,9 +698,9 @@ public interface GiraphConstants {
   LongConfOption INPUT_SPLIT_MAX_EDGES =
       new LongConfOption("giraph.InputSplitMaxEdges", -1,
           "To limit outlier vertex input splits from producing too many " +
-          "vertices or to help with testing, the number of edges loaded " +
-          "from an input split can be limited. By default, everything is " +
-          "loaded.");
+              "vertices or to help with testing, the number of edges loaded " +
+              "from an input split can be limited. By default, everything is " +
+              "loaded.");
 
   /**
    * To minimize network usage when reading input splits,

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 435dfa5..6bb6c00 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.edge.OutEdges;
@@ -34,6 +34,7 @@ import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.DefaultVertex;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
@@ -87,8 +88,7 @@ import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
  */
 @SuppressWarnings("unchecked")
 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends GiraphConfiguration {
+    V extends Writable, E extends Writable> extends GiraphConfiguration {
   /** Holder for all the classes */
   private final GiraphClasses classes;
   /** Value (IVEMM) Factories */
@@ -429,12 +429,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Get the user's subclassed {@link Combiner} class.
+   * Get the user's subclassed
+   * {@link org.apache.giraph.combiner.MessageCombiner} class.
    *
    * @return User's combiner class
    */
-  public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
-    return classes.getCombinerClass();
+  public Class<? extends MessageCombiner<I, ? extends Writable>>
+  getMessageCombinerClass() {
+    return classes.getMessageCombinerClass();
   }
 
   /**
@@ -444,8 +446,9 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    * @return Instantiated user combiner class
    */
   @SuppressWarnings("rawtypes")
-  public <M extends Writable> Combiner<I, M> createCombiner() {
-    Class<? extends Combiner<I, M>> klass = classes.getCombinerClass();
+  public <M extends Writable> MessageCombiner<I, M> createMessageCombiner() {
+    Class<? extends MessageCombiner<I, M>> klass =
+        classes.getMessageCombinerClass();
     return ReflectionUtils.newInstance(klass, this);
   }
 
@@ -454,8 +457,29 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    *
    * @return True iff user set a combiner class
    */
-  public boolean useCombiner() {
-    return classes.hasCombinerClass();
+  public boolean useMessageCombiner() {
+    return classes.hasMessageCombinerClass();
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.VertexValueCombiner} class.
+   *
+   * @return User's vertex value combiner class
+   */
+  public Class<? extends VertexValueCombiner<V>>
+  getVertexValueCombinerClass() {
+    return classes.getVertexValueCombinerClass();
+  }
+
+  /**
+   * Create a user vertex value combiner class
+   *
+   * @return Instantiated user vertex value combiner class
+   */
+  @SuppressWarnings("rawtypes")
+  public VertexValueCombiner<V> createVertexValueCombiner() {
+    return ReflectionUtils.newInstance(getVertexValueCombinerClass(), this);
   }
 
   /**
@@ -979,7 +1003,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Update Computation and Combiner class used
+   * Update Computation and MessageCombiner class used
    *
    * @param superstepClasses SuperstepClasses
    */
@@ -999,6 +1023,6 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
           (Class<? extends Writable>) classList[4];
       classes.setOutgoingMessageValueClass(outgoingMsgValueClass);
     }
-    classes.setCombinerClass(superstepClasses.getCombinerClass());
+    classes.setMessageCombiner(superstepClasses.getMessageCombinerClass());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 23df689..1694d36 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -181,7 +181,7 @@ public class EdgeStore<I extends WritableComparable,
             Integer partitionId;
             while ((partitionId = partitionIdQueue.poll()) != null) {
               Partition<I, V, E> partition =
-                  service.getPartitionStore().getPartition(partitionId);
+                  service.getPartitionStore().getOrCreatePartition(partitionId);
               ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
                   transientEdges.remove(partitionId);
               for (I vertexId : partitionEdges.keySet()) {
@@ -196,7 +196,15 @@ public class EdgeStore<I extends WritableComparable,
                       outEdges);
                   partition.putVertex(vertex);
                 } else {
-                  vertex.setEdges(outEdges);
+                  // A vertex may already exist, with or without edges.
+                  // Optimize the case of no initial edges.
+                  if (vertex.getNumEdges() == 0) {
+                    vertex.setEdges(outEdges);
+                  } else {
+                    for (Edge<I, E> edge : outEdges) {
+                      vertex.addEdge(edge);
+                    }
+                  }
                   // Some Partition implementations (e.g. ByteArrayPartition)
                   // require us to put back the vertex after modifying it.
                   partition.saveVertex(vertex);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 77d9f5e..1fe1d10 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -153,7 +153,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       }
 
       Partition<I, V, E> partition =
-          serviceWorker.getPartitionStore().getPartition(partitionId);
+          serviceWorker.getPartitionStore().getOrCreatePartition(partitionId);
 
       Computation<I, V, E, M1, M2> computation =
           (Computation<I, V, E, M1, M2>) configuration.createComputation();

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java
new file mode 100644
index 0000000..4dc6384
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The default vertex value combining approach is to simply keep the original
+ * value.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class DefaultVertexValueCombiner<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements VertexValueCombiner<V> {
+  @Override
+  public void combine(V originalVertexValue,
+                      V vertexValue) {
+    // Keep the original value, do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java
new file mode 100644
index 0000000..7891434
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueCombiner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * When vertex values with the same vertex id are loaded, this
+ * interface specifies how to combine their vertex values.  Edges loaded will
+ * be added to the EdgeStore.
+ *
+ * @param <V> Vertex data
+ */
+public interface VertexValueCombiner<V extends Writable> {
+  /**
+   * Combine a vertex value with the original vertex value
+   * by modifying originalVertexValue.
+   *
+   * @param originalVertexValue Combine the other vertex value into this one
+   * @param vertexValue Vertex value to combine into originalVertexValue
+   */
+  void combine(V originalVertexValue, V vertexValue);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java
new file mode 100644
index 0000000..1af5b73
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextVertexInputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
+ * unweighted graphs with int ids.
+ *
+ * Each line consists of: vertex_id vertex_value neighbor1 neighbor2 ...
+ */
+public class IntIntNullTextVertexInputFormat
+    extends
+    TextVertexInputFormat<IntWritable, IntWritable, NullWritable> {
+  /** Separator of the vertex and neighbors */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context)
+    throws IOException {
+    return new IntIntNullVertexReader();
+  }
+
+  /**
+   * Vertex reader associated with
+   * {@link org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat}.
+   */
+  public class IntIntNullVertexReader extends
+    TextVertexReaderFromEachLineProcessed<String[]> {
+    /** Cached vertex id for the current line */
+    private IntWritable id;
+    /** Cached vertex value for the current line */
+    private IntWritable value;
+
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      id = new IntWritable(Integer.parseInt(tokens[0]));
+      value = new IntWritable(Integer.parseInt(tokens[1]));
+      return tokens;
+    }
+
+    @Override
+    protected IntWritable getId(String[] tokens) throws IOException {
+      return id;
+    }
+
+    @Override
+    protected IntWritable getValue(String[] tokens) throws IOException {
+      return value;
+    }
+
+    @Override
+    protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
+        String[] tokens) throws IOException {
+      List<Edge<IntWritable, NullWritable>> edges =
+          Lists.newArrayListWithCapacity(tokens.length - 2);
+      for (int n = 2; n < tokens.length; n++) {
+        edges.add(EdgeFactory.create(
+            new IntWritable(Integer.parseInt(tokens[n]))));
+      }
+      return edges;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
index 6a795a8..ac7f5b7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
@@ -85,16 +85,11 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
      * Create the line record reader. Override this to use a different
      * underlying record reader (useful for testing).
      *
-     * @param inputSplit
-     *          the split to read
-     * @param context
-     *          the context passed to initialize
-     * @return
-     *         the record reader to be used
-     * @throws IOException
-     *           exception that can be thrown during creation
-     * @throws InterruptedException
-     *           exception that can be thrown during creation
+     * @param inputSplit the split to read
+     * @param context the context passed to initialize
+     * @return the record reader to be used
+     * @throws IOException exception that can be thrown during creation
+     * @throws InterruptedException exception that can be thrown during creation
      */
     protected RecordReader<LongWritable, Text>
     createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
@@ -157,22 +152,17 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
     /**
      * Reads vertex id from the current line.
      *
-     * @param line
-     *          the current line
-     * @return
-     *         the vertex id corresponding to the line
-     * @throws IOException
-     *           exception that can be thrown while reading
+     * @param line the current line
+     * @return the vertex id corresponding to the line
+     * @throws IOException exception that can be thrown while reading
      */
     protected abstract I getId(Text line) throws IOException;
 
     /**
      * Reads vertex value from the current line.
      *
-     * @param line
-     *          the current line
-     * @return
-     *         the vertex value corresponding to the line
+     * @param line the current line
+     * @return the vertex value corresponding to the line
      * @throws IOException
      *           exception that can be thrown while reading
      */
@@ -183,8 +173,7 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
    * Abstract class to be implemented by the user to read a vertex value from
    * each text line after preprocessing it.
    *
-   * @param <T>
-   *          The resulting type of preprocessing.
+   * @param <T> The resulting type of preprocessing.
    */
   protected abstract class TextVertexValueReaderFromEachLineProcessed<T>
       extends TextVertexValueReader {
@@ -226,12 +215,9 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
      * Preprocess the line so other methods can easily read necessary
      * information for creating vertex.
      *
-     * @param line
-     *          the current line to be read
-     * @return
-     *         the preprocessed object
-     * @throws IOException
-     *           exception that can be thrown while reading
+     * @param line the current line to be read
+     * @return the preprocessed object
+     * @throws IOException exception that can be thrown while reading
      */
     protected abstract T preprocessLine(Text line) throws IOException;
 
@@ -240,22 +226,17 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable,
      *
      * @param line
      *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex id
-     * @throws IOException
-     *           exception that can be thrown while reading
+     * @return the vertex id
+     * @throws IOException exception that can be thrown while reading
      */
     protected abstract I getId(T line) throws IOException;
 
     /**
      * Reads vertex value from the preprocessed line.
      *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex value
-     * @throws IOException
-     *           exception that can be thrown while reading
+     * @param line the object obtained by preprocessing the line
+     * @return the vertex value
+     * @throws IOException exception that can be thrown while reading
      */
     protected abstract V getValue(T line) throws IOException;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 5b870c5..fcb5b87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -18,13 +18,14 @@
 
 package org.apache.giraph.job;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.DefaultVertexValueFactory;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -73,6 +74,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   private static final int EDGE_PARAM_OUT_EDGES_INDEX = 1;
   /** V param vertex value factory index in classList */
   private static final int VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX = 0;
+  /** V param vertex value combiner index in classList */
+  private static final int VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX = 0;
 
   /**
    * The Configuration object for use in the validation test.
@@ -138,7 +141,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     verifyEdgeInputFormatGenericTypes();
     verifyVertexOutputFormatGenericTypes();
     verifyVertexResolverGenericTypes();
-    verifyVertexCombinerGenericTypes();
+    verifyVertexValueCombinerGenericTypes();
+    verifyMessageCombinerGenericTypes();
     verifyVertexValueFactoryGenericTypes();
   }
 
@@ -240,17 +244,35 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     }
   }
 
-  /** If there is a combiner type, verify its generic params match the job. */
-  private void verifyVertexCombinerGenericTypes() {
-    Class<? extends Combiner<I, M2>> vertexCombinerClass =
-      conf.getCombinerClass();
-    if (vertexCombinerClass != null) {
+  /**
+   * If there is a vertex value combiner type, verify its
+   * generic params match the job.
+   */
+  private void verifyVertexValueCombinerGenericTypes() {
+    Class<? extends VertexValueCombiner<V>> vertexValueCombiner =
+        conf.getVertexValueCombinerClass();
+    if (vertexValueCombiner != null) {
+      Class<?>[] classList =
+          getTypeArguments(VertexValueCombiner.class, vertexValueCombiner);
+      checkAssignable(classList, VALUE_PARAM_VERTEX_VALUE_COMBINER_INDEX,
+          vertexValueType(), VertexValueCombiner.class, "vertex value");
+    }
+  }
+
+  /**
+   * If there is a message combiner type, verify its
+   * generic params match the job.
+   */
+  private void verifyMessageCombinerGenericTypes() {
+    Class<? extends MessageCombiner<I, M2>> messageCombinerClass =
+      conf.getMessageCombinerClass();
+    if (messageCombinerClass != null) {
       Class<?>[] classList =
-          getTypeArguments(Combiner.class, vertexCombinerClass);
-      checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(), Combiner.class,
-          "vertex index");
+          getTypeArguments(MessageCombiner.class, messageCombinerClass);
+      checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
+          MessageCombiner.class, "vertex index");
       checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
-          outgoingMessageValueType(), Combiner.class, "message value");
+          outgoingMessageValueType(), MessageCombiner.class, "message value");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
index 6b2eedf..c7e9eae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonJob.java
@@ -17,7 +17,7 @@
  */
 package org.apache.giraph.jython;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
@@ -281,8 +281,8 @@ public class JythonJob {
   private final TypeHolder message_value = new TypeHolder();
   /** Computation class */
   private String computation_name;
-  /** Combiner class */
-  private Class<? extends Combiner> combiner;
+  /** MessageCombiner class */
+  private Class<? extends MessageCombiner> messageCombiner;
   /** Java options */
   private final List<String> java_options = Lists.newArrayList();
   /** Giraph options */
@@ -342,12 +342,13 @@ public class JythonJob {
     return giraph_options;
   }
 
-  public Class<? extends Combiner> getCombiner() {
-    return combiner;
+  public Class<? extends MessageCombiner> getMessageCombiner() {
+    return messageCombiner;
   }
 
-  public void setCombiner(Class<? extends Combiner> combiner) {
-    this.combiner = combiner;
+  public void setMessageCombiner(
+      Class<? extends MessageCombiner> messageCombiner) {
+    this.messageCombiner = messageCombiner;
   }
 
   public String getComputation_name() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index cf7356c..287fdb9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -18,9 +18,9 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
@@ -48,7 +48,10 @@ public abstract class MasterCompute
   private MasterAggregatorUsage masterAggregatorUsage;
   /** Graph state */
   private GraphState graphState;
-  /** Computation and Combiner class used, which can be switched by master */
+  /**
+   * Computation and MessageCombiner classes used, which can be
+   * switched by master
+   */
   private SuperstepClasses superstepClasses;
 
   /**
@@ -143,26 +146,27 @@ public abstract class MasterCompute
   }
 
   /**
-   * Set Combiner class to be used
+   * Set MessageCombiner class to be used
    *
-   * @param combinerClass Combiner class
+   * @param combinerClass MessageCombiner class
    */
-  public final void setCombiner(Class<? extends Combiner> combinerClass) {
-    superstepClasses.setCombinerClass(combinerClass);
+  public final void setMessageCombiner(
+      Class<? extends MessageCombiner> combinerClass) {
+    superstepClasses.setMessageCombinerClass(combinerClass);
   }
 
   /**
-   * Get Combiner class to be used
+   * Get MessageCombiner class to be used
    *
-   * @return Combiner class
+   * @return MessageCombiner class
    */
-  public final Class<? extends Combiner> getCombiner() {
+  public final Class<? extends MessageCombiner> getMessageCombiner() {
     // Might be called prior to classes being set, do not return NPE
     if (superstepClasses == null) {
       return null;
     }
 
-    return superstepClasses.getCombinerClass();
+    return superstepClasses.getMessageCombinerClass();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index 7a7df05..8344910 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.graph.Computation;
@@ -35,13 +35,13 @@ import java.lang.reflect.Modifier;
 import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
 
 /**
- * Holds Computation and Combiner class.
+ * Holds Computation and MessageCombiner class.
  */
 public class SuperstepClasses implements Writable {
   /** Computation class to be used in the following superstep */
   private Class<? extends Computation> computationClass;
-  /** Combiner class to be used in the following superstep */
-  private Class<? extends Combiner> combinerClass;
+  /** MessageCombiner class to be used in the following superstep */
+  private Class<? extends MessageCombiner> messageCombinerClass;
 
   /**
    * Default constructor
@@ -56,27 +56,28 @@ public class SuperstepClasses implements Writable {
    */
   @SuppressWarnings("unchecked")
   public SuperstepClasses(ImmutableClassesGiraphConfiguration conf) {
-    this(conf.getComputationClass(), conf.getCombinerClass());
+    this(conf.getComputationClass(), conf.getMessageCombinerClass());
   }
 
   /**
    * Constructor
    *
    * @param computationClass Computation class
-   * @param combinerClass Combiner class
+   * @param messageCombinerClass MessageCombiner class
    */
   public SuperstepClasses(Class<? extends Computation> computationClass,
-      Class<? extends Combiner> combinerClass) {
+      Class<? extends MessageCombiner> messageCombinerClass) {
     this.computationClass = computationClass;
-    this.combinerClass = combinerClass;
+    this.messageCombinerClass =
+        messageCombinerClass;
   }
 
   public Class<? extends Computation> getComputationClass() {
     return computationClass;
   }
 
-  public Class<? extends Combiner> getCombinerClass() {
-    return combinerClass;
+  public Class<? extends MessageCombiner> getMessageCombinerClass() {
+    return messageCombinerClass;
   }
 
   public void setComputationClass(
@@ -84,13 +85,15 @@ public class SuperstepClasses implements Writable {
     this.computationClass = computationClass;
   }
 
-  public void setCombinerClass(Class<? extends Combiner> combinerClass) {
-    this.combinerClass = combinerClass;
+  public void setMessageCombinerClass(
+      Class<? extends MessageCombiner> messageCombinerClass) {
+    this.messageCombinerClass =
+        messageCombinerClass;
   }
 
   /**
-   * Verify that types of current Computation and Combiner are valid. If types
-   * don't match an {@link IllegalStateException} will be thrown.
+   * Verify that types of current Computation and MessageCombiner are valid.
+   * If types don't match an {@link IllegalStateException} will be thrown.
    *
    * @param conf Configuration to verify this with
    * @param checkMatchingMesssageTypes Check that the incoming/outgoing
@@ -128,13 +131,13 @@ public class SuperstepClasses implements Writable {
       throw new IllegalStateException("verifyTypesMatch: " +
           "Message type can't be abstract class" + outgoingMessageType);
     }
-    if (combinerClass != null) {
+    if (messageCombinerClass != null) {
       Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
-          Combiner.class, combinerClass);
+          MessageCombiner.class, messageCombinerClass);
       verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
-          "Vertex id", combinerClass);
+          "Vertex id", messageCombinerClass);
       verifyTypes(outgoingMessageType, combinerTypes[1],
-          "Outgoing message", combinerClass);
+          "Outgoing message", messageCombinerClass);
     }
   }
 
@@ -160,13 +163,13 @@ public class SuperstepClasses implements Writable {
   @Override
   public void write(DataOutput output) throws IOException {
     WritableUtils.writeClass(computationClass, output);
-    WritableUtils.writeClass(combinerClass, output);
+    WritableUtils.writeClass(messageCombinerClass, output);
   }
 
   @Override
   public void readFields(DataInput input) throws IOException {
     computationClass = WritableUtils.readClass(input);
-    combinerClass = WritableUtils.readClass(input);
+    messageCombinerClass = WritableUtils.readClass(input);
   }
 
   @Override
@@ -174,6 +177,7 @@ public class SuperstepClasses implements Writable {
     String computationName = computationClass == null ? "_not_set_" :
         computationClass.getName();
     return "(computation=" + computationName + ",combiner=" +
-        ((combinerClass == null) ? "null" : combinerClass.getName()) + ")";
+        ((messageCombinerClass == null) ? "null" :
+            messageCombinerClass.getName()) + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
index f2b8552..ec8a7d7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
@@ -19,6 +19,8 @@
 package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.utils.VertexIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
@@ -44,11 +46,14 @@ public abstract class BasicPartition<I extends WritableComparable,
   private int id;
   /** Context used to report progress */
   private Progressable progressable;
+  /** Vertex value combiner */
+  private VertexValueCombiner<V> vertexValueCombiner;
 
   @Override
   public void initialize(int partitionId, Progressable progressable) {
     setId(partitionId);
     setProgressable(progressable);
+    vertexValueCombiner = conf.createVertexValueCombiner();
   }
 
   @Override
@@ -84,6 +89,21 @@ public abstract class BasicPartition<I extends WritableComparable,
     this.progressable = progressable;
   }
 
+  public VertexValueCombiner<V> getVertexValueCombiner() {
+    return vertexValueCombiner;
+  }
+
+  @Override
+  public void addPartitionVertices(VertexIterator<I, V, E> vertexIterator) {
+    while (vertexIterator.hasNext()) {
+      vertexIterator.next();
+      // Release the vertex if it was put, otherwise reuse as an optimization
+      if (putOrCombine(vertexIterator.getVertex())) {
+        vertexIterator.releaseVertex();
+      }
+    }
+  }
+
   @Override
   public void write(DataOutput output) throws IOException {
     output.writeInt(id);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index 6eaa6d7..cef39cd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.partition;
 
+import com.google.common.collect.MapMaker;
+import com.google.common.primitives.Ints;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.WritableUtils;
@@ -24,9 +27,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
-import com.google.common.collect.MapMaker;
-import com.google.common.primitives.Ints;
-
+import javax.annotation.concurrent.NotThreadSafe;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -43,6 +44,7 @@ import java.util.concurrent.ConcurrentMap;
  * @param <V> Vertex value
  * @param <E> Edge value
  */
+@NotThreadSafe
 public class ByteArrayPartition<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends BasicPartition<I, V, E>
@@ -55,6 +57,8 @@ public class ByteArrayPartition<I extends WritableComparable,
   private ConcurrentMap<I, byte[]> vertexMap;
   /** Representative vertex */
   private Vertex<I, V, E> representativeVertex;
+  /** Representative combiner vertex */
+  private Vertex<I, V, E> representativeCombinerVertex;
   /** Use unsafe serialization */
   private boolean useUnsafeSerialization;
 
@@ -73,6 +77,11 @@ public class ByteArrayPartition<I extends WritableComparable,
         getConf().createVertexId(),
         getConf().createVertexValue(),
         getConf().createOutEdges());
+    representativeCombinerVertex = getConf().createVertex();
+    representativeCombinerVertex.initialize(
+        getConf().createVertexId(),
+        getConf().createVertexValue(),
+        getConf().createOutEdges());
     useUnsafeSerialization = getConf().useUnsafeSerialization();
   }
 
@@ -125,8 +134,57 @@ public class ByteArrayPartition<I extends WritableComparable,
         (ByteArrayPartition<I, V, E>) partition;
     for (Map.Entry<I, byte[]> entry :
         byteArrayPartition.vertexMap.entrySet()) {
-      vertexMap.put(entry.getKey(), entry.getValue());
+
+      byte[] oldVertexBytes =
+          vertexMap.putIfAbsent(entry.getKey(), entry.getValue());
+      if (oldVertexBytes == null) {
+        continue;
+      }
+
+      // Note that vertex combining is going to be expensive compared to
+      // SimplePartition since here we have to deserialize the vertices,
+      // combine them, and then reserialize them.  If the vertex doesn't exist,
+      // just add the new vertex as a byte[]
+      synchronized (this) {
+        // Combine the vertex values
+        WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
+            representativeVertex, useUnsafeSerialization, getConf());
+        WritableUtils.reinitializeVertexFromByteArray(entry.getValue(),
+            representativeCombinerVertex, useUnsafeSerialization, getConf());
+        getVertexValueCombiner().combine(representativeVertex.getValue(),
+            representativeCombinerVertex.getValue());
+
+        // Add the edges to the representative vertex
+        for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) {
+          representativeVertex.addEdge(edge);
+        }
+
+        byte[] vertexData = WritableUtils.writeVertexToByteArray(
+            representativeCombinerVertex, useUnsafeSerialization, getConf());
+        vertexMap.put(entry.getKey(), vertexData);
+      }
+    }
+  }
+
+  @Override
+  public synchronized boolean putOrCombine(Vertex<I, V, E> vertex) {
+    // Optimistically try to first put and then combine if this fails
+    byte[] vertexData =
+        WritableUtils.writeVertexToByteArray(
+            vertex, useUnsafeSerialization, getConf());
+    byte[] oldVertexBytes = vertexMap.putIfAbsent(vertex.getId(), vertexData);
+    if (oldVertexBytes == null) {
+      return true;
     }
+
+    WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
+        representativeVertex, useUnsafeSerialization, getConf());
+    getVertexValueCombiner().combine(representativeVertex.getValue(),
+        vertex.getValue());
+    vertexMap.put(vertex.getId(),
+        WritableUtils.writeVertexToByteArray(
+            representativeVertex, useUnsafeSerialization, getConf()));
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 110ce9d..c37efd5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -222,15 +222,28 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public Partition<I, V, E> getPartition(Integer id) {
+  public Partition<I, V, E> getOrCreatePartition(Integer id) {
     try {
-      return pool.submit(new GetPartition(id)).get();
+      wLock.lock();
+      Partition<I, V, E> partition =
+          pool.submit(new GetPartition(id)).get();
+      if (partition == null) {
+        Partition<I, V, E> newPartition =
+            conf.createPartition(id, context);
+        pool.submit(
+            new AddPartition(id, newPartition)).get();
+        return newPartition;
+      } else {
+        return partition;
+      }
     } catch (InterruptedException e) {
       throw new IllegalStateException(
-          "getPartition: cannot retrieve partition " + id, e);
+          "getOrCreatePartition: cannot retrieve partition " + id, e);
     } catch (ExecutionException e) {
       throw new IllegalStateException(
-          "getPartition: cannot retrieve partition " + id, e);
+          "getOrCreatePartition: cannot retrieve partition " + id, e);
+    } finally {
+      wLock.unlock();
     }
   }
 
@@ -263,7 +276,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
 
   @Override
   public Partition<I, V, E> removePartition(Integer id) {
-    Partition<I, V, E> partition = getPartition(id);
+    Partition<I, V, E> partition = getOrCreatePartition(id);
     // we put it back, so the partition can turn INACTIVE and be deleted.
     putPartition(partition);
     deletePartition(id);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
index b6b9551..479abcc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
@@ -20,6 +20,7 @@ package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.VertexIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
@@ -70,13 +71,32 @@ public interface Partition<I extends WritableComparable,
   Vertex<I, V, E> removeVertex(I vertexIndex);
 
   /**
-   * Add a partition's vertices
+   * Add a partition's vertices.  If a vertex to be added doesn't exist,
+   * add it.  If the vertex already exists, use the
+   * VertexValueCombiner to combine them.
    *
    * @param partition Partition to add
    */
   void addPartition(Partition<I, V, E> partition);
 
   /**
+   * Put this vertex or combine it
+   *
+   * @param vertex Vertex to put or combine
+   * @return True if the vertex was put (hint to release object)
+   */
+  boolean putOrCombine(Vertex<I, V, E> vertex);
+
+  /**
+   * Add vertices to a partition.  If a vertex to be added doesn't exist,
+   * add it.  If the vertex already exists, use the
+   * VertexValueCombiner to combine them.
+   *
+   * @param vertexIterator Vertices to add
+   */
+  void addPartitionVertices(VertexIterator<I, V, E> vertexIterator);
+
+  /**
    * Get the number of vertices in this partition
    *
    * @return Number of vertices

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
index 763397e..fdc20a5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public abstract class PartitionStore<I extends WritableComparable,
     V extends Writable, E extends Writable> {
-
   /**
    * Add a new partition to the store or just the vertices from the partition
    * to the old partition.
@@ -40,17 +39,18 @@ public abstract class PartitionStore<I extends WritableComparable,
   public abstract void addPartition(Partition<I, V, E> partition);
 
   /**
-   * Get a partition. Note: user has to put back it to the store through
-   * {@link #putPartition(Partition)} after use.
+   * Get or create a partition. Note: user has to put back
+   * it to the store through {@link #putPartition(Partition)} after use.
    *
    * @param partitionId Partition id
-   * @return The requested partition
+   * @return The requested partition (never null)
    */
-  public abstract Partition<I, V, E> getPartition(Integer partitionId);
+  public abstract Partition<I, V, E> getOrCreatePartition(Integer partitionId);
 
   /**
    * Put a partition back to the store. Use this method to be put a partition
-   * back after it has been retrieved through {@link #getPartition(Integer)}.
+   * back after it has been retrieved through
+   * {@link #getOrCreatePartition(Integer)}.
    *
    * @param partition Partition
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index 0c1b404..1609846 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -18,14 +18,15 @@
 
 package org.apache.giraph.partition;
 
+import com.google.common.collect.Maps;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
-import com.google.common.collect.Maps;
-
+import javax.annotation.concurrent.ThreadSafe;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -43,6 +44,7 @@ import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
  * @param <V> Vertex data
  * @param <E> Edge data
  */
+@ThreadSafe
 @SuppressWarnings("rawtypes")
 public class SimplePartition<I extends WritableComparable,
     V extends Writable, E extends Writable>
@@ -81,9 +83,34 @@ public class SimplePartition<I extends WritableComparable,
   }
 
   @Override
+  public boolean putOrCombine(Vertex<I, V, E> vertex) {
+    Vertex<I, V, E> originalVertex = vertexMap.get(vertex.getId());
+    if (originalVertex == null) {
+      originalVertex =
+          vertexMap.putIfAbsent(vertex.getId(), vertex);
+      if (originalVertex == null) {
+        return true;
+      }
+    }
+
+    synchronized (originalVertex) {
+      // Combine the vertex values
+      getVertexValueCombiner().combine(
+          originalVertex.getValue(), vertex.getValue());
+
+      // Add the edges to the original vertex
+      for (Edge<I, E> edge : vertex.getEdges()) {
+        originalVertex.addEdge(edge);
+      }
+    }
+
+    return false;
+  }
+
+  @Override
   public void addPartition(Partition<I, V, E> partition) {
     for (Vertex<I, V, E> vertex : partition) {
-      vertexMap.put(vertex.getId(), vertex);
+      putOrCombine(vertex);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
index ae17aac..79c18c3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -18,13 +18,12 @@
 
 package org.apache.giraph.partition;
 
+import com.google.common.collect.Maps;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import com.google.common.collect.Maps;
-
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -67,12 +66,22 @@ public class SimplePartitionStore<I extends WritableComparable,
         return;
       }
     }
+    // This is thread-safe
     oldPartition.addPartition(partition);
   }
 
   @Override
-  public Partition<I, V, E> getPartition(Integer partitionId) {
-    return partitions.get(partitionId);
+  public Partition<I, V, E> getOrCreatePartition(Integer partitionId) {
+    Partition<I, V, E> oldPartition = partitions.get(partitionId);
+    if (oldPartition == null) {
+      Partition<I, V, E> newPartition =
+          conf.createPartition(partitionId, context);
+      oldPartition = partitions.putIfAbsent(partitionId, newPartition);
+      if (oldPartition == null) {
+        return newPartition;
+      }
+    }
+    return oldPartition;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 7e2b73b..4958ae3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -55,7 +55,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
    * deserializd right away, so this won't help.
    */
   private void setUseMessageSizeEncoding() {
-    if (!getConf().useCombiner()) {
+    if (!getConf().useMessageCombiner()) {
       useMessageSizeEncoding = getConf().useMessageSizeEncoding();
     } else {
       useMessageSizeEncoding = false;

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 4bc4f4d..e441f03 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.giraph.utils;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -25,7 +27,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.giraph.Algorithm;
 import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.GiraphTypes;
@@ -35,6 +37,7 @@ import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.Language;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -54,9 +57,6 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.ZooKeeper;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-
 import java.io.IOException;
 import java.util.List;
 
@@ -108,7 +108,7 @@ public final class ConfigurationUtils {
         "for the vertex output");
     OPTIONS.addOption("esd",  "edgeSubDir", true, "subdirectory to be used " +
         "for the edge output");
-    OPTIONS.addOption("c", "combiner", true, "Combiner class");
+    OPTIONS.addOption("c", "combiner", true, "MessageCombiner class");
     OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class");
     OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
     OPTIONS.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
@@ -277,8 +277,14 @@ public final class ConfigurationUtils {
       TYPES_HOLDER_CLASS.set(conf, typesHolderClass);
     }
     if (cmd.hasOption("c")) {
-      conf.setCombinerClass(
-          (Class<? extends Combiner>) Class.forName(cmd.getOptionValue("c")));
+      conf.setMessageCombinerClass(
+          (Class<? extends MessageCombiner>)
+              Class.forName(cmd.getOptionValue("c")));
+    }
+    if (cmd.hasOption("vc")) {
+      conf.setVertexValueCombinerClass(
+          (Class<? extends VertexValueCombiner>)
+              Class.forName(cmd.getOptionValue("vc")));
     }
     if (cmd.hasOption("ve")) {
       conf.setOutEdgesClass(

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
new file mode 100644
index 0000000..dced9bd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Iterates over vertices stored in an ExtendedDataOutput such that
+ * the ownership of the vertex id can be transferred to another object.
+ * This optimization cuts down on the number of objects instantiated and
+ * garbage collected
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class VertexIterator<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+  /** Reader of the serialized vertices */
+  private final ExtendedDataInput extendedDataInput;
+  /** Current vertex */
+  private Vertex<I, V, E> vertex;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+
+  /**
+   * Constructor.
+   *
+   * @param extendedDataOutput Extended data output
+   * @param configuration Configuration
+   */
+  public VertexIterator(
+      ExtendedDataOutput extendedDataOutput,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration) {
+    extendedDataInput = configuration.createExtendedDataInput(
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+    this.configuration = configuration;
+    resetEmptyVertex();
+  }
+
+  /**
+   * Reset the empty Vertex to an initial state.
+   */
+  private void resetEmptyVertex() {
+    vertex = configuration.createVertex();
+    I id = configuration.createVertexId();
+    V value = configuration.createVertexValue();
+    OutEdges<I, E> edges = configuration.createOutEdges();
+    vertex.initialize(id, value, edges);
+  }
+
+  /**
+   * Returns true if the iteration has more elements.
+   *
+   * @return True if the iteration has more elements.
+   */
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+
+  /**
+   * Moves to the next element in the iteration.
+   */
+  public void next() {
+    // If the vertex was released, create another one
+    if (vertex == null) {
+      resetEmptyVertex();
+    }
+
+    // If the vertex id was released, create another one
+    if (vertex.getId() == null) {
+      vertex.initialize(configuration.createVertexId(), vertex.getValue());
+    }
+
+    try {
+      WritableUtils.reinitializeVertexFromDataInput(
+          extendedDataInput, vertex, configuration);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: IOException", e);
+    }
+  }
+
+  /**
+   * Get the current vertex id.  This object's contents are only guaranteed
+   * until next() is called.  To take ownership of this object call
+   * releaseCurrentVertexId() after getting a reference to this object.
+   *
+   * @return Current vertex id
+   */
+  public I getCurrentVertexId() {
+    return vertex.getId();
+  }
+
+  /**
+   * The backing store of the current vertex id is now released.
+   * Further calls to getCurrentVertexId() without calling next()
+   * will return null.
+   *
+   * @return Current vertex id that was released
+   */
+  public I releaseCurrentVertexId() {
+    I releasedVertexId = vertex.getId();
+    vertex.initialize(null, vertex.getValue());
+    return releasedVertexId;
+  }
+
+  public Vertex<I, V, E> getVertex() {
+    return vertex;
+  }
+
+  /**
+   * Release the ownership of the Vertex object to the caller
+   *
+   * @return Released Vertex object
+   */
+  public Vertex<I, V, E> releaseVertex() {
+    Vertex<I, V, E> releasedVertex = vertex;
+    vertex = null;
+    return releasedVertex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 9163c08..3f8382e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -530,7 +530,7 @@ public class WritableUtils {
   }
 
   /**
-   * Reads data from input stream to inizialize Vertex.
+   * Reads data from input stream to initialize Vertex.
    *
    * @param input The input stream
    * @param conf Configuration

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 112b76d..a92ddf8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -571,7 +571,7 @@ public class BspServiceWorker<I extends WritableComparable,
         new ArrayList<PartitionStats>();
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
       Partition<I, V, E> partition =
-          getPartitionStore().getPartition(partitionId);
+          getPartitionStore().getOrCreatePartition(partitionId);
       PartitionStats partitionStats =
           new PartitionStats(partition.getId(),
               partition.getVertexCount(),
@@ -974,7 +974,7 @@ public class BspServiceWorker<I extends WritableComparable,
               }
 
               Partition<I, V, E> partition =
-                  getPartitionStore().getPartition(partitionId);
+                  getPartitionStore().getOrCreatePartition(partitionId);
               long verticesWritten = 0;
               for (Vertex<I, V, E> vertex : partition) {
                 vertexWriter.writeVertex(vertex);
@@ -1082,7 +1082,7 @@ public class BspServiceWorker<I extends WritableComparable,
               }
 
               Partition<I, V, E> partition =
-                  getPartitionStore().getPartition(partitionId);
+                  getPartitionStore().getOrCreatePartition(partitionId);
               long vertices = 0;
               long edges = 0;
               long partitionEdgeCount = partition.getEdgeCount();
@@ -1241,7 +1241,7 @@ public class BspServiceWorker<I extends WritableComparable,
     DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
       Partition<I, V, E> partition =
-          getPartitionStore().getPartition(partitionId);
+          getPartitionStore().getOrCreatePartition(partitionId);
       long startPos = verticesOutputStream.getPos();
       partition.write(verticesOutputStream);
       // write messages

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 115c108..fcdfa5c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -129,7 +129,7 @@ public class RequestTest {
     assertTrue(partitionStore.hasPartition(partitionId));
     int total = 0;
     Partition<IntWritable, IntWritable, IntWritable> partition2 =
-        partitionStore.getPartition(partitionId);	
+        partitionStore.getOrCreatePartition(partitionId);
     for (Vertex<IntWritable, IntWritable, IntWritable> vertex : partition2) {
       total += vertex.getId().get();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
index a8f6f70..1fe3a25 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.FloatSumCombiner;
+import org.apache.giraph.combiner.FloatSumMessageCombiner;
 import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
@@ -70,8 +70,8 @@ public class TestIntFloatPrimitiveMessageStores {
         Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
-    Mockito.when(partitionStore.getPartition(0)).thenReturn(partition);
-    Mockito.when(partitionStore.getPartition(1)).thenReturn(partition);
+    Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition);
+    Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition);
   }
 
   private static class IntFloatNoOpComputation extends
@@ -122,7 +122,7 @@ public class TestIntFloatPrimitiveMessageStores {
   @Test
   public void testIntFloatMessageStore() throws IOException {
     IntFloatMessageStore messageStore =
-        new IntFloatMessageStore(service, new FloatSumCombiner());
+        new IntFloatMessageStore(service, new FloatSumMessageCombiner());
     insertIntFloatMessages(messageStore);
 
     Iterable<FloatWritable> m0 =

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index 0659260..a04b703 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.DoubleSumCombiner;
+import org.apache.giraph.combiner.DoubleSumMessageCombiner;
 import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
@@ -70,8 +70,8 @@ public class TestLongDoublePrimitiveMessageStores {
         Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
-    Mockito.when(partitionStore.getPartition(0)).thenReturn(partition);
-    Mockito.when(partitionStore.getPartition(1)).thenReturn(partition);
+    Mockito.when(partitionStore.getOrCreatePartition(0)).thenReturn(partition);
+    Mockito.when(partitionStore.getOrCreatePartition(1)).thenReturn(partition);
   }
 
   private static class LongDoubleNoOpComputation extends
@@ -122,7 +122,7 @@ public class TestLongDoublePrimitiveMessageStores {
   @Test
   public void testLongDoubleMessageStore() throws IOException {
     LongDoubleMessageStore messageStore =
-        new LongDoubleMessageStore(service, new DoubleSumCombiner());
+        new LongDoubleMessageStore(service, new DoubleSumMessageCombiner());
     insertLongDoubleMessages(messageStore);
 
     Iterable<DoubleWritable> m0 =