Posted to commits@giraph.apache.org by ik...@apache.org on 2016/08/31 17:23:39 UTC

[2/3] git commit: updated refs/heads/trunk to 2117d1d

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 715bf45..77b2689 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm.messages.primitives;
 
+import com.google.common.collect.Lists;
+
 import it.unimi.dsi.fastutil.ints.Int2FloatMap;
 import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -30,17 +32,14 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Lists;
 
 /**
  * Special message store to be used when ids are IntWritable and messages
@@ -55,26 +54,26 @@ public class IntFloatMessageStore
   /** Message messageCombiner */
   private final
   MessageCombiner<? super IntWritable, FloatWritable> messageCombiner;
-  /** Service worker */
-  private final CentralizedServiceWorker<IntWritable, ?, ?> service;
+  /** Partition split info */
+  private final PartitionSplitInfo<IntWritable> partitionInfo;
 
   /**
    * Constructor
    *
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param messageCombiner Message messageCombiner
    */
   public IntFloatMessageStore(
-      CentralizedServiceWorker<IntWritable, Writable, Writable> service,
-      MessageCombiner<? super IntWritable, FloatWritable> messageCombiner) {
-    this.service = service;
+    PartitionSplitInfo<IntWritable> partitionInfo,
+    MessageCombiner<? super IntWritable, FloatWritable> messageCombiner
+  ) {
+    this.partitionInfo = partitionInfo;
     this.messageCombiner = messageCombiner;
 
     map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+    for (int partitionId : partitionInfo.getPartitionIds()) {
       Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap(
-          (int) service.getPartitionStore()
-              .getPartitionVertexCount(partitionId));
+          (int) partitionInfo.getPartitionVertexCount(partitionId));
       map.put(partitionId, partitionMap);
     }
   }
@@ -91,7 +90,7 @@ public class IntFloatMessageStore
    * @return Map which holds messages for partition which vertex belongs to.
    */
   private Int2FloatOpenHashMap getPartitionMap(IntWritable vertexId) {
-    return map.get(service.getPartitionId(vertexId));
+    return map.get(partitionInfo.getPartitionId(vertexId));
   }
 
   @Override
@@ -117,12 +116,27 @@ public class IntFloatMessageStore
               reusableMessage);
           message = reusableCurrentMessage.get();
         }
+        // FIXME: messageCombiner should create an initial message instead
         partitionMap.put(vertexId, message);
       }
     }
   }
 
   @Override
+  public void addMessage(
+    IntWritable vertexId,
+    FloatWritable message
+  ) throws IOException {
+    Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
+    synchronized (partitionMap) {
+      float originalValue = partitionMap.get(vertexId.get());
+      FloatWritable originalMessage = new FloatWritable(originalValue);
+      messageCombiner.combine(vertexId, originalMessage, message);
+      partitionMap.put(vertexId.get(), originalMessage.get());
+    }
+  }
+
+  @Override
   public void finalizeStore() {
   }
 

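For readers skimming the diff: the new addMessage path above reads the float currently stored for the vertex, wraps it in a FloatWritable, lets the MessageCombiner fold the incoming message into it, and writes the result back, all under the partition-map lock. A minimal sketch of a combiner such a store could be configured with (a hypothetical sum combiner, not part of this commit) might look like:

    import org.apache.giraph.combiner.MessageCombiner;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.IntWritable;

    /** Hypothetical combiner that keeps a running sum of float messages. */
    public class FloatSumMessageCombiner
        implements MessageCombiner<IntWritable, FloatWritable> {
      @Override
      public void combine(IntWritable vertexIndex,
          FloatWritable originalMessage, FloatWritable messageToCombine) {
        // Fold the incoming message into the stored value in place.
        originalMessage.set(originalMessage.get() + messageToCombine.get());
      }

      @Override
      public FloatWritable createInitialMessage() {
        // Neutral element for a sum; relates to the FIXME about initial messages.
        return new FloatWritable(0);
      }
    }

With a combiner like this, the store only ever keeps one float per destination vertex, which is what allows IntFloatMessageStore to back the whole message store with an Int2FloatOpenHashMap.
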
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 4fc4843..19aede4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -30,15 +30,14 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
 
 import com.google.common.collect.Lists;
 
@@ -56,26 +55,25 @@ public class LongDoubleMessageStore
   private final
   MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner;
   /** Service worker */
-  private final CentralizedServiceWorker<LongWritable, ?, ?> service;
+  private final PartitionSplitInfo<LongWritable> partitionInfo;
 
   /**
    * Constructor
    *
-   * @param service Service worker
+   * @param partitionInfo Partition split info
    * @param messageCombiner Message messageCombiner
    */
   public LongDoubleMessageStore(
-      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-      MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner) {
-    this.service = service;
-    this.messageCombiner =
-        messageCombiner;
+    PartitionSplitInfo<LongWritable> partitionInfo,
+    MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner
+  ) {
+    this.partitionInfo = partitionInfo;
+    this.messageCombiner = messageCombiner;
 
     map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+    for (int partitionId : partitionInfo.getPartitionIds()) {
       Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(
-          (int) service.getPartitionStore()
-              .getPartitionVertexCount(partitionId));
+          (int) partitionInfo.getPartitionVertexCount(partitionId));
       map.put(partitionId, partitionMap);
     }
   }
@@ -92,7 +90,7 @@ public class LongDoubleMessageStore
    * @return Map which holds messages for partition which vertex belongs to.
    */
   private Long2DoubleOpenHashMap getPartitionMap(LongWritable vertexId) {
-    return map.get(service.getPartitionId(vertexId));
+    return map.get(partitionInfo.getPartitionId(vertexId));
   }
 
   @Override
@@ -118,12 +116,27 @@ public class LongDoubleMessageStore
               reusableMessage);
           message = reusableCurrentMessage.get();
         }
+        // FIXME: messageCombiner should create an initial message instead
         partitionMap.put(vertexId, message);
       }
     }
   }
 
   @Override
+  public void addMessage(
+    LongWritable vertexId,
+    DoubleWritable message
+  ) throws IOException {
+    Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
+    synchronized (partitionMap) {
+      double originalValue = partitionMap.get(vertexId.get());
+      DoubleWritable originalMessage = new DoubleWritable(originalValue);
+      messageCombiner.combine(vertexId, originalMessage, message);
+      partitionMap.put(vertexId.get(), originalMessage.get());
+    }
+  }
+
+  @Override
   public void finalizeStore() {
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
deleted file mode 100644
index d1c33be..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.VertexIdIterator;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.util.List;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> message type
- * @param <L> list type
- */
-public abstract class LongAbstractListMessageStore<M extends Writable,
-  L extends List> extends LongAbstractMessageStore<M, L> {
-  /**
-   * Map used to store messages for nascent vertices i.e., ones
-   * that did not exist at the start of current superstep but will get
-   * created because of sending message to a non-existent vertex id
-   */
-  private final
-  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service             Service worker
-   * @param config              Hadoop configuration
-   */
-  public LongAbstractListMessageStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-      ImmutableClassesGiraphConfiguration<LongWritable,
-          Writable, Writable> config) {
-    super(messageValueFactory, service, config);
-    populateMap();
-
-    // create map for vertex ids (i.e., nascent vertices) not known yet
-    nascentMap = new Int2ObjectOpenHashMap<>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
-    }
-  }
-
-  /**
-   * Populate the map with all vertexIds for each partition
-   */
-  private void populateMap() { // TODO - can parallelize?
-    // populate with vertex ids already known
-    service.getPartitionStore().startIteration();
-    while (true) {
-      Partition partition = service.getPartitionStore().getNextPartition();
-      if (partition == null) {
-        break;
-      }
-      Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
-      for (Object obj : partition) {
-        Vertex vertex = (Vertex) obj;
-        LongWritable vertexId = (LongWritable) vertex.getId();
-        partitionMap.put(vertexId.get(), createList());
-      }
-      service.getPartitionStore().putPartition(partition);
-    }
-  }
-
-  /**
-   * Create an instance of L
-   * @return instance of L
-   */
-  protected abstract L createList();
-
-  /**
-   * Get list for the current vertexId
-   *
-   * @param iterator vertexId iterator
-   * @return list for current vertexId
-   */
-  protected L getList(
-    VertexIdIterator<LongWritable> iterator) {
-    PartitionOwner owner =
-        service.getVertexPartitionOwner(iterator.getCurrentVertexId());
-    long vertexId = iterator.getCurrentVertexId().get();
-    int partitionId = owner.getPartitionId();
-    Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
-    if (!partitionMap.containsKey(vertexId)) {
-      synchronized (nascentMap) {
-        // assumption: not many nascent vertices are created
-        // so overall synchronization is negligible
-        Long2ObjectOpenHashMap<L> nascentPartitionMap =
-          nascentMap.get(partitionId);
-        if (nascentPartitionMap.get(vertexId) == null) {
-          nascentPartitionMap.put(vertexId, createList());
-        }
-        return nascentPartitionMap.get(vertexId);
-      }
-    }
-    return partitionMap.get(vertexId);
-  }
-
-  @Override
-  public void finalizeStore() {
-    for (int partitionId : nascentMap.keySet()) {
-      // nascent vertices are present only in nascent map
-      map.get(partitionId).putAll(nascentMap.get(partitionId));
-    }
-    nascentMap.clear();
-  }
-
-  // TODO - discussion
-  /*
-  some approaches for ensuring correctness with parallel inserts
-  - current approach: uses a small extra bit of memory by pre-populating
-  map & pushes everything map cannot handle to nascentMap
-  at the beginning of next superstep compute a single threaded finalizeStore is
-  called (so little extra memory + 1 sequential finish ops)
-  - used striped parallel fast utils instead (unsure of perf)
-  - use concurrent map (every get gets far slower)
-  - use reader writer locks (unsure of perf)
-  (code looks something like underneath)
-
-      private final ReadWriteLock rwl = new ReentrantReadWriteLock();
-      rwl.readLock().lock();
-      L list = partitionMap.get(vertexId);
-      if (list == null) {
-        rwl.readLock().unlock();
-        rwl.writeLock().lock();
-        if (partitionMap.get(vertexId) == null) {
-          list = createList();
-          partitionMap.put(vertexId, list);
-        }
-        rwl.readLock().lock();
-        rwl.writeLock().unlock();
-      }
-      rwl.readLock().unlock();
-  - adopted from the article
-    http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
-    ReentrantReadWriteLock.html
-   */
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java
new file mode 100644
index 0000000..aee6a61
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+
+import java.util.List;
+
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.Partition;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> message type
+ * @param <L> list type
+ */
+public abstract class LongAbstractListStore<M extends Writable,
+  L extends List> extends LongAbstractStore<M, L> {
+  /**
+   * Map used to store messages for nascent vertices i.e., ones
+   * that did not exist at the start of current superstep but will get
+   * created because of sending message to a non-existent vertex id
+   */
+  private final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param partitionInfo       Partition split info
+   * @param config              Hadoop configuration
+   */
+  public LongAbstractListStore(
+      MessageValueFactory<M> messageValueFactory,
+      PartitionSplitInfo<LongWritable> partitionInfo,
+      ImmutableClassesGiraphConfiguration<LongWritable,
+          Writable, Writable> config) {
+    super(messageValueFactory, partitionInfo, config);
+    populateMap();
+
+    // create map for vertex ids (i.e., nascent vertices) not known yet
+    nascentMap = new Int2ObjectOpenHashMap<>();
+    for (int partitionId : partitionInfo.getPartitionIds()) {
+      nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
+    }
+  }
+
+  /**
+   * Populate the map with all vertexIds for each partition
+   */
+  private void populateMap() { // TODO - can parallelize?
+    // populate with vertex ids already known
+    partitionInfo.startIteration();
+    while (true) {
+      Partition partition = partitionInfo.getNextPartition();
+      if (partition == null) {
+        break;
+      }
+      Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
+      for (Object obj : partition) {
+        Vertex vertex = (Vertex) obj;
+        LongWritable vertexId = (LongWritable) vertex.getId();
+        partitionMap.put(vertexId.get(), createList());
+      }
+      partitionInfo.putPartition(partition);
+    }
+  }
+
+  /**
+   * Create an instance of L
+   * @return instance of L
+   */
+  protected abstract L createList();
+
+  /**
+   * Get list for the current vertexId
+   *
+   * @param vertexId vertex id
+   * @return list for current vertexId
+   */
+  protected L getList(LongWritable vertexId) {
+    long id = vertexId.get();
+    int partitionId = partitionInfo.getPartitionId(vertexId);
+    Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
+    L list = partitionMap.get(id);
+    if (list == null) {
+      Long2ObjectOpenHashMap<L> nascentPartitionMap =
+        nascentMap.get(partitionId);
+      // assumption: not many nascent vertices are created
+      // so overall synchronization is negligible
+      synchronized (nascentPartitionMap) {
+        list = nascentPartitionMap.get(id);
+        if (list == null) {
+          list = createList();
+          nascentPartitionMap.put(id, list);
+        }
+        return list;
+      }
+    }
+    return list;
+  }
+
+  @Override
+  public void finalizeStore() {
+    for (int partitionId : nascentMap.keySet()) {
+      // nascent vertices are present only in nascent map
+      map.get(partitionId).putAll(nascentMap.get(partitionId));
+    }
+    nascentMap.clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    int partitionId = partitionInfo.getPartitionId(vertexId);
+    Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
+    L list = partitionMap.get(vertexId.get());
+    if (list != null && !list.isEmpty()) {
+      return true;
+    }
+    Long2ObjectOpenHashMap<L> nascentMessages = nascentMap.get(partitionId);
+    return nascentMessages != null &&
+           nascentMessages.containsKey(vertexId.get());
+  }
+
+  // TODO - discussion
+  /*
+  some approaches for ensuring correctness with parallel inserts
+  - current approach: uses a small extra bit of memory by pre-populating
+  map & pushes everything map cannot handle to nascentMap
+  at the beginning of next superstep compute a single threaded finalizeStore is
+  called (so little extra memory + 1 sequential finish ops)
+  - used striped parallel fast utils instead (unsure of perf)
+  - use concurrent map (every get gets far slower)
+  - use reader writer locks (unsure of perf)
+  (code looks something like underneath)
+
+      private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+      rwl.readLock().lock();
+      L list = partitionMap.get(vertexId);
+      if (list == null) {
+        rwl.readLock().unlock();
+        rwl.writeLock().lock();
+        if (partitionMap.get(vertexId) == null) {
+          list = createList();
+          partitionMap.put(vertexId, list);
+        }
+        rwl.readLock().lock();
+        rwl.writeLock().unlock();
+      }
+      rwl.readLock().unlock();
+  - adopted from the article
+    http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
+    ReentrantReadWriteLock.html
+   */
+}

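A side note on the TODO discussion embedded above: the "use concurrent map" alternative it dismisses would collapse the pre-populated per-partition map plus nascentMap pair into a single concurrent map, trading the extra finalizeStore pass for slower gets on every lookup. A rough sketch of that variant (illustrative names only, assuming Java 8's computeIfAbsent is available; the commit itself keeps the two-map design) could be:

    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Supplier;

    /** Illustrative alternative to the map + nascentMap pair: one concurrent map. */
    class ConcurrentListStoreSketch<L extends List<?>> {
      /** Vertex id to message list, shared by all writer threads. */
      private final ConcurrentMap<Long, L> vertexLists = new ConcurrentHashMap<>();
      /** Stands in for the abstract createList() of LongAbstractListStore. */
      private final Supplier<L> listFactory;

      ConcurrentListStoreSketch(Supplier<L> listFactory) {
        this.listFactory = listFactory;
      }

      /** Atomically returns the list for a vertex, creating it if absent. */
      L getList(long vertexId) {
        return vertexLists.computeIfAbsent(vertexId, id -> listFactory.get());
      }
    }

No nascent map and no finalizeStore merge are needed in this variant, but, as the comment notes, every get then pays ConcurrentHashMap overhead instead of a plain fastutil lookup, which is why the pre-populated design was kept.
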
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
deleted file mode 100644
index b3ed4b2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import com.google.common.collect.Lists;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.LongIterator;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.util.List;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> message type
- * @param <T> datastructure used to hold messages
- */
-public abstract class LongAbstractMessageStore<M extends Writable, T>
-  implements MessageStore<LongWritable, M> {
-  /** Message value factory */
-  protected final MessageValueFactory<M> messageValueFactory;
-  /** Map from partition id to map from vertex id to message */
-  protected final
-  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
-  /** Service worker */
-  protected final CentralizedServiceWorker<LongWritable, ?, ?> service;
-  /** Giraph configuration */
-  protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
-  config;
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service      Service worker
-   * @param config       Hadoop configuration
-   */
-  public LongAbstractMessageStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-      ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
-          config) {
-    this.messageValueFactory = messageValueFactory;
-    this.service = service;
-    this.config = config;
-
-    map = new Int2ObjectOpenHashMap<>();
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
-          (int) service.getPartitionStore()
-              .getPartitionVertexCount(partitionId));
-      map.put(partitionId, partitionMap);
-    }
-  }
-
-  /**
-   * Get map which holds messages for partition which vertex belongs to.
-   *
-   * @param vertexId Id of the vertex
-   * @return Map which holds messages for partition which vertex belongs to.
-   */
-  protected Long2ObjectOpenHashMap<T> getPartitionMap(
-      LongWritable vertexId) {
-    return map.get(service.getPartitionId(vertexId));
-  }
-
-  @Override
-  public void clearPartition(int partitionId) {
-    map.get(partitionId).clear();
-  }
-
-  @Override
-  public boolean hasMessagesForVertex(LongWritable vertexId) {
-    return getPartitionMap(vertexId).containsKey(vertexId.get());
-  }
-
-  @Override
-  public boolean hasMessagesForPartition(int partitionId) {
-    Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
-    return partitionMessages != null && !partitionMessages.isEmpty();
-  }
-
-  @Override
-  public void clearVertexMessages(LongWritable vertexId) {
-    getPartitionMap(vertexId).remove(vertexId.get());
-  }
-
-
-  @Override
-  public void clearAll() {
-    map.clear();
-  }
-
-  @Override
-  public Iterable<LongWritable> getPartitionDestinationVertices(
-      int partitionId) {
-    Long2ObjectOpenHashMap<T> partitionMap =
-        map.get(partitionId);
-    List<LongWritable> vertices =
-        Lists.newArrayListWithCapacity(partitionMap.size());
-    LongIterator iterator = partitionMap.keySet().iterator();
-    while (iterator.hasNext()) {
-      vertices.add(new LongWritable(iterator.nextLong()));
-    }
-    return vertices;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java
new file mode 100644
index 0000000..385388d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import com.google.common.collect.Lists;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+
+import java.util.List;
+
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> message type
+ * @param <T> datastructure used to hold messages
+ */
+public abstract class LongAbstractStore<M extends Writable, T>
+  implements MessageStore<LongWritable, M> {
+  /** Message value factory */
+  protected final MessageValueFactory<M> messageValueFactory;
+  /** Map from partition id to map from vertex id to message */
+  protected final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
+  /** Partition split info */
+  protected final PartitionSplitInfo<LongWritable> partitionInfo;
+  /** Giraph configuration */
+  protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
+  config;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param partitionInfo       Partition split info
+   * @param config              Hadoop configuration
+   */
+  public LongAbstractStore(
+      MessageValueFactory<M> messageValueFactory,
+      PartitionSplitInfo<LongWritable> partitionInfo,
+      ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
+          config) {
+    this.messageValueFactory = messageValueFactory;
+    this.partitionInfo = partitionInfo;
+    this.config = config;
+
+    map = new Int2ObjectOpenHashMap<>();
+    for (int partitionId : partitionInfo.getPartitionIds()) {
+      Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
+          (int) partitionInfo.getPartitionVertexCount(partitionId));
+      map.put(partitionId, partitionMap);
+    }
+  }
+
+  /**
+   * Get map which holds messages for partition which vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for partition which vertex belongs to.
+   */
+  protected Long2ObjectOpenHashMap<T> getPartitionMap(
+      LongWritable vertexId) {
+    return map.get(partitionInfo.getPartitionId(vertexId));
+  }
+
+  @Override
+  public void clearPartition(int partitionId) {
+    map.get(partitionId).clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    return getPartitionMap(vertexId).containsKey(vertexId.get());
+  }
+
+  @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
+    return partitionMessages != null && !partitionMessages.isEmpty();
+  }
+
+  @Override
+  public void clearVertexMessages(LongWritable vertexId) {
+    getPartitionMap(vertexId).remove(vertexId.get());
+  }
+
+
+  @Override
+  public void clearAll() {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<LongWritable> getPartitionDestinationVertices(
+      int partitionId) {
+    Long2ObjectOpenHashMap<T> partitionMap =
+        map.get(partitionId);
+    List<LongWritable> vertices =
+        Lists.newArrayListWithCapacity(partitionMap.size());
+    LongIterator iterator = partitionMap.keySet().iterator();
+    while (iterator.hasNext()) {
+      vertices.add(new LongWritable(iterator.nextLong()));
+    }
+    return vertices;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
deleted file mode 100644
index bcdab98..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.VertexIdMessageBytesIterator;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.VerboseByteStructMessageWrite;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> Message type
- */
-public class LongByteArrayMessageStore<M extends Writable>
-  extends LongAbstractMessageStore<M, DataInputOutput> {
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service             Service worker
-   * @param config              Hadoop configuration
-   */
-  public LongByteArrayMessageStore(
-      MessageValueFactory<M> messageValueFactory,
-      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-      ImmutableClassesGiraphConfiguration<LongWritable,
-          Writable, Writable> config) {
-    super(messageValueFactory, service, config);
-  }
-
-  @Override
-  public boolean isPointerListEncoding() {
-    return false;
-  }
-
-  /**
-   * Get the DataInputOutput for a vertex id, creating if necessary.
-   *
-   * @param partitionMap Partition map to look in
-   * @param vertexId Id of the vertex
-   * @return DataInputOutput for this vertex id (created if necessary)
-   */
-  private DataInputOutput getDataInputOutput(
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap, long vertexId) {
-    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
-    if (dataInputOutput == null) {
-      dataInputOutput = config.createMessagesInputOutput();
-      partitionMap.put(vertexId, dataInputOutput);
-    }
-    return dataInputOutput;
-  }
-
-  @Override
-  public void addPartitionMessages(int partitionId,
-    VertexIdMessages<LongWritable, M> messages) {
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId);
-    synchronized (partitionMap) {
-      VertexIdMessageBytesIterator<LongWritable, M>
-          vertexIdMessageBytesIterator =
-          messages.getVertexIdMessageBytesIterator();
-      // Try to copy the message buffer over rather than
-      // doing a deserialization of a message just to know its size.  This
-      // should be more efficient for complex objects where serialization is
-      // expensive.  If this type of iterator is not available, fall back to
-      // deserializing/serializing the messages
-      if (vertexIdMessageBytesIterator != null) {
-        while (vertexIdMessageBytesIterator.hasNext()) {
-          vertexIdMessageBytesIterator.next();
-          DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
-              vertexIdMessageBytesIterator.getCurrentVertexId().get());
-          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
-              dataInputOutput.getDataOutput());
-        }
-      } else {
-        try {
-          VertexIdMessageIterator<LongWritable, M>
-              iterator = messages.getVertexIdMessageIterator();
-          while (iterator.hasNext()) {
-            iterator.next();
-            DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
-                iterator.getCurrentVertexId().get());
-            VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
-                dataInputOutput.getDataOutput());
-          }
-        } catch (IOException e) {
-          throw new RuntimeException("addPartitionMessages: IOException while" +
-              " adding messages for a partition: " + e);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void finalizeStore() {
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(
-    LongWritable vertexId) {
-    DataInputOutput dataInputOutput =
-        getPartitionMap(vertexId).get(vertexId.get());
-    if (dataInputOutput == null) {
-      return EmptyIterable.get();
-    } else {
-      return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
-    }
-  }
-
-  @Override
-  public void writePartition(DataOutput out, int partitionId)
-    throws IOException {
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        map.get(partitionId);
-    out.writeInt(partitionMap.size());
-    ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
-        partitionMap.long2ObjectEntrySet().fastIterator();
-    while (iterator.hasNext()) {
-      Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
-      out.writeLong(entry.getLongKey());
-      entry.getValue().write(out);
-    }
-  }
-
-  @Override
-  public void readFieldsForPartition(DataInput in,
-    int partitionId) throws IOException {
-    int size = in.readInt();
-    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
-        new Long2ObjectOpenHashMap<DataInputOutput>(size);
-    while (size-- > 0) {
-      long vertexId = in.readLong();
-      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
-      dataInputOutput.readFields(in);
-      partitionMap.put(vertexId, dataInputOutput);
-    }
-    synchronized (map) {
-      map.put(partitionId, partitionMap);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
deleted file mode 100644
index eef75ba..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.PointerListMessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
-
-/**
- * This stores messages in
- * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
- * and stores long pointers that point to serialized messages
- *
- * @param <M> message type
- */
-public class LongPointerListMessageStore<M extends Writable>
-  extends LongAbstractListMessageStore<M, LongArrayList>
-  implements MessageStore<LongWritable, M> {
-
-  /** Buffers of byte array outputs used to store messages - thread safe */
-  private final ExtendedByteArrayOutputBuffer bytesBuffer;
-
-  /**
-   * Constructor
-   *
-   * @param messageValueFactory Factory for creating message values
-   * @param service             Service worker
-   * @param config              Hadoop configuration
-   */
-  public LongPointerListMessageStore(
-    MessageValueFactory<M> messageValueFactory,
-    CentralizedServiceWorker<LongWritable, Writable, Writable> service,
-    ImmutableClassesGiraphConfiguration<LongWritable,
-    Writable, Writable> config) {
-    super(messageValueFactory, service, config);
-    bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
-  }
-
-  @Override
-  public boolean isPointerListEncoding() {
-    return true;
-  }
-
-  @Override
-  protected LongArrayList createList() {
-    return new LongArrayList();
-  }
-
-  @Override
-  public void addPartitionMessages(int partitionId,
-    VertexIdMessages<LongWritable, M> messages) {
-    try {
-      VertexIdMessageIterator<LongWritable, M> iterator =
-          messages.getVertexIdMessageIterator();
-      long pointer = 0;
-      LongArrayList list;
-      while (iterator.hasNext()) {
-        iterator.next();
-        M msg = iterator.getCurrentMessage();
-        list = getList(iterator);
-        if (iterator.isNewMessage()) {
-          IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
-          pointer = indexAndDataOut.getIndex();
-          pointer <<= 32;
-          ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
-          pointer += dataOutput.getPos();
-          msg.write(dataOutput);
-        }
-        synchronized (list) { // TODO - any better way?
-          list.add(pointer);
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("addPartitionMessages: IOException while" +
-          " adding messages for a partition: " + e);
-    }
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(
-    LongWritable vertexId) {
-    LongArrayList list = getPartitionMap(vertexId).get(
-        vertexId.get());
-    if (list == null) {
-      return EmptyIterable.get();
-    } else {
-      return new PointerListMessagesIterable<>(messageValueFactory,
-        list, bytesBuffer);
-    }
-  }
-
-  // FIXME -- complete these for check-pointing
-  @Override
-  public void writePartition(DataOutput out, int partitionId)
-    throws IOException {
-  }
-
-  @Override
-  public void readFieldsForPartition(DataInput in, int partitionId)
-    throws IOException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java
new file mode 100644
index 0000000..525225c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
+import org.apache.giraph.comm.messages.PointerListMessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This stores messages in
+ * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
+ * and stores long pointers that point to serialized messages
+ *
+ * @param <M> message type
+ */
+public class LongPointerListPerVertexStore<M extends Writable>
+  extends LongAbstractListStore<M, LongArrayList>
+  implements MessageStore<LongWritable, M> {
+
+  /** Buffers of byte array outputs used to store messages - thread safe */
+  private final ExtendedByteArrayOutputBuffer bytesBuffer;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param partitionInfo       Partition split info
+   * @param config              Hadoop configuration
+   */
+  public LongPointerListPerVertexStore(
+    MessageValueFactory<M> messageValueFactory,
+    PartitionSplitInfo<LongWritable> partitionInfo,
+    ImmutableClassesGiraphConfiguration<LongWritable,
+    Writable, Writable> config) {
+    super(messageValueFactory, partitionInfo, config);
+    bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
+  }
+
+  @Override
+  public boolean isPointerListEncoding() {
+    return true;
+  }
+
+  @Override
+  protected LongArrayList createList() {
+    return new LongArrayList();
+  }
+
+  @Override
+  public void addPartitionMessages(
+    int partitionId,
+    VertexIdMessages<LongWritable, M> messages
+  ) {
+    try {
+      VertexIdMessageIterator<LongWritable, M> iterator =
+          messages.getVertexIdMessageIterator();
+      long pointer = 0;
+      LongArrayList list;
+      while (iterator.hasNext()) {
+        iterator.next();
+        M msg = iterator.getCurrentMessage();
+        list = getList(iterator.getCurrentVertexId());
+
+        if (iterator.isNewMessage()) {
+          IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+          pointer = indexAndDataOut.getIndex();
+          pointer <<= 32;
+          ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+          pointer += dataOutput.getPos();
+          msg.write(dataOutput);
+        }
+        synchronized (list) { // TODO - any better way?
+          list.add(pointer);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("addPartitionMessages: IOException while" +
+          " adding messages for a partition: " + e);
+    }
+  }
+
+  @Override
+  public void addMessage(LongWritable vertexId, M message) throws IOException {
+    LongArrayList list = getList(vertexId);
+    IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+    long pointer = indexAndDataOut.getIndex();
+    pointer <<= 32;
+    ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+    pointer += dataOutput.getPos();
+    message.write(dataOutput);
+
+    synchronized (list) {
+      list.add(pointer);
+    }
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(LongWritable vertexId) {
+    LongArrayList list = getPartitionMap(vertexId).get(
+        vertexId.get());
+    if (list == null) {
+      return EmptyIterable.get();
+    } else {
+      return new PointerListMessagesIterable<>(messageValueFactory,
+        list, bytesBuffer);
+    }
+  }
+
+  // FIXME -- complete these for check-pointing
+  @Override
+  public void writePartition(DataOutput out, int partitionId)
+    throws IOException {
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in, int partitionId)
+    throws IOException {
+  }
+}

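The long "pointer" built in addPartitionMessages and addMessage above packs two 32-bit quantities into one value: the index of the backing byte-array buffer in the high half and the write position inside that buffer in the low half. A self-contained sketch of that layout (the values and the decode side are illustrative; the store's own reader is PointerListMessagesIterable):

    /** Demonstrates the index/offset packing used for message pointers. */
    public class PointerEncodingDemo {
      public static void main(String[] args) {
        int bufferIndex = 3;    // which ExtendedDataOutput in the buffer pool
        int byteOffset = 128;   // position of the serialized message in that buffer

        // Encode: high 32 bits hold the buffer index, low 32 bits the offset.
        long pointer = bufferIndex;
        pointer <<= 32;
        pointer += byteOffset;

        // Decode, as a reader of the pointer list would.
        int decodedIndex = (int) (pointer >>> 32);
        int decodedOffset = (int) (pointer & 0xFFFFFFFFL);

        System.out.println(decodedIndex + " " + decodedOffset); // prints: 3 128
      }
    }

Because the offset is taken from dataOutput.getPos() before the message is written, each pointer marks the start of one serialized message, so a vertex's LongArrayList of pointers is enough to replay its messages in order.
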
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
index 6273694..e8f00ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -19,12 +19,6 @@ package org.apache.giraph.comm.messages.queue;
 
 import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.utils.ThreadUtils;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -35,6 +29,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.utils.ThreadUtils;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
 /**
  * This class decouples message receiving and processing
  * into separate threads thus reducing contention.
@@ -157,6 +158,12 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
   }
 
   @Override
+  public void addMessage(I vertexId, M message) throws IOException {
+    // TODO: implement if LocalBlockRunner needs async message store
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void finalizeStore() {
     store.finalizeStore();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
index c8d0f79..de67af4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.ooc.data;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
@@ -32,9 +36,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 
 /**
  * Implementation of a message store used for out-of-core mechanism.
@@ -137,6 +138,26 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     }
   }
 
+  @Override
+  public void addMessage(I vertexId, M message) throws IOException {
+    if (useMessageCombiner) {
+      messageStore.addMessage(vertexId, message);
+    } else {
+      // TODO: implement if LocalBlockRunner needs this message store
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Gets the path that should be used specifically for message data.
+   *
+   * @param basePath path prefix to build the actual path from
+   * @param superstep superstep for which message data should be stored
+   * @return path to files specific for message data
+   */
+  private static String getPath(String basePath, long superstep) {
+    return basePath + "_messages-S" + superstep;
+  }
 
   @Override
   public long loadPartitionData(int partitionId)

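The getPath helper added here just tags the base path with a superstep-specific suffix so that out-of-core message files from different supersteps do not collide. For example (the path below is made up, not taken from this commit):

    // Illustrative only; reproduces the concatenation getPath performs.
    String basePath = "/local/ooc/job_1/partition-7";
    long superstep = 12;
    String path = basePath + "_messages-S" + superstep;
    // path is "/local/ooc/job_1/partition-7_messages-S12"
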
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
index 299dced..8f00293 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
@@ -23,23 +23,29 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.objects.Object2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.giraph.types.ops.IntTypeOps;
 import org.apache.giraph.types.ops.LongTypeOps;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Basic2ObjectMap with only basic set of operations.
- * All operations that return object T are returning reusable object,
+ * All operations that return object K return a reusable object,
  * which is modified after calling any other function.
  *
  * @param <K> Key type
@@ -81,13 +87,11 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
    * @return the old value, or null if no value was present for the given key.
    */
   public abstract V remove(K key);
-
   /**
    * TypeOps for type of keys this object holds
    * @return TypeOps
    */
   public abstract PrimitiveIdTypeOps<K> getKeyTypeOps();
-
   /**
    * Fast iterator over keys within this map, which doesn't allocate new
    * element for each returned element.
@@ -100,6 +104,20 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
   public abstract Iterator<K> fastKeyIterator();
 
   /**
+   * Iterator over map values.
+   *
+   * @return Iterator
+   */
+  public abstract Iterator<V> valueIterator();
+
+  /**
+   * A collection of all values.
+   *
+   * @return Collection of values
+   */
+  public abstract Collection<V> values();
+
+  /**
    * Iterator that reuses key object.
    *
    * @param <Iter> Primitive key iterator type
@@ -212,6 +230,16 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
     }
 
     @Override
+    public Iterator<V> valueIterator() {
+      return map.values().iterator();
+    }
+
+    @Override
+    public Collection<V> values() {
+      return map.values();
+    }
+
+    @Override
     public void write(DataOutput out) throws IOException {
       out.writeInt(map.size());
       ObjectIterator<Int2ObjectMap.Entry<V>> iterator =
@@ -319,7 +347,22 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
     }
 
     @Override
+    public Iterator<V> valueIterator() {
+      return map.values().iterator();
+    }
+
+    @Override
+    public Collection<V> values() {
+      return map.values();
+    }
+
+    @Override
     public void write(DataOutput out) throws IOException {
+      Preconditions.checkState(
+        valueWriter != null,
+        "valueWriter is not provided"
+      );
+
       out.writeInt(map.size());
       ObjectIterator<Long2ObjectMap.Entry<V>> iterator =
           map.long2ObjectEntrySet().fastIterator();
@@ -332,6 +375,11 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
 
     @Override
     public void readFields(DataInput in) throws IOException {
+      Preconditions.checkState(
+        valueWriter != null,
+        "valueWriter is not provided"
+      );
+
       int size = in.readInt();
       map.clear();
       map.trim(size);
@@ -342,4 +390,141 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
       }
     }
   }
+
+  /** Basic2ObjectMap implementation for arbitrary Writable keys */
+  public static final class BasicObject2ObjectOpenHashMap<K extends Writable, V>
+      extends Basic2ObjectMap<K, V> {
+    /** Map */
+    private final Object2ObjectOpenHashMap<K, V> map;
+    /** Key writer */
+    private final WritableWriter<K> keyWriter;
+    /** Value writer */
+    private final WritableWriter<V> valueWriter;
+
+    /**
+     * Constructor
+     *
+     * @param keyWriter Writer of keys
+     * @param valueWriter Writer of values
+     */
+    public BasicObject2ObjectOpenHashMap(
+      WritableWriter<K> keyWriter,
+      WritableWriter<V> valueWriter
+    ) {
+      this.map = new Object2ObjectOpenHashMap<>();
+      this.keyWriter = keyWriter;
+      this.valueWriter = valueWriter;
+    }
+
+    /**
+     * Constructor
+     *
+     * @param capacity Map capacity
+     * @param keyWriter Writer of keys
+     * @param valueWriter Writer of values
+     */
+    public BasicObject2ObjectOpenHashMap(
+      int capacity,
+      WritableWriter<K> keyWriter,
+      WritableWriter<V> valueWriter
+    ) {
+      this.map = new Object2ObjectOpenHashMap<>(capacity);
+      this.keyWriter = keyWriter;
+      this.valueWriter = valueWriter;
+    }
+
+    @Override
+    public void clear() {
+      map.clear();
+    }
+
+    @Override
+    public int size() {
+      return map.size();
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+      return map.containsKey(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+      // we need a copy since the key object is mutable
+      K copyKey = WritableUtils.createCopy(key);
+      return map.put(copyKey, value);
+    }
+
+    @Override
+    public V get(K key) {
+      return map.get(key);
+    }
+
+    @Override
+    public V remove(K key) {
+      return map.remove(key);
+    }
+
+    @Override
+    public PrimitiveIdTypeOps<K> getKeyTypeOps() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Iterator<K> fastKeyIterator() {
+      return map.keySet().iterator();
+    }
+
+    @Override
+    public Iterator<V> valueIterator() {
+      return map.values().iterator();
+    }
+
+    @Override
+    public Collection<V> values() {
+      return map.values();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Preconditions.checkState(
+        keyWriter != null,
+        "keyWriter is not provided"
+      );
+      Preconditions.checkState(
+        valueWriter != null,
+        "valueWriter is not provided"
+      );
+
+      out.writeInt(map.size());
+      ObjectIterator<Object2ObjectMap.Entry<K, V>> iterator =
+          map.object2ObjectEntrySet().fastIterator();
+      while (iterator.hasNext()) {
+        Object2ObjectMap.Entry<K, V> entry = iterator.next();
+        keyWriter.write(out, entry.getKey());
+        valueWriter.write(out, entry.getValue());
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      Preconditions.checkState(
+        keyWriter != null,
+        "keyWriter is not provided"
+      );
+      Preconditions.checkState(
+        valueWriter != null,
+        "valueWriter is not provided"
+      );
+
+      int size = in.readInt();
+      map.clear();
+      map.trim(size);
+      while (size-- > 0) {
+        K key = keyWriter.readFields(in);
+        V value = valueWriter.readFields(in);
+        map.put(key, value);
+      }
+    }
+  }
 }

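A minimal usage sketch of the new Object2Object-backed map, assuming WritableWriter only requires the write(out, value)/readFields(in) pair used elsewhere in this diff; the Text key and String value types are illustrative:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.giraph.types.ops.collections.Basic2ObjectMap.BasicObject2ObjectOpenHashMap;
    import org.apache.giraph.types.ops.collections.WritableWriter;
    import org.apache.hadoop.io.Text;

    public class Object2ObjectMapSketch {
      public static void main(String[] args) {
        WritableWriter<Text> keyWriter = new WritableWriter<Text>() {
          @Override
          public void write(DataOutput out, Text value) throws IOException {
            value.write(out);
          }
          @Override
          public Text readFields(DataInput in) throws IOException {
            Text value = new Text();
            value.readFields(in);
            return value;
          }
        };
        // A null valueWriter is allowed; write()/readFields() then fail fast.
        BasicObject2ObjectOpenHashMap<Text, String> map =
            new BasicObject2ObjectOpenHashMap<Text, String>(keyWriter, null);

        Text reusableId = new Text("v1");
        map.put(reusableId, "first");  // put() stores a copy of the mutable key
        reusableId.set("v2");          // so reusing the holder is safe
        map.put(reusableId, "second");
        System.out.println(map.size());       // 2
        for (String value : map.values()) {   // new values() accessor
          System.out.println(value);
        }
      }
    }
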
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java
new file mode 100644
index 0000000..23df7d3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops.collections;
+
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap.BasicObject2ObjectOpenHashMap;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Utility functions for constructing basic collections
+ */
+public class BasicCollectionsUtils {
+  /** No instances */
+  private BasicCollectionsUtils() { }
+
+  /**
+   * Construct a map keyed by vertex ids (primitive-keyed when possible).
+   *
+   * @param <I> Vertex id type
+   * @param <V> Value type
+   * @param idClass Vertex id class
+   * @return map
+   */
+  public static <I extends Writable, V>
+  Basic2ObjectMap<I, V> create2ObjectMap(Class<I> idClass) {
+    return create2ObjectMap(idClass, null, null);
+  }
+
+  /**
+   * Construct a map keyed by vertex ids (primitive-keyed when possible).
+   *
+   * If keyWriter/valueWriter are not provided,
+   * readFields/write will throw an exception when called.
+   *
+   * @param <I> Vertex id type
+   * @param <V> Value type
+   * @param idClass Vertex id class
+   * @param keyWriter writer for keys
+   * @param valueWriter writer for values
+   * @return map
+   */
+  public static <I extends Writable, V>
+  Basic2ObjectMap<I, V> create2ObjectMap(
+    Class<I> idClass,
+    WritableWriter<I> keyWriter,
+    WritableWriter<V> valueWriter
+  ) {
+    PrimitiveIdTypeOps<I> idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(
+      idClass
+    );
+    if (idTypeOps != null) {
+      return idTypeOps.create2ObjectOpenHashMap(valueWriter);
+    } else {
+      return new BasicObject2ObjectOpenHashMap<>(keyWriter, valueWriter);
+    }
+  }
+}

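A short sketch of how the factory is meant to be used, with illustrative key and value types; the writer-less variant is enough whenever the map never has to be serialized:

    import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
    import org.apache.giraph.types.ops.collections.BasicCollectionsUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class CollectionsFactorySketch {
      public static void main(String[] args) {
        // LongWritable has primitive id TypeOps, so this map is
        // primitive-keyed under the hood.
        Basic2ObjectMap<LongWritable, String> byLongId =
            BasicCollectionsUtils.create2ObjectMap(LongWritable.class);
        byLongId.put(new LongWritable(42L), "answer");

        // Text has no primitive id TypeOps, so this falls back to the new
        // Object2Object-backed implementation.
        Basic2ObjectMap<Text, String> byTextId =
            BasicCollectionsUtils.create2ObjectMap(Text.class);
        byTextId.put(new Text("v1"), "value");

        // With no writers provided, write()/readFields() throw if called.
      }
    }
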
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
index 2865a53..46f7b48 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.utils;
 
-import java.util.HashMap;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
@@ -29,12 +29,13 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.BasicCollectionsUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * TestGraph class for in-memory testing.
@@ -50,7 +51,7 @@ public class TestGraph<I extends WritableComparable,
   /** Vertex value combiner */
   protected final VertexValueCombiner<V> vertexValueCombiner;
   /** The vertex values */
-  protected HashMap<I, Vertex<I, V, E>> vertices = Maps.newHashMap();
+  protected Basic2ObjectMap<I, Vertex<I, V, E>> vertices;
   /** The configuration */
   protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
 
@@ -60,12 +61,19 @@ public class TestGraph<I extends WritableComparable,
    * @param conf Should have vertex and edge classes set.
    */
   public TestGraph(GiraphConfiguration conf) {
-    this.conf = new ImmutableClassesGiraphConfiguration(conf);
+    this.conf = new ImmutableClassesGiraphConfiguration<>(conf);
     vertexValueCombiner = this.conf.createVertexValueCombiner();
+    vertices = BasicCollectionsUtils.create2ObjectMap(
+      this.conf.getVertexIdClass()
+    );
   }
 
-  public HashMap<I, Vertex<I, V, E>> getVertices() {
-    return vertices;
+  public Collection<Vertex<I, V, E>> getVertices() {
+    return vertices.values();
+  }
+
+  public int getVertexCount() {
+    return vertices.size();
   }
 
   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
@@ -108,7 +116,7 @@ public class TestGraph<I extends WritableComparable,
    * @return this
    */
   public TestGraph<I, V, E> addVertex(I id, V value,
-                                         Entry<I, E>... edges) {
+                                      Entry<I, E>... edges) {
     addVertex(makeVertex(id, value, edges));
     return this;
   }
@@ -174,14 +182,6 @@ public class TestGraph<I extends WritableComparable,
       .addEdge(EdgeFactory.create(toVertex, edgeValue));
     return this;
   }
-  /**
-   * An iterator over the ids
-   *
-   * @return the iterator
-   */
-  public Iterator<I> idIterator() {
-    return vertices.keySet().iterator();
-  }
 
   /**
    * An iterator over the vertices
@@ -190,7 +190,7 @@ public class TestGraph<I extends WritableComparable,
    */
   @Override
   public Iterator<Vertex<I, V, E>> iterator() {
-    return vertices.values().iterator();
+    return vertices.valueIterator();
   }
 
   /**

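A hedged sketch of how a caller written against the old HashMap accessor adapts to the new surface; the id/value/edge types below are placeholders:

    import org.apache.giraph.graph.Vertex;
    import org.apache.giraph.utils.TestGraph;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;

    public class TestGraphCallerSketch {
      /** Old callers used getVertices().size() and iterated the HashMap. */
      static void inspect(
          TestGraph<LongWritable, DoubleWritable, NullWritable> graph) {
        // Replaces graph.getVertices().size()
        int vertexCount = graph.getVertexCount();
        // getVertices() now returns Collection<Vertex<I, V, E>>, not a map
        for (Vertex<LongWritable, DoubleWritable, NullWritable> vertex
            : graph.getVertices()) {
          System.out.println(vertex.getId() + " of " + vertexCount);
        }
      }
    }
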
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
index aa25490..c3c43fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
@@ -18,12 +18,12 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
 /** Verbose error message for ByteArray based messages */
 public class VerboseByteStructMessageWrite {
   /**
@@ -43,13 +43,37 @@ public class VerboseByteStructMessageWrite {
    * @throws IOException
    * @throws RuntimeException
    */
-  public static <I extends WritableComparable, M extends Writable> void
-  verboseWriteCurrentMessage(VertexIdMessageIterator<I, M> iterator,
-    DataOutput out) throws IOException {
+  public static <I extends WritableComparable, M extends Writable>
+  void verboseWriteCurrentMessage(
+    VertexIdMessageIterator<I, M> iterator,
+    DataOutput out
+  ) throws IOException {
+    verboseWriteCurrentMessage(
+      iterator.getCurrentVertexId(), iterator.getCurrentMessage(), out);
+  }
+
+  /**
+   * Serialize and write a single message for the given vertex id,
+   * turning a NegativeArraySizeException into a descriptive error.
+   *
+   * @param vertexId vertexId
+   * @param message message
+   * @param out DataOutput
+   * @param <I> vertexId type
+   * @param <M> message type
+   * @throws IOException
+   * @throws RuntimeException
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  void verboseWriteCurrentMessage(
+    I vertexId,
+    M message,
+    DataOutput out
+  ) throws IOException {
     try {
-      iterator.getCurrentMessage().write(out);
+      message.write(out);
     } catch (NegativeArraySizeException e) {
-      handleNegativeArraySize(iterator.getCurrentVertexId());
+      handleNegativeArraySize(vertexId);
     }
   }
 
@@ -59,8 +83,8 @@ public class VerboseByteStructMessageWrite {
    * @param vertexId vertexId
    * @param <I> vertexId type
    */
-  public static <I extends WritableComparable> void handleNegativeArraySize(
-      I vertexId) {
+  public static <I extends WritableComparable>
+  void handleNegativeArraySize(I vertexId) {
     throw new RuntimeException("The numbers of bytes sent to vertex " +
         vertexId + " exceeded the max capacity of " +
         "its ExtendedDataOutput. Please consider setting " +

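For completeness, a tiny sketch of calling the new id-plus-message overload directly; the IntWritable/FloatWritable types are just examples:

    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.giraph.utils.VerboseByteStructMessageWrite;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.IntWritable;

    public class VerboseWriteSketch {
      /** Callers that already hold the id and message can skip the iterator. */
      static void writeOne(IntWritable vertexId, FloatWritable message,
          DataOutput out) throws IOException {
        VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
            vertexId, message, out);
      }
    }
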
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index cdb9b7e..c51521d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -183,7 +183,7 @@ public class BspServiceWorker<I extends WritableComparable,
   private GiraphTimer waitRequestsTimer;
 
   /** InputSplit handlers used in INPUT_SUPERSTEP */
-  private WorkerInputSplitsHandler inputSplitsHandler;
+  private final WorkerInputSplitsHandler inputSplitsHandler;
 
   /** Memory observer */
   private final MemoryObserver memoryObserver;
@@ -1784,6 +1784,31 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
+  public Iterable<Integer> getPartitionIds() {
+    return getPartitionStore().getPartitionIds();
+  }
+
+  @Override
+  public long getPartitionVertexCount(Integer partitionId) {
+    return getPartitionStore().getPartitionVertexCount(partitionId);
+  }
+
+  @Override
+  public void startIteration() {
+    getPartitionStore().startIteration();
+  }
+
+  @Override
+  public Partition getNextPartition() {
+    return getPartitionStore().getNextPartition();
+  }
+
+  @Override
+  public void putPartition(Partition partition) {
+    getPartitionStore().putPartition(partition);
+  }
+
+  @Override
   public ServerData<I, V, E> getServerData() {
     return workerServer.getServerData();
   }

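A hedged sketch of the iteration loop these new pass-through methods support, assuming getNextPartition() returns null once the iteration begun by startIteration() is exhausted; the helper name is illustrative:

    import org.apache.giraph.partition.Partition;
    import org.apache.giraph.worker.BspServiceWorker;

    public class PartitionIterationSketch {
      /** Visit every partition once, always handing partitions back. */
      static void forEachPartition(BspServiceWorker<?, ?, ?> worker) {
        worker.startIteration();
        while (true) {
          Partition partition = worker.getNextPartition();
          if (partition == null) {
            break;                          // iteration exhausted
          }
          try {
            // ... process the partition's vertices ...
          } finally {
            worker.putPartition(partition); // return the partition to the store
          }
        }
      }
    }
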
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
index e3b2db0..2c5f2db 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
@@ -25,7 +25,7 @@ import junit.framework.Assert;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.FloatSumMessageCombiner;
-import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
@@ -71,8 +71,10 @@ public class TestIntFloatPrimitiveMessageStores {
     );
     PartitionStore partitionStore = Mockito.mock(PartitionStore.class);
     Mockito.when(service.getPartitionStore()).thenReturn(partitionStore);
+    Mockito.when(service.getPartitionIds()).thenReturn(
+      Lists.newArrayList(0, 1));
     Mockito.when(partitionStore.getPartitionIds()).thenReturn(
-        Lists.newArrayList(0, 1));
+      Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
     Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
@@ -144,8 +146,8 @@ public class TestIntFloatPrimitiveMessageStores {
 
   @Test
   public void testIntByteArrayMessageStore() {
-    IntByteArrayMessageStore<FloatWritable> messageStore =
-        new IntByteArrayMessageStore<FloatWritable>(new
+    IdByteArrayMessageStore<IntWritable, FloatWritable> messageStore =
+        new IdByteArrayMessageStore<>(new
             TestMessageValueFactory<FloatWritable>(FloatWritable.class),
             service, conf);
     insertIntFloatMessages(messageStore);

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index dc9850b..5508b2c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -25,7 +25,7 @@ import junit.framework.Assert;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.DoubleSumMessageCombiner;
-import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -68,8 +68,10 @@ public class TestLongDoublePrimitiveMessageStores {
     );
     PartitionStore partitionStore = Mockito.mock(PartitionStore.class);
     Mockito.when(service.getPartitionStore()).thenReturn(partitionStore);
+    Mockito.when(service.getPartitionIds()).thenReturn(
+      Lists.newArrayList(0, 1));
     Mockito.when(partitionStore.getPartitionIds()).thenReturn(
-        Lists.newArrayList(0, 1));
+      Lists.newArrayList(0, 1));
     Partition partition = Mockito.mock(Partition.class);
     Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
     Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
@@ -145,8 +147,8 @@ public class TestLongDoublePrimitiveMessageStores {
 
   @Test
   public void testLongByteArrayMessageStore() {
-    LongByteArrayMessageStore<DoubleWritable> messageStore =
-        new LongByteArrayMessageStore<DoubleWritable>(
+    IdByteArrayMessageStore<LongWritable, DoubleWritable> messageStore =
+        new IdByteArrayMessageStore<>(
             new TestMessageValueFactory<DoubleWritable>(DoubleWritable.class),
             service, createLongDoubleConf());
     insertLongDoubleMessages(messageStore);

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
index ffc1288..1294c88 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
@@ -18,6 +18,14 @@
 
 package org.apache.giraph.comm.messages.queue;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.factories.TestMessageValueFactory;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -26,14 +34,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.junit.Test;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-
 /**
  * Test case for AsyncMessageStoreWrapper
  */
@@ -71,6 +71,10 @@ public class AsyncMessageStoreWrapperTest {
     }
 
     @Override
+    public void addMessage(LongWritable vertexId, IntWritable message) throws IOException {
+    }
+
+    @Override
     public boolean isPointerListEncoding() {
       return false;
     }
@@ -124,5 +128,6 @@ public class AsyncMessageStoreWrapperTest {
     public void readFieldsForPartition(DataInput in, int partitionId) throws IOException {
 
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index d56c0fb..954e420 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -56,7 +56,7 @@ public class TestSwitchClasses {
     graph.addVertex(id2, new StatusValue());
     graph = InternalVertexRunner.runWithInMemoryOutput(conf, graph);
 
-    Assert.assertEquals(2, graph.getVertices().size());
+    Assert.assertEquals(2, graph.getVertexCount());
   }
 
   private static void checkVerticesOnFinalSuperstep(