You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2016/08/31 17:23:38 UTC

[1/3] git commit: updated refs/heads/trunk to 2117d1d

Repository: giraph
Updated Branches:
  refs/heads/trunk 2ae95bd6e -> 2117d1dbb


http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/types/TestBasicCollections.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/types/TestBasicCollections.java b/giraph-core/src/test/java/org/apache/giraph/types/TestBasicCollections.java
new file mode 100644
index 0000000..53c4f9b
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/types/TestBasicCollections.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types;
+
+import io.netty.util.internal.ThreadLocalRandom;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.BasicCollectionsUtils;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test BasicSets and Basic2ObjectMaps
+ */
+public class TestBasicCollections {
+  private void testLongWritable2Object(Map<Long, String> input) {
+    Basic2ObjectMap<LongWritable, String> map = BasicCollectionsUtils.create2ObjectMap(LongWritable.class);
+
+    LongWritable longW = new LongWritable();
+    // adding
+    long keySum = 0;
+    for (Long key : input.keySet()) {
+      longW.set(key.longValue());
+      Assert.assertNull(map.put(longW, input.get(key)));
+      keySum += key.longValue();
+    }
+    Assert.assertEquals(input.size(), map.size());
+    // iterator
+    long sum = 0;
+    Iterator<LongWritable> iterator = map.fastKeyIterator();
+    while (iterator.hasNext()) {
+      sum += iterator.next().get();
+    }
+    Assert.assertEquals(keySum, sum);
+    // removal
+    for (Long key : input.keySet()) {
+      longW.set(key.longValue());
+      Assert.assertEquals(input.get(key), map.get(longW));
+      map.remove(longW);
+    }
+    Assert.assertEquals(0, map.size());
+  }
+
+  // Round-trips every entry through put, fastKeyIterator, get and remove.
+  private void testFloatWritable2Object(Map<Float, String> input) {
+    Basic2ObjectMap<FloatWritable, String> map = BasicCollectionsUtils.create2ObjectMap(FloatWritable.class);
+    FloatWritable floatW = new FloatWritable();
+    // adding; sums are kept in double so they do not depend on iteration order
+    double keySum = 0;
+    for (Float key : input.keySet()) {
+      floatW.set(key.floatValue());
+      Assert.assertNull(map.put(floatW, input.get(key)));
+      keySum += key.floatValue();
+    }
+    Assert.assertEquals(input.size(), map.size());
+    // iterator
+    double sum = 0;
+    Iterator<FloatWritable> iterator = map.fastKeyIterator();
+    while (iterator.hasNext()) {
+      sum += iterator.next().get();
+    }
+    Assert.assertEquals(keySum, sum, 1e-6);
+    // removal
+    for (Float key : input.keySet()) {
+      floatW.set(key.floatValue());
+      Assert.assertEquals(input.get(key), map.get(floatW));
+      map.remove(floatW);
+    }
+    Assert.assertEquals(0, map.size());
+  }
+
+  @Test
+  public void testLongWritable2Object() {
+    Map<Long, String> input = new HashMap<>();
+    input.put(-1l, "a");
+    input.put(0l, "b");
+    input.put(100l, "c");
+    input.put(26256l, "d");
+    input.put(-1367367l, "a");
+    input.put(-35635l, "e");
+    input.put(1234567l, "f");
+    testLongWritable2Object(input);
+  }
+
+  @Test
+  public void testFloatWritable2Object() {
+    Map<Float, String> input = new HashMap<>();
+    input.put(-1f, "a");
+    input.put(0f, "b");
+    input.put(1.23f, "c");
+    input.put(-12.34f, "d");
+    input.put(-1367367.45f, "a");
+    input.put(-3.456f, "e");
+    input.put(12.78f, "f");
+    testFloatWritable2Object(input);
+  }
+
+  private <K, V> V getConcurrently(Basic2ObjectMap<K, V> map, K key, V defaultValue) {
+    // Get-or-insert, performed while holding the map's monitor.
+    synchronized (map) {
+      V existing = map.get(key);
+      if (existing != null) {
+        return existing;
+      }
+      map.put(key, defaultValue);
+      return defaultValue;
+    }
+  }
+
+  private <K, V> void removeConcurrently(Basic2ObjectMap<K, V> map, K key) {
+    synchronized (map) {
+      map.remove(key);
+    }
+  }
+
+  @Test
+  public void testLongWritable2ObjectConcurrent() throws InterruptedException {
+    final int numThreads = 10;
+    final int numValues = 100000;
+
+    final Map<Integer, Double> map = new ConcurrentHashMap<>();
+    for (int i = 0; i < numValues; i++) {
+      double value = ThreadLocalRandom.current().nextDouble();
+      map.put(i, value);
+    }
+
+    final int PARTS = 8;
+    final Basic2ObjectMap<IntWritable, Double>[] basicMaps = new Basic2ObjectMap[PARTS];
+    for (int i = 0; i < PARTS; i++) {
+      basicMaps[i] = BasicCollectionsUtils.create2ObjectMap(IntWritable.class);
+    }
+
+    long startTime = System.currentTimeMillis();
+
+    // adding in several threads
+    Thread[] threads = new Thread[numThreads];
+    for (int t = 0; t < threads.length; t++) {
+      threads[t] = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          IntWritable intW = new IntWritable();
+          for (int i = 0; i < numValues; i++) {
+            intW.set(i);
+            double value = getConcurrently(basicMaps[(i * 123 + 17) % PARTS], intW, map.get(i));
+            Assert.assertEquals(map.get(i).doubleValue(), value, 1e-6);
+          }
+        }
+      });
+      threads[t].start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    int totalSize = 0;
+    for (int i = 0; i < PARTS; i++) {
+      totalSize += basicMaps[i].size();
+    }
+    Assert.assertEquals(numValues, totalSize);
+
+    long endTime = System.currentTimeMillis();
+    System.out.println("Add Time: " + (endTime - startTime) / 1000.0);
+
+    // removing all objects
+    for (int t = 0; t < threads.length; t++) {
+      threads[t] = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          IntWritable intW = new IntWritable();
+          for (int i = 0; i < numValues; i++) {
+            intW.set(i);
+            removeConcurrently(basicMaps[(i * 123 + 17) % PARTS], intW);
+          }
+        }
+      });
+      threads[t].start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    for (int i = 0; i < PARTS; i++) {
+      Assert.assertEquals(0, basicMaps[i].size());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index 63403ab..b56998f 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -166,7 +166,7 @@ public class MockUtils {
       numOfPartitions) {
     CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> service =
         Mockito.mock(CentralizedServiceWorker.class);
-    Answer<PartitionOwner> answer = new Answer<PartitionOwner>() {
+    Answer<PartitionOwner> answerOwner = new Answer<PartitionOwner>() {
       @Override
       public PartitionOwner answer(InvocationOnMock invocation) throws
           Throwable {
@@ -175,7 +175,18 @@ public class MockUtils {
       }
     };
     Mockito.when(service.getVertexPartitionOwner(
-        Mockito.any(IntWritable.class))).thenAnswer(answer);
+      Mockito.any(IntWritable.class))).thenAnswer(answerOwner);
+
+    Answer<Integer> answerId = new Answer<Integer>() {
+      @Override
+      public Integer answer(InvocationOnMock invocation) throws
+          Throwable {
+        IntWritable vertexId = (IntWritable) invocation.getArguments()[0];
+        return vertexId.get() % numOfPartitions;
+      }
+    };
+    Mockito.when(service.getPartitionId(
+      Mockito.any(IntWritable.class))).thenAnswer(answerId);
     return service;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java b/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java
index 833c43e..28d5f5c 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java
@@ -54,7 +54,7 @@ public class SccComputationTestInMemory {
   /**
    * Connects the {@outgoingVertices} to the given vertex id
    * with null-valued edges.
-   * 
+   *
    * @param graph
    * @param id
    * @param outgoingVertices
@@ -110,9 +110,7 @@ public class SccComputationTestInMemory {
   private Map<Long, List<Long>> parse(
       TestGraph<LongWritable, SccVertexValue, NullWritable> g) {
     Map<Long, List<Long>> scc = new HashMap<Long, List<Long>>();
-    for (LongWritable v : g.getVertices().keySet()) {
-      Vertex<LongWritable, SccVertexValue, NullWritable> vertex = g
-          .getVertex(v);
+    for (Vertex<LongWritable, SccVertexValue, NullWritable> vertex : g) {
       long sccId = vertex.getValue().get();
       List<Long> verticesIds = scc.get(sccId);
       if (verticesIds == null) {// New SCC


[2/3] git commit: updated refs/heads/trunk to 2117d1d

Posted by ik...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 715bf45..77b2689 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm.messages.primitives;
 
+import com.google.common.collect.Lists;
+
 import it.unimi.dsi.fastutil.ints.Int2FloatMap;
 import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -30,17 +32,14 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Lists;
 
 /**
  * Special message store to be used when ids are IntWritable and messages
@@ -55,26 +54,26 @@ public class IntFloatMessageStore
   /** Message messageCombiner */
   private final
   MessageCombiner<? super IntWritable, FloatWritable> messageCombiner;
-  /** Service worker */
-  private final CentralizedServiceWorker<IntWritable, ?, ?> service;
+  /** Partition split info */
+  private final PartitionSplitInfo<IntWritable> partitionInfo;
 
   /**
    * Constructor
    *
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param messageCombiner Message messageCombiner
    */
   public IntFloatMessageStore(
-      CentralizedServiceWorker<IntWritable, Writable, Writable> service,
-      MessageCombiner<? super IntWritable, FloatWritable> messageCombiner) {
-    this.service = service;
+    PartitionSplitInfo<IntWritable> partitionInfo,
+    MessageCombiner<? super IntWritable, FloatWritable> messageCombiner
+  ) {
+    this.partitionInfo = partitionInfo;
     this.messageCombiner = messageCombiner;
 
     map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+    for (int partitionId : partitionInfo.getPartitionIds()) {
       Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap(
-          (int) service.getPartitionStore()
-              .getPartitionVertexCount(partitionId));
+          (int) partitionInfo.getPartitionVertexCount(partitionId));
       map.put(partitionId, partitionMap);
     }
   }
@@ -91,7 +90,7 @@ public class IntFloatMessageStore
    * @return Map which holds messages for partition which vertex belongs to.
    */
   private Int2FloatOpenHashMap getPartitionMap(IntWritable vertexId) {
-    return map.get(service.getPartitionId(vertexId));
+    return map.get(partitionInfo.getPartitionId(vertexId));
   }
 
   @Override
@@ -117,12 +116,27 @@ public class IntFloatMessageStore
               reusableMessage);
           message = reusableCurrentMessage.get();
         }
+        // FIXME: messageCombiner should create an initial message instead
         partitionMap.put(vertexId, message);
       }
     }
   }
 
   @Override
+  public void addMessage(IntWritable vertexId, FloatWritable message)
+    throws IOException {
+    Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
+    synchronized (partitionMap) {
+      FloatWritable combined = new FloatWritable(message.get());
+      if (partitionMap.containsKey(vertexId.get())) { // absent ids read as 0
+        combined.set(partitionMap.get(vertexId.get()));
+        messageCombiner.combine(vertexId, combined, message);
+      }
+      partitionMap.put(vertexId.get(), combined.get());
+    }
+  }
+
+  @Override
   public void finalizeStore() {
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 4fc4843..19aede4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -30,15 +30,14 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
 
 import com.google.common.collect.Lists;
 
@@ -56,26 +55,25 @@ public class LongDoubleMessageStore
   private final
   MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner;
   /** Service worker */
-  private final CentralizedServiceWorker<LongWritable, ?, ?> service;
+  private final PartitionSplitInfo<LongWritable> partitionInfo;
 
   /**
    * Constructor
    *
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param messageCombiner Message messageCombiner
    */
   public LongDoubleMessageStore(
-      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-      MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner) {
-    this.service = service;
-    this.messageCombiner =
-        messageCombiner;
+    PartitionSplitInfo<LongWritable> partitionInfo,
+    MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner
+  ) {
+    this.partitionInfo = partitionInfo;
+    this.messageCombiner = messageCombiner;
 
     map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+    for (int partitionId : partitionInfo.getPartitionIds()) {
       Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(
-          (int) service.getPartitionStore()
-              .getPartitionVertexCount(partitionId));
+          (int) partitionInfo.getPartitionVertexCount(partitionId));
       map.put(partitionId, partitionMap);
     }
   }
@@ -92,7 +90,7 @@ public class LongDoubleMessageStore
    * @return Map which holds messages for partition which vertex belongs to.
    */
   private Long2DoubleOpenHashMap getPartitionMap(LongWritable vertexId) {
-    return map.get(service.getPartitionId(vertexId));
+    return map.get(partitionInfo.getPartitionId(vertexId));
   }
 
   @Override
@@ -118,12 +116,27 @@ public class LongDoubleMessageStore
               reusableMessage);
           message = reusableCurrentMessage.get();
         }
+        // FIXME: messageCombiner should create an initial message instead
         partitionMap.put(vertexId, message);
       }
     }
   }
 
   @Override
+  public void addMessage(LongWritable vertexId, DoubleWritable message)
+    throws IOException {
+    Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
+    synchronized (partitionMap) {
+      DoubleWritable combined = new DoubleWritable(message.get());
+      if (partitionMap.containsKey(vertexId.get())) { // absent ids read as 0
+        combined.set(partitionMap.get(vertexId.get()));
+        messageCombiner.combine(vertexId, combined, message);
+      }
+      partitionMap.put(vertexId.get(), combined.get());
+    }
+  }
+
+  @Override
   public void finalizeStore() {
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
deleted file mode 100644
index d1c33be..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.VertexIdIterator;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.util.List;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> message type
- * @param <L> list type
- */
-public abstract class LongAbstractListMessageStore<M extends Writable,
-  L extends List> extends LongAbstractMessageStore<M, L> {
-  /**
-   * Map used to store messages for nascent vertices i.e., ones
-   * that did not exist at the start of current superstep but will get
-   * created because of sending message to a non-existent vertex id
-   */
-  private final
-  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service             Service worker
-   * @param config              Hadoop configuration
-   */
-  public LongAbstractListMessageStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-      ImmutableClassesGiraphConfiguration<LongWritable,
-          Writable, Writable> config) {
-    super(messageValueFactory, service, config);
-    populateMap();
-
-    // create map for vertex ids (i.e., nascent vertices) not known yet
-    nascentMap = new Int2ObjectOpenHashMap<>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
-    }
-  }
-
-  /**
-   * Populate the map with all vertexIds for each partition
-   */
-  private void populateMap() { // TODO - can parallelize?
-    // populate with vertex ids already known
-    service.getPartitionStore().startIteration();
-    while (true) {
-      Partition partition = service.getPartitionStore().getNextPartition();
-      if (partition == null) {
-        break;
-      }
-      Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
-      for (Object obj : partition) {
-        Vertex vertex = (Vertex) obj;
-        LongWritable vertexId = (LongWritable) vertex.getId();
-        partitionMap.put(vertexId.get(), createList());
-      }
-      service.getPartitionStore().putPartition(partition);
-    }
-  }
-
-  /**
-   * Create an instance of L
-   * @return instance of L
-   */
-  protected abstract L createList();
-
-  /**
-   * Get list for the current vertexId
-   *
-   * @param iterator vertexId iterator
-   * @return list for current vertexId
-   */
-  protected L getList(
-    VertexIdIterator<LongWritable> iterator) {
-    PartitionOwner owner =
-        service.getVertexPartitionOwner(iterator.getCurrentVertexId());
-    long vertexId = iterator.getCurrentVertexId().get();
-    int partitionId = owner.getPartitionId();
-    Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
-    if (!partitionMap.containsKey(vertexId)) {
-      synchronized (nascentMap) {
-        // assumption: not many nascent vertices are created
-        // so overall synchronization is negligible
-        Long2ObjectOpenHashMap<L> nascentPartitionMap =
-          nascentMap.get(partitionId);
-        if (nascentPartitionMap.get(vertexId) == null) {
-          nascentPartitionMap.put(vertexId, createList());
-        }
-        return nascentPartitionMap.get(vertexId);
-      }
-    }
-    return partitionMap.get(vertexId);
-  }
-
-  @Override
-  public void finalizeStore() {
-    for (int partitionId : nascentMap.keySet()) {
-      // nascent vertices are present only in nascent map
-      map.get(partitionId).putAll(nascentMap.get(partitionId));
-    }
-    nascentMap.clear();
-  }
-
-  // TODO - discussion
-  /*
-  some approaches for ensuring correctness with parallel inserts
-  - current approach: uses a small extra bit of memory by pre-populating
-  map & pushes everything map cannot handle to nascentMap
-  at the beginning of next superstep compute a single threaded finalizeStore is
-  called (so little extra memory + 1 sequential finish ops)
-  - used striped parallel fast utils instead (unsure of perf)
-  - use concurrent map (every get gets far slower)
-  - use reader writer locks (unsure of perf)
-  (code looks something like underneath)
-
-      private final ReadWriteLock rwl = new ReentrantReadWriteLock();
-      rwl.readLock().lock();
-      L list = partitionMap.get(vertexId);
-      if (list == null) {
-        rwl.readLock().unlock();
-        rwl.writeLock().lock();
-        if (partitionMap.get(vertexId) == null) {
-          list = createList();
-          partitionMap.put(vertexId, list);
-        }
-        rwl.readLock().lock();
-        rwl.writeLock().unlock();
-      }
-      rwl.readLock().unlock();
-  - adopted from the article
-    http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
-    ReentrantReadWriteLock.html
-   */
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java
new file mode 100644
index 0000000..aee6a61
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+
+import java.util.List;
+
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.Partition;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> message type
+ * @param <L> list type
+ */
+public abstract class LongAbstractListStore<M extends Writable,
+  L extends List> extends LongAbstractStore<M, L> {
+  /**
+   * Map used to store messages for nascent vertices i.e., ones
+   * that did not exist at the start of current superstep but will get
+   * created because of sending message to a non-existent vertex id
+   */
+  private final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param partitionInfo       Partition split info
+   * @param config              Hadoop configuration
+   */
+  public LongAbstractListStore(
+      MessageValueFactory<M> messageValueFactory,
+      PartitionSplitInfo<LongWritable> partitionInfo,
+      ImmutableClassesGiraphConfiguration<LongWritable,
+          Writable, Writable> config) {
+    super(messageValueFactory, partitionInfo, config);
+    populateMap();
+
+    // create map for vertex ids (i.e., nascent vertices) not known yet
+    nascentMap = new Int2ObjectOpenHashMap<>();
+    for (int partitionId : partitionInfo.getPartitionIds()) {
+      nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
+    }
+  }
+
+  /**
+   * Populate the map with all vertexIds for each partition
+   */
+  private void populateMap() { // TODO - can parallelize?
+    // populate with vertex ids already known
+    partitionInfo.startIteration();
+    while (true) {
+      Partition partition = partitionInfo.getNextPartition();
+      if (partition == null) {
+        break;
+      }
+      Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
+      for (Object obj : partition) {
+        Vertex vertex = (Vertex) obj;
+        LongWritable vertexId = (LongWritable) vertex.getId();
+        partitionMap.put(vertexId.get(), createList());
+      }
+      partitionInfo.putPartition(partition);
+    }
+  }
+
+  /**
+   * Create an instance of L
+   * @return instance of L
+   */
+  protected abstract L createList();
+
+  /**
+   * Get list for the current vertexId
+   *
+   * @param vertexId vertex id
+   * @return list for current vertexId
+   */
+  protected L getList(LongWritable vertexId) {
+    final long rawId = vertexId.get();
+    final int partitionId = partitionInfo.getPartitionId(vertexId);
+    L messages = map.get(partitionId).get(rawId);
+    if (messages != null) {
+      return messages;
+    }
+    // Not in the pre-populated map: use the partition's nascent map.
+    // assumption: not many nascent vertices are created
+    // so overall synchronization is negligible
+    Long2ObjectOpenHashMap<L> nascentPartitionMap =
+      nascentMap.get(partitionId);
+    synchronized (nascentPartitionMap) {
+      messages = nascentPartitionMap.get(rawId);
+      if (messages == null) {
+        messages = createList();
+        nascentPartitionMap.put(rawId, messages);
+      }
+      return messages;
+    }
+  }
+
+  @Override
+  public void finalizeStore() {
+    for (int partitionId : nascentMap.keySet()) {
+      // nascent vertices are present only in nascent map
+      map.get(partitionId).putAll(nascentMap.get(partitionId));
+    }
+    nascentMap.clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    int partitionId = partitionInfo.getPartitionId(vertexId);
+    Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
+    L list = partitionMap.get(vertexId.get());
+    if (list != null && !list.isEmpty()) {
+      return true;
+    }
+    Long2ObjectOpenHashMap<L> nascentMessages = nascentMap.get(partitionId);
+    return nascentMessages != null &&
+           nascentMessages.containsKey(vertexId.get());
+  }
+
+  // TODO - discussion
+  /*
+  some approaches for ensuring correctness with parallel inserts
+  - current approach: uses a small extra bit of memory by pre-populating
+  map & pushes everything map cannot handle to nascentMap
+  at the beginning of next superstep compute a single threaded finalizeStore is
+  called (so little extra memory + 1 sequential finish ops)
+  - used striped parallel fast utils instead (unsure of perf)
+  - use concurrent map (every get gets far slower)
+  - use reader writer locks (unsure of perf)
+  (code looks something like underneath)
+
+      private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+      rwl.readLock().lock();
+      L list = partitionMap.get(vertexId);
+      if (list == null) {
+        rwl.readLock().unlock();
+        rwl.writeLock().lock();
+        if (partitionMap.get(vertexId) == null) {
+          list = createList();
+          partitionMap.put(vertexId, list);
+        }
+        rwl.readLock().lock();
+        rwl.writeLock().unlock();
+      }
+      rwl.readLock().unlock();
+  - adopted from the article
+    http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
+    ReentrantReadWriteLock.html
+   */
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
deleted file mode 100644
index b3ed4b2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import com.google.common.collect.Lists;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.LongIterator;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.util.List;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> message type
- * @param <T> datastructure used to hold messages
- */
-public abstract class LongAbstractMessageStore<M extends Writable, T>
-  implements MessageStore<LongWritable, M> {
-  /** Message value factory */
-  protected final MessageValueFactory<M> messageValueFactory;
-  /** Map from partition id to map from vertex id to message */
-  protected final
-  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
-  /** Service worker */
-  protected final CentralizedServiceWorker<LongWritable, ?, ?> service;
-  /** Giraph configuration */
-  protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
-  config;
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service      Service worker
-   * @param config       Hadoop configuration
-   */
-  public LongAbstractMessageStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-      ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
-          config) {
-    this.messageValueFactory = messageValueFactory;
-    this.service = service;
-    this.config = config;
-
-    map = new Int2ObjectOpenHashMap<>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
-          (int) service.getPartitionStore()
-              .getPartitionVertexCount(partitionId));
-      map.put(partitionId, partitionMap);
-    }
-  }
-
-  /**
-   * Get map which holds messages for partition which vertex belongs to.
-   *
-   * @param vertexId Id of the vertex
-   * @return Map which holds messages for partition which vertex belongs to.
-   */
-  protected Long2ObjectOpenHashMap<T> getPartitionMap(
-      LongWritable vertexId) {
-    return map.get(service.getPartitionId(vertexId));
-  }
-
-  @Override
-  public void clearPartition(int partitionId) {
-    map.get(partitionId).clear();
-  }
-
-  @Override
-  public boolean hasMessagesForVertex(LongWritable vertexId) {
-    return getPartitionMap(vertexId).containsKey(vertexId.get());
-  }
-
-  @Override
-  public boolean hasMessagesForPartition(int partitionId) {
-    Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
-    return partitionMessages != null && !partitionMessages.isEmpty();
-  }
-
-  @Override
-  public void clearVertexMessages(LongWritable vertexId) {
-    getPartitionMap(vertexId).remove(vertexId.get());
-  }
-
-
-  @Override
-  public void clearAll() {
-    map.clear();
-  }
-
-  @Override
-  public Iterable<LongWritable> getPartitionDestinationVertices(
-      int partitionId) {
-    Long2ObjectOpenHashMap<T> partitionMap =
-        map.get(partitionId);
-    List<LongWritable> vertices =
-        Lists.newArrayListWithCapacity(partitionMap.size());
-    LongIterator iterator = partitionMap.keySet().iterator();
-    while (iterator.hasNext()) {
-      vertices.add(new LongWritable(iterator.nextLong()));
-    }
-    return vertices;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java
new file mode 100644
index 0000000..385388d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import com.google.common.collect.Lists;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+
+import java.util.List;
+
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> message type
+ * @param <T> datastructure used to hold messages
+ */
+public abstract class LongAbstractStore<M extends Writable, T>
+  implements MessageStore<LongWritable, M> {
+  /** Message value factory */
+  protected final MessageValueFactory<M> messageValueFactory;
+  /** Map from partition id to map from vertex id to message */
+  protected final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
+  /** Partition split info */
+  protected final PartitionSplitInfo<LongWritable> partitionInfo;
+  /** Giraph configuration */
+  protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
+  config;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param partitionInfo       Partition split info
+   * @param config              Hadoop configuration
+   */
+  public LongAbstractStore(
+      MessageValueFactory<M> messageValueFactory,
+      PartitionSplitInfo<LongWritable> partitionInfo,
+      ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
+          config) {
+    this.messageValueFactory = messageValueFactory;
+    this.partitionInfo = partitionInfo;
+    this.config = config;
+
+    map = new Int2ObjectOpenHashMap<>();
+    for (int partitionId : partitionInfo.getPartitionIds()) {
+      Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
+          (int) partitionInfo.getPartitionVertexCount(partitionId));
+      map.put(partitionId, partitionMap);
+    }
+  }
+
+  /**
+   * Get map which holds messages for partition which vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for partition which vertex belongs to.
+   */
+  protected Long2ObjectOpenHashMap<T> getPartitionMap(
+      LongWritable vertexId) {
+    return map.get(partitionInfo.getPartitionId(vertexId));
+  }
+
+  @Override
+  public void clearPartition(int partitionId) {
+    map.get(partitionId).clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    return getPartitionMap(vertexId).containsKey(vertexId.get());
+  }
+
+  @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
+    return partitionMessages != null && !partitionMessages.isEmpty();
+  }
+
+  @Override
+  public void clearVertexMessages(LongWritable vertexId) {
+    getPartitionMap(vertexId).remove(vertexId.get());
+  }
+
+
+  @Override
+  public void clearAll() {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<LongWritable> getPartitionDestinationVertices(
+      int partitionId) {
+    Long2ObjectOpenHashMap<T> partitionMap =
+        map.get(partitionId);
+    List<LongWritable> vertices =
+        Lists.newArrayListWithCapacity(partitionMap.size());
+    LongIterator iterator = partitionMap.keySet().iterator();
+    while (iterator.hasNext()) {
+      vertices.add(new LongWritable(iterator.nextLong()));
+    }
+    return vertices;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
deleted file mode 100644
index bcdab98..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.VertexIdMessageBytesIterator;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.VerboseByteStructMessageWrite;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> Message type
- */
-public class LongByteArrayMessageStore<M extends Writable>
-  extends LongAbstractMessageStore<M, DataInputOutput> {
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service             Service worker
-   * @param config              Hadoop configuration
-   */
-  public LongByteArrayMessageStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-      ImmutableClassesGiraphConfiguration<LongWritable,
-          Writable, Writable> config) {
-    super(messageValueFactory, service, config);
-  }
-
-  @Override
-  public boolean isPointerListEncoding() {
-    return false;
-  }
-
-  /**
-   * Get the DataInputOutput for a vertex id, creating if necessary.
-   *
-   * @param partitionMap Partition map to look in
-   * @param vertexId Id of the vertex
-   * @return DataInputOutput for this vertex id (created if necessary)
-   */
-  private DataInputOutput getDataInputOutput(
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap, long vertexId) {
-    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
-    if (dataInputOutput == null) {
-      dataInputOutput = config.createMessagesInputOutput();
-      partitionMap.put(vertexId, dataInputOutput);
-    }
-    return dataInputOutput;
-  }
-
-  @Override
-  public void addPartitionMessages(int partitionId,
-    VertexIdMessages<LongWritable, M> messages) {
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId);
-    synchronized (partitionMap) {
-      VertexIdMessageBytesIterator<LongWritable, M>
-          vertexIdMessageBytesIterator =
-          messages.getVertexIdMessageBytesIterator();
-      // Try to copy the message buffer over rather than
-      // doing a deserialization of a message just to know its size.  This
-      // should be more efficient for complex objects where serialization is
-      // expensive.  If this type of iterator is not available, fall back to
-      // deserializing/serializing the messages
-      if (vertexIdMessageBytesIterator != null) {
-        while (vertexIdMessageBytesIterator.hasNext()) {
-          vertexIdMessageBytesIterator.next();
-          DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
-              vertexIdMessageBytesIterator.getCurrentVertexId().get());
-          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
-              dataInputOutput.getDataOutput());
-        }
-      } else {
-        try {
-          VertexIdMessageIterator<LongWritable, M>
-              iterator = messages.getVertexIdMessageIterator();
-          while (iterator.hasNext()) {
-            iterator.next();
-            DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
-                iterator.getCurrentVertexId().get());
-            VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
-                dataInputOutput.getDataOutput());
-          }
-        } catch (IOException e) {
-          throw new RuntimeException("addPartitionMessages: IOException while" +
-              " adding messages for a partition: " + e);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void finalizeStore() {
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(
-    LongWritable vertexId) {
-    DataInputOutput dataInputOutput =
-        getPartitionMap(vertexId).get(vertexId.get());
-    if (dataInputOutput == null) {
-      return EmptyIterable.get();
-    } else {
-      return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
-    }
-  }
-
-  @Override
-  public void writePartition(DataOutput out, int partitionId)
-    throws IOException {
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        map.get(partitionId);
-    out.writeInt(partitionMap.size());
-    ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
-        partitionMap.long2ObjectEntrySet().fastIterator();
-    while (iterator.hasNext()) {
-      Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
-      out.writeLong(entry.getLongKey());
-      entry.getValue().write(out);
-    }
-  }
-
-  @Override
-  public void readFieldsForPartition(DataInput in,
-    int partitionId) throws IOException {
-    int size = in.readInt();
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        new Long2ObjectOpenHashMap<DataInputOutput>(size);
-    while (size-- > 0) {
-      long vertexId = in.readLong();
-      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
-      dataInputOutput.readFields(in);
-      partitionMap.put(vertexId, dataInputOutput);
-    }
-    synchronized (map) {
-      map.put(partitionId, partitionMap);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
deleted file mode 100644
index eef75ba..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.PointerListMessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
-
-/**
- * This stores messages in
- * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
- * and stores long pointers that point to serialized messages
- *
- * @param <M> message type
- */
-public class LongPointerListMessageStore<M extends Writable>
-  extends LongAbstractListMessageStore<M, LongArrayList>
-  implements MessageStore<LongWritable, M> {
-
-  /** Buffers of byte array outputs used to store messages - thread safe */
-  private final ExtendedByteArrayOutputBuffer bytesBuffer;
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service             Service worker
-   * @param config              Hadoop configuration
-   */
-  public LongPointerListMessageStore(
-    MessageValueFactory<M> messageValueFactory,
-    CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-    ImmutableClassesGiraphConfiguration<LongWritable,
-    Writable, Writable> config) {
-    super(messageValueFactory, service, config);
-    bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
-  }
-
-  @Override
-  public boolean isPointerListEncoding() {
-    return true;
-  }
-
-  @Override
-  protected LongArrayList createList() {
-    return new LongArrayList();
-  }
-
-  @Override
-  public void addPartitionMessages(int partitionId,
-    VertexIdMessages<LongWritable, M> messages) {
-    try {
-      VertexIdMessageIterator<LongWritable, M> iterator =
-          messages.getVertexIdMessageIterator();
-      long pointer = 0;
-      LongArrayList list;
-      while (iterator.hasNext()) {
-        iterator.next();
-        M msg = iterator.getCurrentMessage();
-        list = getList(iterator);
-        if (iterator.isNewMessage()) {
-          IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
-          pointer = indexAndDataOut.getIndex();
-          pointer <<= 32;
-          ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
-          pointer += dataOutput.getPos();
-          msg.write(dataOutput);
-        }
-        synchronized (list) { // TODO - any better way?
-          list.add(pointer);
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("addPartitionMessages: IOException while" +
-          " adding messages for a partition: " + e);
-    }
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(
-    LongWritable vertexId) {
-    LongArrayList list = getPartitionMap(vertexId).get(
-        vertexId.get());
-    if (list == null) {
-      return EmptyIterable.get();
-    } else {
-      return new PointerListMessagesIterable<>(messageValueFactory,
-        list, bytesBuffer);
-    }
-  }
-
-  // FIXME -- complete these for check-pointing
-  @Override
-  public void writePartition(DataOutput out, int partitionId)
-    throws IOException {
-  }
-
-  @Override
-  public void readFieldsForPartition(DataInput in, int partitionId)
-    throws IOException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java
new file mode 100644
index 0000000..525225c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
+import org.apache.giraph.comm.messages.PointerListMessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This stores messages in
+ * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
+ * and stores long pointers that point to serialized messages
+ *
+ * @param <M> message type
+ */
+public class LongPointerListPerVertexStore<M extends Writable>
+  extends LongAbstractListStore<M, LongArrayList>
+  implements MessageStore<LongWritable, M> {
+
+  /** Buffers of byte array outputs used to store messages - thread safe */
+  private final ExtendedByteArrayOutputBuffer bytesBuffer;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param partitionInfo       Partition split info
+   * @param config              Hadoop configuration
+   */
+  public LongPointerListPerVertexStore(
+    MessageValueFactory<M> messageValueFactory,
+    PartitionSplitInfo<LongWritable> partitionInfo,
+    ImmutableClassesGiraphConfiguration<LongWritable,
+    Writable, Writable> config) {
+    super(messageValueFactory, partitionInfo, config);
+    bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
+  }
+
+  @Override
+  public boolean isPointerListEncoding() {
+    return true;
+  }
+
+  @Override
+  protected LongArrayList createList() {
+    return new LongArrayList();
+  }
+
+  @Override
+  public void addPartitionMessages(
+    int partitionId,
+    VertexIdMessages<LongWritable, M> messages
+  ) {
+    try {
+      VertexIdMessageIterator<LongWritable, M> iterator =
+          messages.getVertexIdMessageIterator();
+      long pointer = 0;
+      LongArrayList list;
+      while (iterator.hasNext()) {
+        iterator.next();
+        M msg = iterator.getCurrentMessage();
+        list = getList(iterator.getCurrentVertexId());
+
+        if (iterator.isNewMessage()) {
+          IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+          pointer = indexAndDataOut.getIndex();
+          pointer <<= 32;
+          ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+          pointer += dataOutput.getPos();
+          msg.write(dataOutput);
+        }
+        synchronized (list) { // TODO - any better way?
+          list.add(pointer);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("addPartitionMessages: IOException while" +
+          " adding messages for a partition: " + e);
+    }
+  }
+
+  @Override
+  public void addMessage(LongWritable vertexId, M message) throws IOException {
+    LongArrayList list = getList(vertexId);
+    IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+    long pointer = indexAndDataOut.getIndex();
+    pointer <<= 32;
+    ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+    pointer += dataOutput.getPos();
+    message.write(dataOutput);
+
+    synchronized (list) {
+      list.add(pointer);
+    }
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(LongWritable vertexId) {
+    LongArrayList list = getPartitionMap(vertexId).get(
+        vertexId.get());
+    if (list == null) {
+      return EmptyIterable.get();
+    } else {
+      return new PointerListMessagesIterable<>(messageValueFactory,
+        list, bytesBuffer);
+    }
+  }
+
+  // FIXME -- complete these for check-pointing
+  @Override
+  public void writePartition(DataOutput out, int partitionId)
+    throws IOException {
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in, int partitionId)
+    throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
index 6273694..e8f00ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -19,12 +19,6 @@ package org.apache.giraph.comm.messages.queue;
 
 import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.utils.ThreadUtils;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -35,6 +29,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.utils.ThreadUtils;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
 /**
  * This class decouples message receiving and processing
  * into separate threads thus reducing contention.
@@ -157,6 +158,12 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
   }
 
   @Override
+  public void addMessage(I vertexId, M message) throws IOException {
+    // TODO: implement if LocalBlockRunner needs async message store
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void finalizeStore() {
     store.finalizeStore();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
index c8d0f79..de67af4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.ooc.data;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
@@ -32,9 +36,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 
 /**
  * Implementation of a message store used for out-of-core mechanism.
@@ -137,6 +138,26 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     }
   }
 
+  @Override
+  public void addMessage(I vertexId, M message) throws IOException {
+    if (useMessageCombiner) {
+      messageStore.addMessage(vertexId, message);
+    } else {
+      // TODO: implement if LocalBlockRunner needs this message store
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Gets the path that should be used specifically for message data.
+   *
+   * @param basePath path prefix to build the actual path from
+   * @param superstep superstep for which message data should be stored
+   * @return path to files specific for message data
+   */
+  private static String getPath(String basePath, long superstep) {
+    return basePath + "_messages-S" + superstep;
+  }
 
   @Override
   public long loadPartitionData(int partitionId)

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
index 299dced..8f00293 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
@@ -23,23 +23,29 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.objects.Object2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.giraph.types.ops.IntTypeOps;
 import org.apache.giraph.types.ops.LongTypeOps;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Basic2ObjectMap with only basic set of operations.
- * All operations that return object T are returning reusable object,
+ * All operations that return object K are returning reusable object,
  * which is modified after calling any other function.
  *
  * @param <K> Key type
@@ -81,13 +87,11 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
    * @return the old value, or null if no value was present for the given key.
    */
   public abstract V remove(K key);
-
   /**
    * TypeOps for type of keys this object holds
    * @return TypeOps
    */
   public abstract PrimitiveIdTypeOps<K> getKeyTypeOps();
-
   /**
    * Fast iterator over keys within this map, which doesn't allocate new
    * element for each returned element.
@@ -100,6 +104,20 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
   public abstract Iterator<K> fastKeyIterator();
 
   /**
+   * Iterator over map values.
+   *
+   * @return Iterator
+   */
+  public abstract Iterator<V> valueIterator();
+
+  /**
+   * A collection of all values.
+   *
+   * @return Collection of values
+   */
+  public abstract Collection<V> values();
+
+  /**
    * Iterator that reuses key object.
    *
    * @param <Iter> Primitive key iterator type
@@ -212,6 +230,16 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
     }
 
     @Override
+    public Iterator<V> valueIterator() {
+      return map.values().iterator();
+    }
+
+    @Override
+    public Collection<V> values() {
+      return map.values();
+    }
+
+    @Override
     public void write(DataOutput out) throws IOException {
       out.writeInt(map.size());
       ObjectIterator<Int2ObjectMap.Entry<V>> iterator =
@@ -319,7 +347,22 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
     }
 
     @Override
+    public Iterator<V> valueIterator() {
+      return map.values().iterator();
+    }
+
+    @Override
+    public Collection<V> values() {
+      return map.values();
+    }
+
+    @Override
     public void write(DataOutput out) throws IOException {
+      Preconditions.checkState(
+        valueWriter != null,
+        "valueWriter is not provided"
+      );
+
       out.writeInt(map.size());
       ObjectIterator<Long2ObjectMap.Entry<V>> iterator =
           map.long2ObjectEntrySet().fastIterator();
@@ -332,6 +375,11 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
 
     @Override
     public void readFields(DataInput in) throws IOException {
+      Preconditions.checkState(
+        valueWriter != null,
+        "valueWriter is not provided"
+      );
+
       int size = in.readInt();
       map.clear();
       map.trim(size);
@@ -342,4 +390,141 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
       }
     }
   }
+
+  /** Writable implementation of Basic2ObjectMap */
+  public static final class BasicObject2ObjectOpenHashMap<K extends Writable, V>
+      extends Basic2ObjectMap<K, V> {
+    /** Map */
+    private final Object2ObjectOpenHashMap<K, V> map;
+    /** Key writer */
+    private final WritableWriter<K> keyWriter;
+    /** Value writer */
+    private final WritableWriter<V> valueWriter;
+
+    /**
+     * Constructor
+     *
+     * @param keyWriter Writer of keys
+     * @param valueWriter Writer of values
+     */
+    public BasicObject2ObjectOpenHashMap(
+      WritableWriter<K> keyWriter,
+      WritableWriter<V> valueWriter
+    ) {
+      this.map = new Object2ObjectOpenHashMap<>();
+      this.keyWriter = keyWriter;
+      this.valueWriter = valueWriter;
+    }
+
+    /**
+     * Constructor
+     *
+     * @param capacity Map capacity
+     * @param keyWriter Writer of keys
+     * @param valueWriter Writer of values
+     */
+    public BasicObject2ObjectOpenHashMap(
+      int capacity,
+      WritableWriter<K> keyWriter,
+      WritableWriter<V> valueWriter
+    ) {
+      this.map = new Object2ObjectOpenHashMap<>(capacity);
+      this.keyWriter = keyWriter;
+      this.valueWriter = valueWriter;
+    }
+
+    @Override
+    public void clear() {
+      map.clear();
+    }
+
+    @Override
+    public int size() {
+      return map.size();
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+      return map.containsKey(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+      // we need a copy since the key object is mutable
+      K copyKey = WritableUtils.createCopy(key);
+      return map.put(copyKey, value);
+    }
+
+    @Override
+    public V get(K key) {
+      return map.get(key);
+    }
+
+    @Override
+    public V remove(K key) {
+      return map.remove(key);
+    }
+
+    @Override
+    public PrimitiveIdTypeOps<K> getKeyTypeOps() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Iterator<K> fastKeyIterator() {
+      return map.keySet().iterator();
+    }
+
+    @Override
+    public Iterator<V> valueIterator() {
+      return map.values().iterator();
+    }
+
+    @Override
+    public Collection<V> values() {
+      return map.values();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Preconditions.checkState(
+        keyWriter != null,
+        "keyWriter is not provided"
+      );
+      Preconditions.checkState(
+        valueWriter != null,
+        "valueWriter is not provided"
+      );
+
+      out.writeInt(map.size());
+      ObjectIterator<Object2ObjectMap.Entry<K, V>> iterator =
+          map.object2ObjectEntrySet().fastIterator();
+      while (iterator.hasNext()) {
+        Object2ObjectMap.Entry<K, V> entry = iterator.next();
+        keyWriter.write(out, entry.getKey());
+        valueWriter.write(out, entry.getValue());
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      Preconditions.checkState(
+        keyWriter != null,
+        "keyWriter is not provided"
+      );
+      Preconditions.checkState(
+        valueWriter != null,
+        "valueWriter is not provided"
+      );
+
+      int size = in.readInt();
+      map.clear();
+      map.trim(size);
+      while (size-- > 0) {
+        K key = keyWriter.readFields(in);
+        V value = valueWriter.readFields(in);
+        map.put(key, value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java
new file mode 100644
index 0000000..23df7d3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops.collections;
+
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap.BasicObject2ObjectOpenHashMap;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Utility functions for constructing basic collections
+ */
+public class BasicCollectionsUtils {
+  /** No instances */
+  private BasicCollectionsUtils() { }
+
+  /**
+   * Construct OpenHashMap with primitive keys.
+   *
+   * @param <I> Vertex id type
+   * @param <V> Value type
+   * @param idClass Class type
+   * @return map
+   */
+  public static <I extends Writable, V>
+  Basic2ObjectMap<I, V> create2ObjectMap(Class<I> idClass) {
+    return create2ObjectMap(idClass, null, null);
+  }
+
+  /**
+   * Construct OpenHashMap with primitive keys.
+   *
+   * If keyWriter/valueWriter are not provided,
+   * readFields/write will throw an Exception, if called.
+   *
+   * @param <I> Vertex id type
+   * @param <V> Value type
+   * @param idClass Class type
+   * @param keyWriter writer for keys
+   * @param valueWriter writer for values
+   * @return map
+   */
+  public static <I extends Writable, V>
+  Basic2ObjectMap<I, V> create2ObjectMap(
+    Class<I> idClass,
+    WritableWriter<I> keyWriter,
+    WritableWriter<V> valueWriter
+  ) {
+    PrimitiveIdTypeOps<I> idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(
+      idClass
+    );
+    if (idTypeOps != null) {
+      return idTypeOps.create2ObjectOpenHashMap(valueWriter);
+    } else {
+      return new BasicObject2ObjectOpenHashMap<>(keyWriter, valueWriter);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
index 2865a53..46f7b48 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.utils;
 
-import java.util.HashMap;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
@@ -29,12 +29,13 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.BasicCollectionsUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * TestGraph class for in-memory testing.
@@ -50,7 +51,7 @@ public class TestGraph<I extends WritableComparable,
   /** Vertex value combiner */
   protected final VertexValueCombiner<V> vertexValueCombiner;
   /** The vertex values */
-  protected HashMap<I, Vertex<I, V, E>> vertices = Maps.newHashMap();
+  protected Basic2ObjectMap<I, Vertex<I, V, E>> vertices;
   /** The configuration */
   protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
 
@@ -60,12 +61,19 @@ public class TestGraph<I extends WritableComparable,
    * @param conf Should have vertex and edge classes set.
    */
   public TestGraph(GiraphConfiguration conf) {
-    this.conf = new ImmutableClassesGiraphConfiguration(conf);
+    this.conf = new ImmutableClassesGiraphConfiguration<>(conf);
     vertexValueCombiner = this.conf.createVertexValueCombiner();
+    vertices = BasicCollectionsUtils.create2ObjectMap(
+      this.conf.getVertexIdClass()
+    );
   }
 
-  public HashMap<I, Vertex<I, V, E>> getVertices() {
-    return vertices;
+  public Collection<Vertex<I, V, E>> getVertices() {
+    return vertices.values();
+  }
+
+  public int getVertexCount() {
+    return vertices.size();
   }
 
   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
@@ -108,7 +116,7 @@ public class TestGraph<I extends WritableComparable,
    * @return this
    */
   public TestGraph<I, V, E> addVertex(I id, V value,
-                                         Entry<I, E>... edges) {
+                                      Entry<I, E>... edges) {
     addVertex(makeVertex(id, value, edges));
     return this;
   }
@@ -174,14 +182,6 @@ public class TestGraph<I extends WritableComparable,
       .addEdge(EdgeFactory.create(toVertex, edgeValue));
     return this;
   }
-  /**
-   * An iterator over the ids
-   *
-   * @return the iterator
-   */
-  public Iterator<I> idIterator() {
-    return vertices.keySet().iterator();
-  }
 
   /**
    * An iterator over the vertices
@@ -190,7 +190,7 @@ public class TestGraph<I extends WritableComparable,
    */
   @Override
   public Iterator<Vertex<I, V, E>> iterator() {
-    return vertices.values().iterator();
+    return vertices.valueIterator();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
index aa25490..c3c43fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
@@ -18,12 +18,12 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
 /** Verbose Error mesage for ByteArray based messages */
 public class VerboseByteStructMessageWrite {
   /**
@@ -43,13 +43,37 @@ public class VerboseByteStructMessageWrite {
    * @throws IOException
    * @throws RuntimeException
    */
-  public static <I extends WritableComparable, M extends Writable> void
-  verboseWriteCurrentMessage(VertexIdMessageIterator<I, M> iterator,
-    DataOutput out) throws IOException {
+  public static <I extends WritableComparable, M extends Writable>
+  void verboseWriteCurrentMessage(
+    VertexIdMessageIterator<I, M> iterator,
+    DataOutput out
+  ) throws IOException {
+    verboseWriteCurrentMessage(
+      iterator.getCurrentVertexId(), iterator.getCurrentMessage(), out);
+  }
+
+  /**
+   * verboseWriteCurrentMessage
+   * de-serialize, then write messages
+   *
+   * @param vertexId vertexId
+   * @param message message
+   * @param out DataOutput
+   * @param <I> vertexId
+   * @param <M> message
+   * @throws IOException
+   * @throws RuntimeException
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  void verboseWriteCurrentMessage(
+    I vertexId,
+    M message,
+    DataOutput out
+  ) throws IOException {
     try {
-      iterator.getCurrentMessage().write(out);
+      message.write(out);
     } catch (NegativeArraySizeException e) {
-      handleNegativeArraySize(iterator.getCurrentVertexId());
+      handleNegativeArraySize(vertexId);
     }
   }
 
@@ -59,8 +83,8 @@ public class VerboseByteStructMessageWrite {
    * @param vertexId vertexId
    * @param <I> vertexId type
    */
-  public static <I extends WritableComparable> void handleNegativeArraySize(
-      I vertexId) {
+  public static <I extends WritableComparable>
+  void handleNegativeArraySize(I vertexId) {
     throw new RuntimeException("The numbers of bytes sent to vertex " +
         vertexId + " exceeded the max capacity of " +
         "its ExtendedDataOutput. Please consider setting " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index cdb9b7e..c51521d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -183,7 +183,7 @@ public class BspServiceWorker<I extends WritableComparable,
   private GiraphTimer waitRequestsTimer;
 
   /** InputSplit handlers used in INPUT_SUPERSTEP */
-  private WorkerInputSplitsHandler inputSplitsHandler;
+  private final WorkerInputSplitsHandler inputSplitsHandler;
 
   /** Memory observer */
   private final MemoryObserver memoryObserver;
@@ -1784,6 +1784,31 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
+  public Iterable<Integer> getPartitionIds() {
+    return getPartitionStore().getPartitionIds();
+  }
+
+  @Override
+  public long getPartitionVertexCount(Integer partitionId) {
+    return getPartitionStore().getPartitionVertexCount(partitionId);
+  }
+
+  @Override
+  public void startIteration() {
+    getPartitionStore().startIteration();
+  }
+
+  @Override
+  public Partition getNextPartition() {
+    return getPartitionStore().getNextPartition();
+  }
+
+  @Override
+  public void putPartition(Partition partition) {
+    getPartitionStore().putPartition(partition);
+  }
+
+  @Override
   public ServerData<I, V, E> getServerData() {
     return workerServer.getServerData();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
index e3b2db0..2c5f2db 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
@@ -25,7 +25,7 @@ import junit.framework.Assert;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.FloatSumMessageCombiner;
-import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
@@ -71,8 +71,10 @@ public class TestIntFloatPrimitiveMessageStores {
     );
     PartitionStore partitionStore = Mockito.mock(PartitionStore.class);
     Mockito.when(service.getPartitionStore()).thenReturn(partitionStore);
+    Mockito.when(service.getPartitionIds()).thenReturn(
+      Lists.newArrayList(0, 1));
     Mockito.when(partitionStore.getPartitionIds()).thenReturn(
-        Lists.newArrayList(0, 1));
+      Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
     Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
@@ -144,8 +146,8 @@ public class TestIntFloatPrimitiveMessageStores {
 
   @Test
   public void testIntByteArrayMessageStore() {
-    IntByteArrayMessageStore<FloatWritable> messageStore =
-        new IntByteArrayMessageStore<FloatWritable>(new
+    IdByteArrayMessageStore<IntWritable, FloatWritable> messageStore =
+        new IdByteArrayMessageStore<>(new
             TestMessageValueFactory<FloatWritable>(FloatWritable.class),
             service, conf);
     insertIntFloatMessages(messageStore);

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index dc9850b..5508b2c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -25,7 +25,7 @@ import junit.framework.Assert;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.DoubleSumMessageCombiner;
-import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -68,8 +68,10 @@ public class TestLongDoublePrimitiveMessageStores {
     );
     PartitionStore partitionStore = Mockito.mock(PartitionStore.class);
     Mockito.when(service.getPartitionStore()).thenReturn(partitionStore);
+    Mockito.when(service.getPartitionIds()).thenReturn(
+      Lists.newArrayList(0, 1));
     Mockito.when(partitionStore.getPartitionIds()).thenReturn(
-        Lists.newArrayList(0, 1));
+      Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
     Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
@@ -145,8 +147,8 @@ public class TestLongDoublePrimitiveMessageStores {
 
   @Test
   public void testLongByteArrayMessageStore() {
-    LongByteArrayMessageStore<DoubleWritable> messageStore =
-        new LongByteArrayMessageStore<DoubleWritable>(
+    IdByteArrayMessageStore<LongWritable, DoubleWritable> messageStore =
+        new IdByteArrayMessageStore<>(
             new TestMessageValueFactory<DoubleWritable>(DoubleWritable.class),
             service, createLongDoubleConf());
     insertLongDoubleMessages(messageStore);

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
index ffc1288..1294c88 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
@@ -18,6 +18,14 @@
 
 package org.apache.giraph.comm.messages.queue;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.factories.TestMessageValueFactory;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -26,14 +34,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.junit.Test;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-
 /**
  * Test case for AsyncMessageStoreWrapper
  */
@@ -71,6 +71,10 @@ public class AsyncMessageStoreWrapperTest {
     }
 
     @Override
+    public void addMessage(LongWritable vertexId, IntWritable message) throws IOException {
+    }
+
+    @Override
     public boolean isPointerListEncoding() {
       return false;
     }
@@ -124,5 +128,6 @@ public class AsyncMessageStoreWrapperTest {
     public void readFieldsForPartition(DataInput in, int partitionId) throws IOException {
 
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index d56c0fb..954e420 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -56,7 +56,7 @@ public class TestSwitchClasses {
     graph.addVertex(id2, new StatusValue());
     graph = InternalVertexRunner.runWithInMemoryOutput(conf, graph);
 
-    Assert.assertEquals(2, graph.getVertices().size());
+    Assert.assertEquals(2, graph.getVertexCount());
   }
 
   private static void checkVerticesOnFinalSuperstep(


[3/3] git commit: updated refs/heads/trunk to 2117d1d

Posted by ik...@apache.org.
faster maps

Summary:
The idea is to replace HashMap<LongWritable, V> with Long2ObjectOpenHashMap<V> (and Map<Int...> with Int2Object...)
This will save space and speed up some applications.

I changed the type of such a map in TestGraph.java, which gives up to a 2x speedup on an
example PageRank computation (see comment below)

JIRA: https://issues.apache.org/jira/browse/GIRAPH-1049

Test Plan: TestBasicCollections.java contains some tests

Reviewers: sergey.edunov, maja.kabiljo, dionysis.logothetis, heslami, ikabiljo

Reviewed By: heslami

Differential Revision: https://reviews.facebook.net/D55587


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2117d1db
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2117d1db
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2117d1db

Branch: refs/heads/trunk
Commit: 2117d1dbba8f18b08da70e890c30111edb3aebe3
Parents: 2ae95bd
Author: spupyrev <sp...@fb.com>
Authored: Tue Aug 30 17:32:13 2016 -0700
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Wed Aug 31 10:23:06 2016 -0700

----------------------------------------------------------------------
 .../framework/api/local/InternalApi.java        | 119 +++++-
 .../api/local/InternalMessageStore.java         | 360 ++++---------------
 .../framework/api/local/LocalBlockRunner.java   |   8 +-
 .../block_app/test_setup/NumericTestGraph.java  |   2 +-
 .../giraph/bsp/CentralizedServiceWorker.java    |  14 +-
 .../flow_control/CreditBasedFlowControl.java    |  37 +-
 .../messages/AbstractListPerVertexStore.java    |  48 ++-
 .../ByteArrayMessagesPerVertexStore.java        |  50 ++-
 .../messages/InMemoryMessageStoreFactory.java   |  55 ++-
 .../giraph/comm/messages/MessageStore.java      |  12 +
 .../comm/messages/MessageStoreFactory.java      |   5 +-
 .../comm/messages/OneMessagePerVertexStore.java |  51 ++-
 .../comm/messages/PartitionSplitInfo.java       |  70 ++++
 .../messages/PointerListPerVertexStore.java     |  41 ++-
 .../comm/messages/SimpleMessageStore.java       |  15 +-
 .../primitives/IdByteArrayMessageStore.java     |  52 ++-
 .../primitives/IdOneMessagePerVertexStore.java  |  48 ++-
 .../primitives/IntByteArrayMessageStore.java    | 257 -------------
 .../primitives/IntFloatMessageStore.java        |  42 ++-
 .../primitives/LongDoubleMessageStore.java      |  39 +-
 .../long_id/LongAbstractListMessageStore.java   | 170 ---------
 .../long_id/LongAbstractListStore.java          | 182 ++++++++++
 .../long_id/LongAbstractMessageStore.java       | 132 -------
 .../primitives/long_id/LongAbstractStore.java   | 133 +++++++
 .../long_id/LongByteArrayMessageStore.java      | 177 ---------
 .../long_id/LongPointerListMessageStore.java    | 134 -------
 .../long_id/LongPointerListPerVertexStore.java  | 151 ++++++++
 .../queue/AsyncMessageStoreWrapper.java         |  19 +-
 .../giraph/ooc/data/DiskBackedMessageStore.java |  27 +-
 .../types/ops/collections/Basic2ObjectMap.java  | 191 +++++++++-
 .../ops/collections/BasicCollectionsUtils.java  |  73 ++++
 .../java/org/apache/giraph/utils/TestGraph.java |  32 +-
 .../utils/VerboseByteStructMessageWrite.java    |  44 ++-
 .../apache/giraph/worker/BspServiceWorker.java  |  27 +-
 .../TestIntFloatPrimitiveMessageStores.java     |  10 +-
 .../TestLongDoublePrimitiveMessageStores.java   |  10 +-
 .../queue/AsyncMessageStoreWrapperTest.java     |  21 +-
 .../apache/giraph/master/TestSwitchClasses.java |   2 +-
 .../giraph/types/TestBasicCollections.java      | 207 +++++++++++
 .../java/org/apache/giraph/utils/MockUtils.java |  15 +-
 .../scc/SccComputationTestInMemory.java         |   6 +-
 41 files changed, 1677 insertions(+), 1411 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
index a4703b4..a8d5ef7 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
@@ -17,12 +17,20 @@
  */
 package org.apache.giraph.block_app.framework.api.local;
 
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Preconditions;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -35,7 +43,8 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
 import org.apache.giraph.block_app.framework.api.Counter;
-import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalConcurrentMessageStore;
+import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalChecksMessageStore;
+import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalWrappedMessageStore;
 import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
 import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
 import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
@@ -44,8 +53,10 @@ import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
 import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
 import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.Vertex;
@@ -62,8 +73,6 @@ import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Internal implementation of Block API interfaces - representing an in-memory
  * giraph instance.
@@ -371,17 +380,28 @@ class InternalApi<I extends WritableComparable, V extends Writable,
     previousMessages = nextMessages;
     previousWorkerMessages = nextWorkerMessages;
 
-    nextMessages = InternalConcurrentMessageStore.createMessageStore(
-        conf, computation, runAllChecks);
+    nextMessages = createMessageStore(
+      conf,
+      computation.getOutgoingMessageClasses(conf),
+      createPartitionInfo(),
+      runAllChecks
+    );
     nextWorkerMessages = new ArrayList<>();
 
+    // finalize previous messages
+    if (previousMessages != null) {
+      previousMessages.finalizeStore();
+    }
+
     // process mutations:
-    Set<I> targets = previousMessages == null ?
-      Collections.EMPTY_SET : previousMessages.targetsSet();
-    if (createVertexOnMsgs) {
-      for (I target : targets) {
+    if (createVertexOnMsgs && previousMessages != null) {
+      Iterator<I> iter = previousMessages.targetVertexIds();
+      while (iter.hasNext()) {
+        I target = iter.next();
         if (getPartition(target).getVertex(target) == null) {
-          mutations.putIfAbsent(target, new VertexMutations<I, V, E>());
+          // need a copy as the key might be reusable
+          I copyId = WritableUtils.createCopy(target);
+          mutations.putIfAbsent(copyId, new VertexMutations<I, V, E>());
         }
       }
     }
@@ -393,8 +413,11 @@ class InternalApi<I extends WritableComparable, V extends Writable,
           getPartition(vertexIndex).getVertex(vertexIndex);
       VertexMutations<I, V, E> curMutations = entry.getValue();
       Vertex<I, V, E> vertex = vertexResolver.resolve(
-          vertexIndex, originalVertex, curMutations,
-          targets.contains(vertexIndex));
+        vertexIndex,
+        originalVertex,
+        curMutations,
+        previousMessages != null && previousMessages.hasMessage(vertexIndex)
+      );
 
       if (vertex != null) {
         getPartition(vertex.getId()).putVertex(vertex);
@@ -406,6 +429,76 @@ class InternalApi<I extends WritableComparable, V extends Writable,
     mutations.clear();
   }
 
+  private <M extends Writable>
+  InternalMessageStore<I, M> createMessageStore(
+    ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
+    MessageClasses<I, M> messageClasses,
+    PartitionSplitInfo<I> partitionInfo,
+    boolean runAllChecks
+  ) {
+    InternalMessageStore<I, M> messageStore =
+      InternalWrappedMessageStore.create(conf, messageClasses, partitionInfo);
+    if (runAllChecks) {
+      return new InternalChecksMessageStore<I, M>(
+          messageStore, conf, messageClasses.createMessageValueFactory(conf));
+    } else {
+      return messageStore;
+    }
+  }
+
+  private PartitionSplitInfo<I> createPartitionInfo() {
+    return new PartitionSplitInfo<I>() {
+      /** Ids of partitions */
+      private IntList partitionIds;
+      /** Queue of partitions to be processed in a superstep */
+      private Queue<Partition<I, V, E>> partitionQueue;
+
+      @Override
+      public int getPartitionId(I vertexId) {
+        return partitionerFactory.getPartition(vertexId, partitions.size(), 1);
+      }
+
+      @Override
+      public Iterable<Integer> getPartitionIds() {
+        if (partitionIds == null) {
+          partitionIds = new IntArrayList(partitions.size());
+          for (int i = 0; i < partitions.size(); i++) {
+            partitionIds.add(i);
+          }
+        }
+        Preconditions.checkState(partitionIds.size() == partitions.size());
+        return partitionIds;
+      }
+
+      @Override
+      public long getPartitionVertexCount(Integer partitionId) {
+        return partitions.get(partitionId).getVertexCount();
+      }
+
+      @Override
+      public void startIteration() {
+        checkState(partitionQueue == null || partitionQueue.isEmpty(),
+          "startIteration: It seems that some of " +
+          "of the partitions from previous iteration over partition store are" +
+          " not yet processed.");
+
+        partitionQueue = new LinkedList<Partition<I, V, E>>();
+        for (Partition<I, V, E> partition : partitions) {
+          partitionQueue.add(partition);
+        }
+      }
+
+      @Override
+      public Partition getNextPartition() {
+        return partitionQueue.poll();
+      }
+
+      @Override
+      public void putPartition(Partition partition) {
+      }
+    };
+  }
+
   public List<Partition<I, V, E>> getPartitions() {
     return partitions;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
index 92d9821..d8ea68a 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
@@ -17,32 +17,24 @@
  */
 package org.apache.giraph.block_app.framework.api.local;
 
+import com.google.common.collect.Iterators;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
-import org.apache.giraph.combiner.MessageCombiner;
-import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.types.ops.TypeOps;
-import org.apache.giraph.types.ops.TypeOpsUtils;
-import org.apache.giraph.utils.ExtendedDataInput;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.google.common.collect.AbstractIterator;
-
 /**
  * Interface for internal message store, used by LocalBlockRunner
  *
@@ -52,309 +44,93 @@ import com.google.common.collect.AbstractIterator;
 @SuppressWarnings("rawtypes")
 interface InternalMessageStore
     <I extends WritableComparable, M extends Writable> {
-  Set<I> targetsSet();
+  Iterator<I> targetVertexIds();
+  boolean hasMessage(I id);
   Iterable<M> takeMessages(I id);
   void sendMessage(I id, M message);
   void sendMessageToMultipleEdges(Iterator<I> idIter, M message);
+  void finalizeStore();
 
   /**
-   * Abstract Internal message store implementation that uses
-   * ConcurrentHashMap to store objects received thus far.
+   * A wrapper that uses InMemoryMessageStoreFactory to
+   * create MessageStore
    *
    * @param <I> Vertex id type
    * @param <M> Message type
-   * @param <R> Receiver object that particular implementation uses
-   *            (message, array of messages, byte array, etc)
    */
-  abstract class InternalConcurrentMessageStore
-      <I extends WritableComparable, M extends Writable, R>
-      implements InternalMessageStore<I, M> {
-    private final ConcurrentHashMap<I, R> received =
-        new ConcurrentHashMap<>();
-
-    private final Class<I> idClass;
-    private final TypeOps<I> idTypeOps;
-
-    InternalConcurrentMessageStore(Class<I> idClass) {
-      this.idClass = idClass;
-      idTypeOps = TypeOpsUtils.getTypeOpsOrNull(idClass);
-    }
-
-    public I copyId(I id) {
-      if (idTypeOps != null) {
-        return idTypeOps.createCopy(id);
-      } else {
-        return WritableUtils.createCopy(id, idClass, null);
-      }
-    }
-
-    R getReceiverFor(I id) {
-      R value = received.get(id);
-
-      if (value == null) {
-        id = copyId(id);
-        value = createNewReceiver();
-        R oldValue = received.putIfAbsent(id, value);
-        if (oldValue != null) {
-          value = oldValue;
-        }
-      }
-      return value;
-    }
-
-    R removeFor(I id) {
-      return received.remove(id);
-    }
-
-    abstract R createNewReceiver();
-
-    @Override
-    public Set<I> targetsSet() {
-      return received.keySet();
-    }
-
-    @Override
-    public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
-      while (idIter.hasNext()) {
-        sendMessage(idIter.next(), message);
-      }
-    }
-
-    public static <I extends WritableComparable, M extends Writable>
-    InternalMessageStore<I, M> createMessageStore(
-      final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
-      final MessageClasses<I, M> messageClasses
+  class InternalWrappedMessageStore
+  <I extends WritableComparable, M extends Writable>
+  implements InternalMessageStore<I, M> {
+    private final MessageStore<I, M> messageStore;
+    private final PartitionSplitInfo<I> partitionInfo;
+
+    public InternalWrappedMessageStore(
+      ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
+      MessageStore<I, M> messageStore,
+      PartitionSplitInfo<I> partitionInfo
     ) {
-      MessageCombiner<? super I, M> combiner =
-          messageClasses.createMessageCombiner(conf);
-      if (combiner != null) {
-        return new InternalCombinerMessageStore<>(
-            conf.getVertexIdClass(), combiner);
-      } else if (messageClasses.getMessageEncodeAndStoreType().equals(
-          MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
-        return new InternalSharedByteMessageStore<>(
-            conf.getVertexIdClass(),
-            messageClasses.createMessageValueFactory(conf));
-      } else {
-        return new InternalByteMessageStore<>(
-          conf.getVertexIdClass(),
-          messageClasses.createMessageValueFactory(conf),
-          conf);
-      }
+      this.messageStore = messageStore;
+      this.partitionInfo = partitionInfo;
     }
 
     public static <I extends WritableComparable, M extends Writable>
-    InternalMessageStore<I, M> createMessageStore(
-        final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
-        final BlockWorkerPieces pieces, boolean runAllChecks) {
-      @SuppressWarnings("unchecked")
-      MessageClasses<I, M> messageClasses =
-          pieces.getOutgoingMessageClasses(conf);
-
-      InternalMessageStore<I, M> messageStore =
-          createMessageStore(conf, messageClasses);
-      if (runAllChecks) {
-        return new InternalChecksMessageStore<I, M>(
-            messageStore, conf, messageClasses.createMessageValueFactory(conf));
-      } else {
-        return messageStore;
-      }
-    }
-  }
-
-  /**
-   * InternalMessageStore that combines messages as they are received.
-   *
-   * @param <I> Vertex id value type
-   * @param <M> Message type
-   */
-  static class InternalCombinerMessageStore
-      <I extends WritableComparable, M extends Writable>
-      extends InternalConcurrentMessageStore<I, M, M> {
-    private final MessageCombiner<? super I, M> messageCombiner;
-
-    public InternalCombinerMessageStore(Class<I> idClass,
-        MessageCombiner<? super I, M> messageCombiner) {
-      super(idClass);
-      this.messageCombiner = messageCombiner;
-    }
-
-    @Override
-    public Iterable<M> takeMessages(I id) {
-      M message = removeFor(id);
-      if (message != null) {
-        return Collections.singleton(message);
-      } else {
-        return null;
-      }
-    }
-
-    @Override
-    public void sendMessage(I id, M message) {
-      M mainMessage = getReceiverFor(id);
-      synchronized (mainMessage) {
-        messageCombiner.combine(id, mainMessage, message);
-      }
-    }
-
-    @Override
-    M createNewReceiver() {
-      return messageCombiner.createInitialMessage();
-    }
-  }
-
-  /**
-   * InternalMessageStore that keeps messages for each vertex in byte array.
-   *
-   * @param <I> Vertex id value type
-   * @param <M> Message type
-   */
-  static class InternalByteMessageStore
-      <I extends WritableComparable, M extends Writable>
-      extends InternalConcurrentMessageStore<I, M,
-          ExtendedDataOutput> {
-    private final MessageValueFactory<M> messageFactory;
-    private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
-
-    public InternalByteMessageStore(
-      Class<I> idClass, MessageValueFactory<M> messageFactory,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> conf
+    InternalMessageStore<I, M> create(
+      ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
+      MessageClasses<I, M> messageClasses,
+      PartitionSplitInfo<I> partitionInfo
     ) {
-      super(idClass);
-      this.messageFactory = messageFactory;
-      this.conf = conf;
-    }
-
-    @Override
-    public Iterable<M> takeMessages(I id) {
-      final ExtendedDataOutput out = removeFor(id);
-      if (out == null) {
-        return null;
-      }
-
-      return new Iterable<M>() {
-        @Override
-        public Iterator<M> iterator() {
-          final ExtendedDataInput in = conf.createExtendedDataInput(
-            out.getByteArray(), 0, out.getPos()
-          );
-
-          final M message = messageFactory.newInstance();
-          return new AbstractIterator<M>() {
-            @Override
-            protected M computeNext() {
-              if (in.available() == 0) {
-                return endOfData();
-              }
-              try {
-                message.readFields(in);
-              } catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-              return message;
-            }
-          };
-        }
-      };
+      InMemoryMessageStoreFactory<I, M> factory =
+        new InMemoryMessageStoreFactory<>();
+      factory.initialize(partitionInfo, conf);
+      return new InternalWrappedMessageStore<>(
+        conf,
+        factory.newStore(messageClasses),
+        partitionInfo
+      );
     }
 
     @Override
     public void sendMessage(I id, M message) {
-      ExtendedDataOutput out = getReceiverFor(id);
-
-      synchronized (out) {
-        try {
-          message.write(out);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+      try {
+        messageStore.addMessage(id, message);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
     }
 
     @Override
-    ExtendedDataOutput createNewReceiver() {
-      return conf.createExtendedDataOutput();
-    }
-  }
-
-  /**
-   * InternalMessageStore that creates byte[] for each message, and
-   * all receivers share the same byte[].
-   *
-   * @param <I> Vertex id value type
-   * @param <M> Message type
-   */
-  static class InternalSharedByteMessageStore
-      <I extends WritableComparable, M extends Writable>
-      extends InternalConcurrentMessageStore<I, M, List<byte[]>> {
-    private final MessageValueFactory<M> messageFactory;
-
-    public InternalSharedByteMessageStore(
-        Class<I> idClass, MessageValueFactory<M> messageFactory) {
-      super(idClass);
-      this.messageFactory = messageFactory;
+    public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
+      while (idIter.hasNext()) {
+        sendMessage(idIter.next(), message);
+      }
     }
 
     @Override
     public Iterable<M> takeMessages(I id) {
-      final List<byte[]> out = removeFor(id);
-      if (out == null) {
-        return null;
-      }
-
-      return new Iterable<M>() {
-        @Override
-        public Iterator<M> iterator() {
-          final Iterator<byte[]> byteIter = out.iterator();
-          final M message = messageFactory.newInstance();
-          final UnsafeReusableByteArrayInput reusableInput =
-              new UnsafeReusableByteArrayInput();
-
-          return new Iterator<M>() {
-            @Override
-            public boolean hasNext() {
-              return byteIter.hasNext();
-            }
-
-            @Override
-            public M next() {
-              WritableUtils.fromByteArrayUnsafe(
-                  byteIter.next(), message, reusableInput);
-              return message;
-            }
-
-            @Override
-            public void remove() {
-              byteIter.remove();
-            }
-          };
-        }
-      };
-    }
-
-    private void storeMessage(I id, byte[] messageData) {
-      List<byte[]> out = getReceiverFor(id);
-      synchronized (out) {
-        out.add(messageData);
-      }
+      Iterable<M> result = messageStore.getVertexMessages(id);
+      messageStore.clearVertexMessages(id);
+      return result;
     }
 
     @Override
-    List<byte[]> createNewReceiver() {
-      return new ArrayList<>();
+    public Iterator<I> targetVertexIds() {
+      List<Iterator<I>> iterators = new ArrayList<>();
+      for (int partition : partitionInfo.getPartitionIds()) {
+        Iterable<I> vertices =
+          messageStore.getPartitionDestinationVertices(partition);
+        iterators.add(vertices.iterator());
+      }
+      return Iterators.concat(iterators.iterator());
     }
 
     @Override
-    public void sendMessage(I id, M message) {
-      storeMessage(id, WritableUtils.toByteArrayUnsafe(message));
+    public boolean hasMessage(I id) {
+      return messageStore.hasMessagesForVertex(id);
     }
 
     @Override
-    public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
-      byte[] messageData = WritableUtils.toByteArrayUnsafe(message);
-      while (idIter.hasNext()) {
-        storeMessage(idIter.next(), messageData);
-      }
+    public void finalizeStore() {
+      messageStore.finalizeStore();
     }
   }
 
@@ -369,9 +145,11 @@ interface InternalMessageStore
     private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
     private final MessageValueFactory<M> messageFactory;
 
-    public InternalChecksMessageStore(InternalMessageStore<I, M> messageStore,
-        ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
-        MessageValueFactory<M> messageFactory) {
+    public InternalChecksMessageStore(
+      InternalMessageStore<I, M> messageStore,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
+      MessageValueFactory<M> messageFactory
+    ) {
       this.messageStore = messageStore;
       this.conf = conf;
       this.messageFactory = messageFactory;
@@ -428,8 +206,18 @@ interface InternalMessageStore
     }
 
     @Override
-    public Set<I> targetsSet() {
-      return messageStore.targetsSet();
+    public boolean hasMessage(I id) {
+      return messageStore.hasMessage(id);
+    }
+
+    @Override
+    public Iterator<I> targetVertexIds() {
+      return messageStore.targetVertexIds();
+    }
+
+    @Override
+    public void finalizeStore() {
+      messageStore.finalizeStore();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
index 90aa8a2..33cd84b 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
@@ -63,6 +63,9 @@ public class LocalBlockRunner {
   /** Number of threads to use */
   public static final IntConfOption NUM_THREADS = new IntConfOption(
       "test.LocalBlockRunner.NUM_THREADS", 3, "");
+  /** Number of vertex partitions */
+  public static final IntConfOption NUM_PARTITIONS = new IntConfOption(
+      "test.LocalBlockRunner.NUM_PARTITIONS", 16, "");
   /**
    * Whether to run all supported checks. Disable if you are running this
    * not within a unit test, and on a large graph, where performance matters.
@@ -148,7 +151,8 @@ public class LocalBlockRunner {
     Preconditions.checkNotNull(block);
     Preconditions.checkNotNull(graph);
     ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf();
-    int numPartitions = NUM_THREADS.get(conf);
+    int numThreads = NUM_THREADS.get(conf);
+    int numPartitions = NUM_PARTITIONS.get(conf);
     boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
     boolean serializeMaster = SERIALIZE_MASTER.get(conf);
     final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
@@ -171,7 +175,7 @@ public class LocalBlockRunner {
           }
         }));
 
-    ExecutorService executor = Executors.newFixedThreadPool(numPartitions);
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
 
     if (runAllChecks) {
       for (Vertex<I, V, E> vertex : graph) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
index c5d2fb1..5b1508e 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
@@ -101,7 +101,7 @@ public class NumericTestGraph<I extends WritableComparable,
    * Get number of vertices in the graph
    */
   public int getVertexCount() {
-    return testGraph.getVertices().size();
+    return testGraph.getVertexCount();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 5249829..e5e6b63 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -18,8 +18,13 @@
 
 package org.apache.giraph.bsp;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GlobalStats;
@@ -30,18 +35,14 @@ import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
-import org.apache.giraph.worker.WorkerInputSplitsHandler;
 import org.apache.giraph.worker.WorkerAggregatorHandler;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerInfo;
+import org.apache.giraph.worker.WorkerInputSplitsHandler;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
 /**
  * All workers should have access to this centralized service to
  * execute the following methods.
@@ -53,7 +54,7 @@ import java.util.List;
 @SuppressWarnings("rawtypes")
 public interface CentralizedServiceWorker<I extends WritableComparable,
   V extends Writable, E extends Writable>
-  extends CentralizedService<I, V, E> {
+  extends CentralizedService<I, V, E>, PartitionSplitInfo<I> {
   /**
    * Setup (must be called prior to any other function)
    *
@@ -146,6 +147,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    * @param vertexId Vertex id
    * @return Partition id
    */
+  @Override
   int getPartitionId(I vertexId);
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
index 0e1d3d6..9b15b9b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
@@ -18,22 +18,8 @@
 
 package org.apache.giraph.comm.flow_control;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.giraph.comm.netty.NettyClient;
-import org.apache.giraph.comm.netty.handler.AckSignalFlag;
-import org.apache.giraph.comm.requests.SendResumeRequest;
-import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.utils.AdjustableSemaphore;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.ThreadUtils;
-import org.apache.log4j.Logger;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -52,8 +38,23 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.handler.AckSignalFlag;
+import org.apache.giraph.comm.requests.SendResumeRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.utils.AdjustableSemaphore;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.ThreadUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * Representation of credit-based flow control policy. With this policy there

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
index c28dff5..d1cfc3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
@@ -18,18 +18,17 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.VertexIdIterator;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-
 /**
  * Abstract Implementation of {@link SimpleMessageStore} where
  * multiple messages are stored per vertex as a list
@@ -46,14 +45,14 @@ public abstract class AbstractListPerVertexStore<I extends WritableComparable,
    * Constructor
    *
    * @param messageValueFactory Message class held in the store
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param config Hadoop configuration
    */
   public AbstractListPerVertexStore(
     MessageValueFactory<M> messageValueFactory,
-    CentralizedServiceWorker<I, ?, ?> service,
+    PartitionSplitInfo<I> partitionInfo,
     ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
-    super(messageValueFactory, service, config);
+    super(messageValueFactory, partitionInfo, config);
   }
 
   /**
@@ -71,15 +70,38 @@ public abstract class AbstractListPerVertexStore<I extends WritableComparable,
    * @return pointer list
    */
   protected L getOrCreateList(VertexIdIterator<I> iterator) {
-    PartitionOwner owner =
-        service.getVertexPartitionOwner(iterator.getCurrentVertexId());
-    int partitionId = owner.getPartitionId();
+    int partitionId = getPartitionId(iterator.getCurrentVertexId());
     ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
     L list = partitionMap.get(iterator.getCurrentVertexId());
     if (list == null) {
       L newList = createList();
       list = partitionMap.putIfAbsent(
-          iterator.releaseCurrentVertexId(), newList);
+        iterator.releaseCurrentVertexId(), newList);
+      if (list == null) {
+        list = newList;
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Get the list of pointers for a vertex
+   * Each pointer has information of how to access an encoded message
+   * for this vertex
+   * This method stores a copy of the vertex id in the partition map
+   * when a new entry is created, so the caller may reuse vertexId
+   *
+   * @param vertexId vertex id
+   * @return pointer list
+   */
+  protected L getOrCreateList(I vertexId) {
+    int partitionId = getPartitionId(vertexId);
+    ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
+    L list = partitionMap.get(vertexId);
+    if (list == null) {
+      L newList = createList();
+      I copyId = WritableUtils.createCopy(vertexId);
+      list = partitionMap.putIfAbsent(copyId, newList);
       if (list == null) {
         list = newList;
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index efbe11b..7bf52e4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm.messages;
 
+import com.google.common.collect.Iterators;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -33,12 +35,11 @@ import org.apache.giraph.utils.VertexIdIterator;
 import org.apache.giraph.utils.VertexIdMessageBytesIterator;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.google.common.collect.Iterators;
-
 /**
  * Implementation of {@link SimpleMessageStore} where multiple messages are
  * stored per vertex as byte backed datastructures.
@@ -53,14 +54,14 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    * Constructor
    *
    * @param messageValueFactory Message class held in the store
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param config Hadoop configuration
    */
   public ByteArrayMessagesPerVertexStore(
       MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<I, ?, ?> service,
+      PartitionSplitInfo<I> partitionInfo,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
-    super(messageValueFactory, service, config);
+    super(messageValueFactory, partitionInfo, config);
   }
 
   @Override
@@ -138,6 +139,26 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
   }
 
   @Override
+  public void addMessage(I vertexId, M message) throws IOException {
+    ConcurrentMap<I, DataInputOutput> partitionMap =
+      getOrCreatePartitionMap(getPartitionId(vertexId));
+    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
+    if (dataInputOutput == null) {
+      DataInputOutput newDataOutput = config.createMessagesInputOutput();
+      I copyId = WritableUtils.createCopy(vertexId);
+      dataInputOutput = partitionMap.putIfAbsent(copyId, newDataOutput);
+      if (dataInputOutput == null) {
+        dataInputOutput = newDataOutput;
+      }
+    }
+
+    synchronized (dataInputOutput) {
+      VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
+        vertexId, message, dataInputOutput.getDataOutput());
+    }
+  }
+
+  @Override
   protected Iterable<M> getMessagesAsIterable(
       DataInputOutput dataInputOutput) {
     return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
@@ -199,7 +220,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
   public static class Factory<I extends WritableComparable, M extends Writable>
     implements MessageStoreFactory<I, M, MessageStore<I, M>> {
-    /** Service worker */
-    private CentralizedServiceWorker<I, ?, ?> service;
+    /** Partition split info */
+    private PartitionSplitInfo<I> partitionInfo;
     /** Hadoop configuration */
     private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 
@@ -207,12 +228,14 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     public Factory() { }
 
     /**
-     * @param service Worker service
+     * @param partitionInfo Partition split info
      * @param config  Hadoop configuration
      */
-    public Factory(CentralizedServiceWorker<I, ?, ?> service,
-        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
-      this.service = service;
+    public Factory(
+      PartitionSplitInfo<I> partitionInfo,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config
+    ) {
+      this.partitionInfo = partitionInfo;
       this.config = config;
     }
 
@@ -221,14 +244,15 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
         MessageClasses<I, M> messageClasses) {
       return new ByteArrayMessagesPerVertexStore<I, M>(
           messageClasses.createMessageValueFactory(config),
-          service, config);
+          partitionInfo, config);
     }
 
     @Override
-    public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+    public void initialize(PartitionSplitInfo<I> partitionInfo,
         ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
-      this.service = service;
+      this.partitionInfo = partitionInfo;
       this.config = conf;
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 99a12c5..aa6a703 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -18,15 +18,12 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore;
-import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
-import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
-import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore;
+import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListPerVertexStore;
 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -59,8 +56,8 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   private static final Logger LOG =
       Logger.getLogger(InMemoryMessageStoreFactory.class);
 
-  /** Service worker */
-  protected CentralizedServiceWorker<I, ?, ?> service;
+  /** Partition info */
+  protected PartitionSplitInfo<I> partitionInfo;
   /** Hadoop configuration */
   protected ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
 
@@ -87,24 +84,22 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
     if (vertexIdClass.equals(IntWritable.class) &&
         messageClass.equals(FloatWritable.class)) {
       messageStore = new IntFloatMessageStore(
-          (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
+          (PartitionSplitInfo<IntWritable>) partitionInfo,
           (MessageCombiner<IntWritable, FloatWritable>) messageCombiner);
     } else if (vertexIdClass.equals(LongWritable.class) &&
         messageClass.equals(DoubleWritable.class)) {
       messageStore = new LongDoubleMessageStore(
-          (CentralizedServiceWorker<LongWritable, Writable, Writable>) service,
+          (PartitionSplitInfo<LongWritable>) partitionInfo,
           (MessageCombiner<LongWritable, DoubleWritable>) messageCombiner);
     } else {
       PrimitiveIdTypeOps<I> idTypeOps =
           TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
       if (idTypeOps != null) {
         messageStore = new IdOneMessagePerVertexStore<>(
-            messageValueFactory, service, messageCombiner,
-            conf);
+          messageValueFactory, partitionInfo, messageCombiner, conf);
       } else {
-        messageStore =
-            new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
-                messageCombiner, conf);
+        messageStore = new OneMessagePerVertexStore<I, M>(
+          messageValueFactory, partitionInfo, messageCombiner, conf);
       }
     }
     return messageStore;
@@ -124,21 +119,11 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
       MessageEncodeAndStoreType encodeAndStore) {
     MessageStore messageStore = null;
     Class<I> vertexIdClass = conf.getVertexIdClass();
-    if (vertexIdClass.equals(IntWritable.class)) { // INT
-      messageStore = new IntByteArrayMessageStore(messageValueFactory,
-          service, conf);
-    } else if (vertexIdClass.equals(LongWritable.class)) { // LONG
-      if (encodeAndStore.equals(
-          MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
-          encodeAndStore.equals(
-            MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
-        messageStore = new LongByteArrayMessageStore(messageValueFactory,
-            service, conf);
-      } else if (encodeAndStore.equals(
-          MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
-        messageStore = new LongPointerListMessageStore(messageValueFactory,
-            service, conf);
-      }
+    // a special case for LongWritable with POINTER_LIST_PER_VERTEX
+    if (vertexIdClass.equals(LongWritable.class) && encodeAndStore.equals(
+        MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
+      messageStore = new LongPointerListPerVertexStore(
+        messageValueFactory, partitionInfo, conf);
     } else { // GENERAL
       if (encodeAndStore.equals(
           MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
@@ -148,15 +133,15 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
             TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
         if (idTypeOps != null) {
           messageStore = new IdByteArrayMessageStore<>(
-              messageValueFactory, service, conf);
+              messageValueFactory, partitionInfo, conf);
         } else {
           messageStore = new ByteArrayMessagesPerVertexStore<>(
-              messageValueFactory, service, conf);
+              messageValueFactory, partitionInfo, conf);
         }
       } else if (encodeAndStore.equals(
           MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
-        messageStore = new PointerListPerVertexStore<>(messageValueFactory,
-            service, conf);
+        messageStore = new PointerListPerVertexStore<>(
+          messageValueFactory, partitionInfo, conf);
       }
     }
     return messageStore;
@@ -193,7 +178,7 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
     if (asyncMessageStoreThreads > 0) {
       messageStore = new AsyncMessageStoreWrapper(
           messageStore,
-          service.getPartitionStore().getPartitionIds(),
+          partitionInfo.getPartitionIds(),
           asyncMessageStoreThreads);
     }
 
@@ -201,9 +186,9 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   }
 
   @Override
-  public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+  public void initialize(PartitionSplitInfo<I> partitionInfo,
       ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
-    this.service = service;
+    this.partitionInfo = partitionInfo;
     this.conf = conf;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 9c56d85..27e04ca 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.messages;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -89,6 +90,17 @@ public interface MessageStore<I extends WritableComparable,
       int partitionId, VertexIdMessages<I, M> messages);
 
   /**
+   * Adds a message for a particular vertex
+   * The method is used by InternalMessageStore to send local messages; for
+   * the general case, use a more efficient addPartitionMessages
+   *
+   * @param vertexId Id of target vertex
+   * @param message  A message to send
+   * @throws IOException if writing the message to the store fails
+   */
+  void addMessage(I vertexId, M message) throws IOException;
+
+  /**
    * Called before start of computation in bspworker
    * Since it is run from a single thread while the store is not being
    * accessed by any other thread - this is ensured to be thread-safe

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index 6a18aa8..ee45f9a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.MessageClasses;
 import org.apache.hadoop.io.Writable;
@@ -45,9 +44,9 @@ public interface MessageStoreFactory<I extends WritableComparable,
    * Implementation class should use this method of initialization
    * of any required internal state.
    *
-   * @param service Service to get partition mappings
+   * @param partitionInfo Partition split info
    * @param conf Configuration
    */
-  void initialize(CentralizedServiceWorker<I, ?, ?> service,
+  void initialize(PartitionSplitInfo<I> partitionInfo,
       ImmutableClassesGiraphConfiguration<I, ?, ?> conf);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 1d67014..09534bc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -31,6 +31,7 @@ import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -49,16 +50,17 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
 
   /**
    * @param messageValueFactory Message class held in the store
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param messageCombiner MessageCombiner for messages
    * @param config Hadoop configuration
    */
   public OneMessagePerVertexStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<I, ?, ?> service,
-      MessageCombiner<? super I, M> messageCombiner,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
-    super(messageValueFactory, service, config);
+    MessageValueFactory<M> messageValueFactory,
+    PartitionSplitInfo<I> partitionInfo,
+    MessageCombiner<? super I, M> messageCombiner,
+    ImmutableClassesGiraphConfiguration<I, ?, ?> config
+  ) {
+    super(messageValueFactory, partitionInfo, config);
     this.messageCombiner =
         messageCombiner;
   }
@@ -99,6 +101,25 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
   }
 
   @Override
+  public void addMessage(I vertexId, M message) throws IOException {
+    ConcurrentMap<I, M> partitionMap =
+      getOrCreatePartitionMap(getPartitionId(vertexId));
+    M currentMessage = partitionMap.get(vertexId);
+    if (currentMessage == null) {
+      M newMessage = messageCombiner.createInitialMessage();
+      // need a copy as vertexId might be reusable
+      I copyId = WritableUtils.createCopy(vertexId);
+      currentMessage = partitionMap.putIfAbsent(copyId, newMessage);
+      if (currentMessage == null) {
+        currentMessage = newMessage;
+      }
+    }
+    synchronized (currentMessage) {
+      messageCombiner.combine(vertexId, currentMessage, message);
+    }
+  }
+
+  @Override
   protected Iterable<M> getMessagesAsIterable(M message) {
     return Collections.singleton(message);
   }
@@ -147,17 +168,19 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
       M extends Writable>
       implements MessageStoreFactory<I, M, MessageStore<I, M>> {
-    /** Service worker */
-    private CentralizedServiceWorker<I, ?, ?> service;
+    /** Partition split info */
+    private PartitionSplitInfo<I> partitionInfo;
     /** Hadoop configuration */
     private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 
     /**
-     * @param service Worker service
+     * @param partitionInfo Partition split info
      * @param config  Hadoop configuration
      */
-    public Factory(CentralizedServiceWorker<I, ?, ?> service,
-        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
-      this.service = service;
+    public Factory(
+      PartitionSplitInfo<I> partitionInfo,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config
+    ) {
+      this.partitionInfo = partitionInfo;
       this.config = config;
     }
 
@@ -165,14 +188,14 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
     public MessageStore<I, M> newStore(
         MessageClasses<I, M> messageClasses) {
       return new OneMessagePerVertexStore<I, M>(
-          messageClasses.createMessageValueFactory(config), service,
+          messageClasses.createMessageValueFactory(config), partitionInfo,
           messageClasses.createMessageCombiner(config), config);
     }
 
     @Override
-    public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+    public void initialize(PartitionSplitInfo<I> partitionInfo,
         ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
-      this.service = service;
+      this.partitionInfo = partitionInfo;
       this.config = conf;
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/PartitionSplitInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PartitionSplitInfo.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PartitionSplitInfo.java
new file mode 100644
index 0000000..e685825
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PartitionSplitInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.partition.Partition;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface providing partition split information.
+ *
+ * @param <I> Vertex id type.
+ */
+public interface PartitionSplitInfo<I extends WritableComparable> {
+  /**
+   * Get the partition id that a vertex id would belong to.
+   *
+   * @param vertexId Vertex id
+   * @return Partition id
+   */
+  int getPartitionId(I vertexId);
+
+  /**
+   * Get the ids of all the stored partitions (on current worker) as Iterable
+   *
+   * @return The partition ids
+   */
+  Iterable<Integer> getPartitionIds();
+
+  /**
+   * Return the number of vertices in a partition.
+   *
+   * @param partitionId Partition id
+   * @return The number of vertices in the specified partition
+   */
+  long getPartitionVertexCount(Integer partitionId);
+
+  /**
+   * {@link org.apache.giraph.partition.PartitionStore#startIteration()}
+   */
+  void startIteration();
+
+  /**
+   * {@link org.apache.giraph.partition.PartitionStore#getNextPartition()}
+   *
+   * @return The next partition to process
+   */
+  Partition getNextPartition();
+
+  /**
+   * {@link org.apache.giraph.partition.PartitionStore#putPartition(Partition)}
+   *
+   * @param partition Partition
+   */
+  void putPartition(Partition partition);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
index 4b32a17..429ff69 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
@@ -19,23 +19,22 @@
 package org.apache.giraph.comm.messages;
 
 import it.unimi.dsi.fastutil.longs.LongArrayList;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-
-import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
-
 /**
  * Implementation of {@link SimpleMessageStore} where multiple messages are
  * stored as a list of long pointers to extended data output objects
@@ -54,14 +53,15 @@ public class PointerListPerVertexStore<I extends WritableComparable,
    * Constructor
    *
    * @param messageValueFactory Message class held in the store
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param config Hadoop configuration
    */
   public PointerListPerVertexStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<I, ?, ?> service,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
-    super(messageValueFactory, service, config);
+    MessageValueFactory<M> messageValueFactory,
+    PartitionSplitInfo<I> partitionInfo,
+    ImmutableClassesGiraphConfiguration<I, ?, ?> config
+  ) {
+    super(messageValueFactory, partitionInfo, config);
     bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
   }
 
@@ -105,6 +105,21 @@ public class PointerListPerVertexStore<I extends WritableComparable,
     }
   }
 
+  @Override
+  public void addMessage(I vertexId, M message) throws IOException {
+    LongArrayList list = getOrCreateList(vertexId);
+    IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+    long pointer = indexAndDataOut.getIndex();
+    pointer <<= 32;
+    ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+    pointer += dataOutput.getPos();
+    message.write(dataOutput);
+
+    synchronized (list) {
+      list.add(pointer);
+    }
+  }
+
   /**
    * Get messages as an iterable from message storage
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 9c3ef7f..bb25067 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -20,14 +20,15 @@ package org.apache.giraph.comm.messages;
 
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -46,8 +47,8 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
     M extends Writable, T> implements MessageStore<I, M>  {
   /** Message class */
   protected final MessageValueFactory<M> messageValueFactory;
-  /** Service worker */
-  protected final CentralizedServiceWorker<I, ?, ?> service;
+  /** Partition split info */
+  protected final PartitionSplitInfo<I> partitionInfo;
   /** Map from partition id to map from vertex id to messages for that vertex */
   protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map;
   /** Giraph configuration */
@@ -57,15 +58,15 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
    * Constructor
    *
    * @param messageValueFactory Message class held in the store
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param config Giraph configuration
    */
   public SimpleMessageStore(
       MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<I, ?, ?> service,
+      PartitionSplitInfo<I> partitionInfo,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     this.messageValueFactory = messageValueFactory;
-    this.service = service;
+    this.partitionInfo = partitionInfo;
     this.config = config;
     map = new MapMaker().concurrencyLevel(
         config.getNettyServerExecutionConcurrency()).makeMap();
@@ -114,7 +115,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
-   * @return Id of partiton
+   * @return Id of partition
    */
   protected int getPartitionId(I vertexId) {
-    return service.getVertexPartitionOwner(vertexId).getPartitionId();
+    return partitionInfo.getPartitionId(vertexId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
index 57f3ff6..7c28102 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
@@ -17,6 +17,8 @@
  */
 package org.apache.giraph.comm.messages.primitives;
 
+import com.google.common.collect.Lists;
+
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 
 import java.io.DataInput;
@@ -25,9 +27,9 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
@@ -43,8 +45,6 @@ import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.google.common.collect.Lists;
-
 /**
  * Special message store to be used when IDs are primitive and no combiner is
  * used.
@@ -60,8 +60,8 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
   protected final MessageValueFactory<M> messageValueFactory;
   /** Map from partition id to map from vertex id to message */
   private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>> map;
-  /** Service worker */
-  private final CentralizedServiceWorker<I, ?, ?> service;
+  /** Partition split info */
+  private final PartitionSplitInfo<I> partitionInfo;
   /** Giraph configuration */
   private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
   /** Vertex id TypeOps */
@@ -87,26 +87,27 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
    * Constructor
    *
    * @param messageValueFactory Factory for creating message values
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param config Hadoop configuration
    */
   public IdByteArrayMessageStore(MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<I, ?, ?> service,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    PartitionSplitInfo<I> partitionInfo,
+    ImmutableClassesGiraphConfiguration<I, ?, ?> config
+  ) {
     this.messageValueFactory = messageValueFactory;
-    this.service = service;
+    this.partitionInfo = partitionInfo;
     this.config = config;
 
     idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
 
     map = new Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+    for (int partitionId : partitionInfo.getPartitionIds()) {
+      int capacity = Math.max(10,
+        (int) partitionInfo.getPartitionVertexCount(partitionId));
       Basic2ObjectMap<I, DataInputOutput> partitionMap =
-          idTypeOps.create2ObjectOpenHashMap(
-              Math.max(10,
-                  (int) service.getPartitionStore()
-                      .getPartitionVertexCount(partitionId)),
-              dataInputOutputWriter);
+        idTypeOps.create2ObjectOpenHashMap(
+          capacity,
+          dataInputOutputWriter);
 
       map.put(partitionId, partitionMap);
     }
@@ -119,7 +120,7 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
    * @return Map which holds messages for partition which vertex belongs to.
    */
   private Basic2ObjectMap<I, DataInputOutput> getPartitionMap(I vertexId) {
-    return map.get(service.getPartitionId(vertexId));
+    return map.get(partitionInfo.getPartitionId(vertexId));
   }
 
   /**
@@ -180,6 +181,25 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
     }
   }
 
+  /**
+   * Writes one message to the data output of its target vertex.
+   *
+   * @param vertexId Id of target vertex; also picks the partition map to lock
+   * @param message  Message written into that vertex's data output
+   * @throws IOException presumably when writing the message fails (unconfirmed)
+   */
+  @Override
+  public void addMessage(I vertexId, M message) throws IOException {
+    Basic2ObjectMap<I, DataInputOutput> partitionMap =
+      getPartitionMap(vertexId);
+    synchronized (partitionMap) {
+      DataInputOutput dataInputOutput = getDataInputOutput(
+        partitionMap, vertexId);
+      VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
+        vertexId, message, dataInputOutput.getDataOutput());
+    }
+  }
+
   @Override
   public void clearPartition(int partitionId) {
     map.get(partitionId).clear();

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index 4463ddb..415eafc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -17,6 +17,8 @@
  */
 package org.apache.giraph.comm.messages.primitives;
 
+import com.google.common.collect.Lists;
+
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 
 import java.io.DataInput;
@@ -26,9 +28,9 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
@@ -41,8 +43,6 @@ import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.google.common.collect.Lists;
-
 /**
  * Special message store to be used when IDs are primitive and message doesn't
  * need to be, and message combiner is used.
@@ -62,8 +62,8 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   private final MessageValueFactory<M> messageValueFactory;
   /** Message messageCombiner */
   private final MessageCombiner<? super I, M> messageCombiner;
-  /** Service worker */
-  private final CentralizedServiceWorker<I, ?, ?> service;
+  /** Partition split info */
+  private final PartitionSplitInfo<I> partitionInfo;
   /** Giraph configuration */
   private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
   /** Vertex id TypeOps */
@@ -87,16 +87,16 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
    * Constructor
    *
    * @param messageValueFactory Message value factory
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param messageCombiner Message messageCombiner
    * @param config Config
    */
   public IdOneMessagePerVertexStore(
       MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<I, ?, ?> service,
+      PartitionSplitInfo<I> partitionInfo,
       MessageCombiner<? super I, M> messageCombiner,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
-    this.service = service;
+    this.partitionInfo = partitionInfo;
     this.config = config;
     this.messageValueFactory = messageValueFactory;
     this.messageCombiner = messageCombiner;
@@ -104,10 +104,11 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
     idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
 
     map = new Int2ObjectOpenHashMap<>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+    for (int partitionId : partitionInfo.getPartitionIds()) {
       Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
-          Math.max(10, (int) service.getPartitionStore()
-              .getPartitionVertexCount(partitionId)), messageWriter);
+        Math.max(10, (int) partitionInfo.getPartitionVertexCount(partitionId)),
+        messageWriter
+      );
       map.put(partitionId, partitionMap);
     }
   }
@@ -119,7 +120,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
    * @return Map which holds messages for partition which vertex belongs to.
    */
   private Basic2ObjectMap<I, M> getPartitionMap(I vertexId) {
-    return map.get(service.getPartitionId(vertexId));
+    return map.get(partitionInfo.getPartitionId(vertexId));
   }
 
   @Override
@@ -151,6 +152,29 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
     }
   }
 
+  /**
+   * Combines a message into the single message stored for its target vertex.
+   *
+   * @param vertexId Id of target vertex; also picks the partition map to lock
+   * @param message  Message combined into the vertex's current message
+   * @throws IOException as declared by the overridden method
+   */
+  @Override
+  public void addMessage(I vertexId, M message) throws IOException {
+    Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
+    synchronized (partitionMap) {
+      M currentMessage = partitionMap.get(vertexId);
+      if (currentMessage == null) {
+        M newMessage = messageCombiner.createInitialMessage();
+        currentMessage = partitionMap.put(vertexId, newMessage);
+        if (currentMessage == null) {
+          currentMessage = newMessage;
+        }
+      }
+      messageCombiner.combine(vertexId, currentMessage, message);
+    }
+  }
+
   @Override
   public void clearPartition(int partitionId) {
     map.get(partitionId).clear();

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
deleted file mode 100644
index 4ef9e76..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.VertexIdMessageBytesIterator;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.VerboseByteStructMessageWrite;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Lists;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntIterator;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Special message store to be used when ids are IntWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> Message type
- */
-public class IntByteArrayMessageStore<M extends Writable>
-    implements MessageStore<IntWritable, M> {
-  /** Message value factory */
-  protected final MessageValueFactory<M> messageValueFactory;
-  /** Map from partition id to map from vertex id to message */
-  private final
-  Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>> map;
-  /** Service worker */
-  private final CentralizedServiceWorker<IntWritable, ?, ?> service;
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config;
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service      Service worker
-   * @param config       Hadoop configuration
-   */
-  public IntByteArrayMessageStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<IntWritable, Writable, Writable> service,
-      ImmutableClassesGiraphConfiguration<IntWritable, Writable, Writable>
-        config) {
-    this.messageValueFactory = messageValueFactory;
-    this.service = service;
-    this.config = config;
-
-    map =
-        new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
-          new Int2ObjectOpenHashMap<DataInputOutput>(
-              (int) service.getPartitionStore()
-                  .getPartitionVertexCount(partitionId));
-      map.put(partitionId, partitionMap);
-    }
-  }
-
-  @Override
-  public boolean isPointerListEncoding() {
-    return false;
-  }
-
-  /**
-   * Get map which holds messages for partition which vertex belongs to.
-   *
-   * @param vertexId Id of the vertex
-   * @return Map which holds messages for partition which vertex belongs to.
-   */
-  private Int2ObjectOpenHashMap<DataInputOutput> getPartitionMap(
-      IntWritable vertexId) {
-    return map.get(service.getPartitionId(vertexId));
-  }
-
-  /**
-   * Get the DataInputOutput for a vertex id, creating if necessary.
-   *
-   * @param partitionMap Partition map to look in
-   * @param vertexId     Id of the vertex
-   * @return DataInputOutput for this vertex id (created if necessary)
-   */
-  private DataInputOutput getDataInputOutput(
-      Int2ObjectOpenHashMap<DataInputOutput> partitionMap,
-      int vertexId) {
-    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
-    if (dataInputOutput == null) {
-      dataInputOutput = config.createMessagesInputOutput();
-      partitionMap.put(vertexId, dataInputOutput);
-    }
-    return dataInputOutput;
-  }
-
-  @Override
-  public void addPartitionMessages(int partitionId,
-      VertexIdMessages<IntWritable, M> messages) {
-    Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        map.get(partitionId);
-    synchronized (partitionMap) {
-      VertexIdMessageBytesIterator<IntWritable, M>
-          vertexIdMessageBytesIterator =
-          messages.getVertexIdMessageBytesIterator();
-      // Try to copy the message buffer over rather than
-      // doing a deserialization of a message just to know its size.  This
-      // should be more efficient for complex objects where serialization is
-      // expensive.  If this type of iterator is not available, fall back to
-      // deserializing/serializing the messages
-      if (vertexIdMessageBytesIterator != null) {
-        while (vertexIdMessageBytesIterator.hasNext()) {
-          vertexIdMessageBytesIterator.next();
-          DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
-              vertexIdMessageBytesIterator.getCurrentVertexId().get());
-          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
-              dataInputOutput.getDataOutput());
-        }
-      } else {
-        try {
-          VertexIdMessageIterator<IntWritable, M>
-              iterator = messages.getVertexIdMessageIterator();
-          while (iterator.hasNext()) {
-            iterator.next();
-            DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
-                iterator.getCurrentVertexId().get());
-            VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
-                dataInputOutput.getDataOutput());
-          }
-        } catch (IOException e) {
-          throw new RuntimeException("addPartitionMessages: IOException while" +
-              " adding messages for a partition: " + e);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void finalizeStore() {
-  }
-
-  @Override
-  public void clearPartition(int partitionId) {
-    map.get(partitionId).clear();
-  }
-
-  @Override
-  public boolean hasMessagesForVertex(IntWritable vertexId) {
-    return getPartitionMap(vertexId).containsKey(vertexId.get());
-  }
-
-  @Override
-  public boolean hasMessagesForPartition(int partitionId) {
-    Int2ObjectOpenHashMap<DataInputOutput> partitionMessages =
-        map.get(partitionId);
-    return partitionMessages != null && !partitionMessages.isEmpty();
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(
-      IntWritable vertexId) {
-    DataInputOutput dataInputOutput =
-        getPartitionMap(vertexId).get(vertexId.get());
-    if (dataInputOutput == null) {
-      return EmptyIterable.get();
-    } else {
-      return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
-    }
-  }
-
-  @Override
-  public void clearVertexMessages(IntWritable vertexId) {
-    getPartitionMap(vertexId).remove(vertexId.get());
-  }
-
-  @Override
-  public void clearAll() {
-    map.clear();
-  }
-
-  @Override
-  public Iterable<IntWritable> getPartitionDestinationVertices(
-      int partitionId) {
-    Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        map.get(partitionId);
-    List<IntWritable> vertices =
-        Lists.newArrayListWithCapacity(partitionMap.size());
-    IntIterator iterator = partitionMap.keySet().iterator();
-    while (iterator.hasNext()) {
-      vertices.add(new IntWritable(iterator.nextInt()));
-    }
-    return vertices;
-  }
-
-  @Override
-  public void writePartition(DataOutput out,
-      int partitionId) throws IOException {
-    Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        map.get(partitionId);
-    out.writeInt(partitionMap.size());
-    ObjectIterator<Int2ObjectMap.Entry<DataInputOutput>> iterator =
-        partitionMap.int2ObjectEntrySet().fastIterator();
-    while (iterator.hasNext()) {
-      Int2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
-      out.writeInt(entry.getIntKey());
-      entry.getValue().write(out);
-    }
-  }
-
-  @Override
-  public void readFieldsForPartition(DataInput in,
-      int partitionId) throws IOException {
-    int size = in.readInt();
-    Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        new Int2ObjectOpenHashMap<DataInputOutput>(size);
-    while (size-- > 0) {
-      int vertexId = in.readInt();
-      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
-      dataInputOutput.readFields(in);
-      partitionMap.put(vertexId, dataInputOutput);
-    }
-    synchronized (map) {
-      map.put(partitionId, partitionMap);
-    }
-  }
-}