You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/05/31 18:04:32 UTC

[2/2] git commit: updated refs/heads/trunk to c94dd9c

Cleanup the old out-of-core message mechanism

Summary: With the new out-of-core infrastructure in place, the old out-of-core message mechanism is no longer necessary, and it also interferes with the new mechanism. This diff removes the old out-of-core message mechanism and cleans up its implications for the rest of the code base.

Test Plan:
mvn clean verify
snapshot tests pass

Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov

Differential Revision: https://reviews.facebook.net/D58701


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c94dd9c7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c94dd9c7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c94dd9c7

Branch: refs/heads/trunk
Commit: c94dd9c74895d419f72974fcbe5456fbae84b7a9
Parents: 6256a76
Author: Hassan Eslami <he...@fb.com>
Authored: Tue May 31 10:37:00 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue May 31 10:37:00 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/giraph/comm/ServerData.java |  20 +-
 .../messages/AbstractListPerVertexStore.java    |   3 +-
 .../ByteArrayMessagesPerVertexStore.java        |  32 +-
 .../messages/InMemoryMessageStoreFactory.java   |   5 -
 .../giraph/comm/messages/MessageStore.java      |  17 +-
 .../comm/messages/MessageStoreFactory.java      |  10 -
 .../comm/messages/OneMessagePerVertexStore.java |   7 +-
 .../messages/PointerListPerVertexStore.java     |  43 +-
 .../comm/messages/SimpleMessageStore.java       |   8 +-
 .../out_of_core/DiskBackedMessageStore.java     | 305 -------------
 .../DiskBackedMessageStoreFactory.java          |  97 ----
 .../PartitionDiskBackedMessageStore.java        | 370 ----------------
 .../out_of_core/SequentialFileMessageStore.java | 437 -------------------
 .../comm/messages/out_of_core/package-info.java |  21 -
 .../primitives/IdByteArrayMessageStore.java     |  33 +-
 .../primitives/IdOneMessagePerVertexStore.java  |  11 +-
 .../primitives/IntByteArrayMessageStore.java    |  32 +-
 .../primitives/IntFloatMessageStore.java        |  11 +-
 .../primitives/LongDoubleMessageStore.java      |  11 +-
 .../long_id/LongAbstractMessageStore.java       |   7 +-
 .../long_id/LongByteArrayMessageStore.java      |  25 +-
 .../long_id/LongPointerListMessageStore.java    |  45 +-
 .../queue/AsyncMessageStoreWrapper.java         |  12 +-
 .../NettyWorkerClientRequestProcessor.java      |  23 +-
 .../SendPartitionCurrentMessagesRequest.java    |   8 +-
 .../requests/SendWorkerMessagesRequest.java     |  12 +-
 .../SendWorkerOneMessageToManyRequest.java      | 102 +++--
 .../org/apache/giraph/conf/GiraphConstants.java |  22 -
 .../giraph/ooc/data/DiskBackedMessageStore.java |  17 +-
 .../giraph/partition/SimplePartition.java       |  33 +-
 .../apache/giraph/comm/RequestFailureTest.java  |   2 +-
 .../org/apache/giraph/comm/RequestTest.java     |   8 +-
 .../apache/giraph/comm/TestMessageStores.java   |  38 +-
 .../TestIntFloatPrimitiveMessageStores.java     |  12 +-
 .../TestLongDoublePrimitiveMessageStores.java   |   9 +-
 .../queue/AsyncMessageStoreWrapperTest.java     |  12 +-
 .../apache/giraph/graph/TestVertexAndEdges.java |   2 -
 .../giraph/jython/TestJythonComputation.java    |   6 -
 38 files changed, 248 insertions(+), 1620 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 69fbfee..4156d8c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.comm;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -234,10 +233,8 @@ public class ServerData<I extends WritableComparable,
   /**
    * Re-initialize message stores.
    * Discards old values if any.
-   *
-   * @throws IOException
    */
-  public void resetMessageStores() throws IOException {
+  public void resetMessageStores() {
     if (currentMessageStore != null) {
       currentMessageStore.clearAll();
       currentMessageStore = null;
@@ -252,12 +249,7 @@ public class ServerData<I extends WritableComparable,
   /** Prepare for next superstep */
   public void prepareSuperstep() {
     if (currentMessageStore != null) {
-      try {
-        currentMessageStore.clearAll();
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "Failed to clear previous message store");
-      }
+      currentMessageStore.clearAll();
     }
 
     MessageStore<I, Writable> nextCurrentMessageStore;
@@ -422,13 +414,7 @@ public class ServerData<I extends WritableComparable,
           partition.putVertex(vertex);
         } else if (originalVertex != null) {
           partition.removeVertex(vertexId);
-          try {
-            getCurrentMessageStore().clearVertexMessages(vertexId);
-          } catch (IOException e) {
-            throw new IllegalStateException("resolvePartitionMutations: " +
-                "Caught IOException while clearing messages for a deleted " +
-                "vertex due to a mutation");
-          }
+          getCurrentMessageStore().clearVertexMessages(vertexId);
         }
         context.progress();
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
index 6840f86..c28dff5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
@@ -26,7 +26,6 @@ import org.apache.giraph.utils.VertexIdIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
@@ -89,7 +88,7 @@ public abstract class AbstractListPerVertexStore<I extends WritableComparable,
   }
 
   @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+  public Iterable<M> getVertexMessages(I vertexId) {
     ConcurrentMap<I, L> partitionMap =
         map.get(getPartitionId(vertexId));
     if (partitionMap == null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 29a0888..efbe11b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -95,7 +95,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 
   @Override
   public void addPartitionMessages(
-    int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+    int partitionId, VertexIdMessages<I, M> messages) {
     ConcurrentMap<I, DataInputOutput> partitionMap =
         getOrCreatePartitionMap(partitionId);
     VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
@@ -117,17 +117,22 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
         }
       }
     } else {
-      VertexIdMessageIterator<I, M> vertexIdMessageIterator =
-          messages.getVertexIdMessageIterator();
-      while (vertexIdMessageIterator.hasNext()) {
-        vertexIdMessageIterator.next();
-        DataInputOutput dataInputOutput =
-            getDataInputOutput(partitionMap, vertexIdMessageIterator);
-
-        synchronized (dataInputOutput) {
-          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
-              vertexIdMessageIterator, dataInputOutput.getDataOutput());
+      try {
+        VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+            messages.getVertexIdMessageIterator();
+        while (vertexIdMessageIterator.hasNext()) {
+          vertexIdMessageIterator.next();
+          DataInputOutput dataInputOutput =
+              getDataInputOutput(partitionMap, vertexIdMessageIterator);
+
+          synchronized (dataInputOutput) {
+            VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
+                vertexIdMessageIterator, dataInputOutput.getDataOutput());
+          }
         }
+      } catch (IOException e) {
+        throw new RuntimeException("addPartitionMessages: IOException while" +
+            " adding messages for a partition: " + e);
       }
     }
   }
@@ -225,10 +230,5 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
       this.service = service;
       this.config = conf;
     }
-
-    @Override
-    public boolean shouldTraverseMessagesInOrder() {
-      return false;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 27980a9..99a12c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -206,9 +206,4 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
     this.service = service;
     this.conf = conf;
   }
-
-  @Override
-  public boolean shouldTraverseMessagesInOrder() {
-    return false;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 6e85ea3..9c56d85 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -48,24 +48,20 @@ public interface MessageStore<I extends WritableComparable,
    *
    * @param vertexId Vertex id for which we want to get messages
    * @return Iterable of messages for a vertex id
-   * @throws java.io.IOException
    */
-  Iterable<M> getVertexMessages(I vertexId) throws IOException;
+  Iterable<M> getVertexMessages(I vertexId);
 
   /**
    * Clears messages for a vertex.
    *
    * @param vertexId Vertex id for which we want to clear messages
-   * @throws IOException
    */
-  void clearVertexMessages(I vertexId) throws IOException;
+  void clearVertexMessages(I vertexId);
 
   /**
    * Clears all resources used by this store.
-   *
-   * @throws IOException
    */
-  void clearAll() throws IOException;
+  void clearAll();
 
   /**
    * Check if we have messages for some vertex
@@ -88,11 +84,9 @@ public interface MessageStore<I extends WritableComparable,
    *
    * @param partitionId Id of partition
    * @param messages    Collection of vertex ids and messages we want to add
-   * @throws IOException
    */
   void addPartitionMessages(
-      int partitionId, VertexIdMessages<I, M> messages)
-    throws IOException;
+      int partitionId, VertexIdMessages<I, M> messages);
 
   /**
    * Called before start of computation in bspworker
@@ -113,9 +107,8 @@ public interface MessageStore<I extends WritableComparable,
    * Clears messages for a partition.
    *
    * @param partitionId Partition id for which we want to clear messages
-   * @throws IOException
    */
-  void clearPartition(int partitionId) throws IOException;
+  void clearPartition(int partitionId);
 
   /**
    * Serialize messages for one partition.

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index 41076e3..6a18aa8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -50,14 +50,4 @@ public interface MessageStoreFactory<I extends WritableComparable,
    */
   void initialize(CentralizedServiceWorker<I, ?, ?> service,
       ImmutableClassesGiraphConfiguration<I, ?, ?> conf);
-
-  /**
-   * This method is more for the performance optimization. If the message
-   * traversal would be done in order then data structure which is optimized
-   * for such traversal can be used.
-   *
-   * @return true if the messages would be traversed in order
-   * else return false
-   */
-  boolean shouldTraverseMessagesInOrder();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index ad0a5dc..1d67014 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -71,7 +71,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
   @Override
   public void addPartitionMessages(
       int partitionId,
-      VertexIdMessages<I, M> messages) throws IOException {
+      VertexIdMessages<I, M> messages) {
     ConcurrentMap<I, M> partitionMap =
         getOrCreatePartitionMap(partitionId);
     VertexIdMessageIterator<I, M> vertexIdMessageIterator =
@@ -175,10 +175,5 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
       this.service = service;
       this.config = conf;
     }
-
-    @Override
-    public boolean shouldTraverseMessagesInOrder() {
-      return false;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
index cce0439..4b32a17 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
@@ -77,26 +77,31 @@ public class PointerListPerVertexStore<I extends WritableComparable,
 
   @Override
   public void addPartitionMessages(
-    int partitionId, VertexIdMessages<I, M> messages) throws IOException {
-    VertexIdMessageIterator<I, M> vertexIdMessageIterator =
-        messages.getVertexIdMessageIterator();
-    long pointer = 0;
-    LongArrayList list;
-    while (vertexIdMessageIterator.hasNext()) {
-      vertexIdMessageIterator.next();
-      M msg = vertexIdMessageIterator.getCurrentMessage();
-      list = getOrCreateList(vertexIdMessageIterator);
-      if (vertexIdMessageIterator.isNewMessage()) {
-        IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
-        pointer = indexAndDataOut.getIndex();
-        pointer <<= 32;
-        ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
-        pointer += dataOutput.getPos();
-        msg.write(dataOutput);
-      }
-      synchronized (list) {
-        list.add(pointer);
+    int partitionId, VertexIdMessages<I, M> messages) {
+    try {
+      VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+          messages.getVertexIdMessageIterator();
+      long pointer = 0;
+      LongArrayList list;
+      while (vertexIdMessageIterator.hasNext()) {
+        vertexIdMessageIterator.next();
+        M msg = vertexIdMessageIterator.getCurrentMessage();
+        list = getOrCreateList(vertexIdMessageIterator);
+        if (vertexIdMessageIterator.isNewMessage()) {
+          IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+          pointer = indexAndDataOut.getIndex();
+          pointer <<= 32;
+          ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+          pointer += dataOutput.getPos();
+          msg.write(dataOutput);
+        }
+        synchronized (list) {
+          list.add(pointer);
+        }
       }
+    } catch (IOException e) {
+      throw new RuntimeException("addPartitionMessages: IOException while" +
+          " adding messages for a partition: " + e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 054302d..9c3ef7f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -157,7 +157,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+  public Iterable<M> getVertexMessages(I vertexId) {
     ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId));
     if (partitionMap == null) {
       return Collections.<M>emptyList();
@@ -197,7 +197,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
+  public void clearVertexMessages(I vertexId) {
     ConcurrentMap<I, ?> partitionMap =
         map.get(getPartitionId(vertexId));
     if (partitionMap != null) {
@@ -206,7 +206,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public void clearPartition(int partitionId) throws IOException {
+  public void clearPartition(int partitionId) {
     map.remove(partitionId);
   }
 
@@ -217,7 +217,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public void clearAll() throws IOException {
+  public void clearAll() {
     map.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
deleted file mode 100644
index 0d7009b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.out_of_core;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.conf.MessageClasses;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-/**
- * Message store which separates data by partitions,
- * and submits them to underlying message store.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class DiskBackedMessageStore<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    MessageStore<I, M> {
-  /** Message value factory */
-  private final MessageClasses<I, M> messageClasses;
-  /** Service worker */
-  private final CentralizedServiceWorker<I, V, E> service;
-  /** Number of messages to keep in memory */
-  private final int maxNumberOfMessagesInMemory;
-  /** Factory for creating file stores when flushing */
-  private final MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
-  partitionStoreFactory;
-  /** Map from partition id to its message store */
-  private final ConcurrentMap<Integer, PartitionDiskBackedMessageStore<I, M>>
-  partitionMessageStores;
-
-  /**
-   * Constructor
-   *
-   * @param messageClasses              Message classes information
-   * @param service                     Service worker
-   * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
-   * @param partitionStoreFactory       Factory for creating stores for a
-   *                                    partition
-   */
-  public DiskBackedMessageStore(
-      MessageClasses<I, M> messageClasses,
-      CentralizedServiceWorker<I, V, E> service,
-      int maxNumberOfMessagesInMemory,
-      MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I,
-          M>> partitionStoreFactory) {
-    this.messageClasses = messageClasses;
-    this.service = service;
-    this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
-    this.partitionStoreFactory = partitionStoreFactory;
-    partitionMessageStores = Maps.newConcurrentMap();
-  }
-
-  @Override
-  public boolean isPointerListEncoding() {
-    return false;
-  }
-
-  @Override
-  public void addPartitionMessages(
-      int partitionId,
-      VertexIdMessages<I, M> messages) throws IOException {
-    PartitionDiskBackedMessageStore<I, M> partitionMessageStore =
-        getMessageStore(partitionId);
-    VertexIdMessageIterator<I, M>
-        vertexIdMessageIterator =
-        messages.getVertexIdMessageIterator();
-    while (vertexIdMessageIterator.hasNext()) {
-      vertexIdMessageIterator.next();
-      boolean ownsVertexId =
-          partitionMessageStore.addVertexMessages(
-              vertexIdMessageIterator.getCurrentVertexId(),
-              Collections.singleton(
-                  vertexIdMessageIterator.getCurrentMessage()));
-      if (ownsVertexId) {
-        vertexIdMessageIterator.releaseCurrentVertexId();
-      }
-    }
-    checkMemory();
-  }
-
-  @Override
-  public void finalizeStore() {
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
-    if (hasMessagesForVertex(vertexId)) {
-      return getMessageStore(vertexId).getVertexMessages(vertexId);
-    } else {
-      return EmptyIterable.get();
-    }
-  }
-
-  @Override
-  public boolean hasMessagesForVertex(I vertexId) {
-    return getMessageStore(vertexId).hasMessagesForVertex(vertexId);
-  }
-
-  @Override
-  public boolean hasMessagesForPartition(int partitionId) {
-    PartitionDiskBackedMessageStore<I, M> partitionMessages =
-        getMessageStore(partitionId);
-    return partitionMessages != null && !Iterables
-        .isEmpty(partitionMessages.getDestinationVertices());
-  }
-
-  @Override
-  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
-    PartitionDiskBackedMessageStore<I, M> messageStore =
-        partitionMessageStores.get(partitionId);
-    if (messageStore == null) {
-      return Collections.emptyList();
-    } else {
-      return messageStore.getDestinationVertices();
-    }
-  }
-
-  @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
-    if (hasMessagesForVertex(vertexId)) {
-      getMessageStore(vertexId).clearVertexMessages(vertexId);
-    }
-  }
-
-  @Override
-  public void clearPartition(int partitionId) throws IOException {
-    PartitionDiskBackedMessageStore<I, M> messageStore =
-        partitionMessageStores.get(partitionId);
-    if (messageStore != null) {
-      messageStore.clearAll();
-    }
-  }
-
-  @Override
-  public void clearAll() throws IOException {
-    for (PartitionDiskBackedMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      messageStore.clearAll();
-    }
-    partitionMessageStores.clear();
-  }
-
-  /**
-   * Checks the memory status, flushes if necessary
-   *
-   * @throws IOException
-   */
-  private void checkMemory() throws IOException {
-    while (memoryFull()) {
-      flushOnePartition();
-    }
-  }
-
-  /**
-   * Check if memory is full
-   *
-   * @return True iff memory is full
-   */
-  private boolean memoryFull() {
-    int totalMessages = 0;
-    for (PartitionDiskBackedMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      totalMessages += messageStore.getNumberOfMessages();
-    }
-    return totalMessages > maxNumberOfMessagesInMemory;
-  }
-
-  /**
-   * Finds biggest partition and flushes it to the disk
-   *
-   * @throws IOException
-   */
-  private void flushOnePartition() throws IOException {
-    int maxMessages = 0;
-    PartitionDiskBackedMessageStore<I, M> biggestStore = null;
-    for (PartitionDiskBackedMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      int numMessages = messageStore.getNumberOfMessages();
-      if (numMessages > maxMessages) {
-        maxMessages = numMessages;
-        biggestStore = messageStore;
-      }
-    }
-    if (biggestStore != null) {
-      biggestStore.flush();
-    }
-  }
-
-  /**
-   * Get message store for partition which holds vertex with required vertex
-   * id
-   *
-   * @param vertexId Id of vertex for which we are asking for message store
-   * @return Requested message store
-   */
-  private PartitionDiskBackedMessageStore<I, M> getMessageStore(I vertexId) {
-    int partitionId =
-        service.getVertexPartitionOwner(vertexId).getPartitionId();
-    return getMessageStore(partitionId);
-  }
-
-  /**
-   * Get message store for partition id. It it doesn't exist yet,
-   * creates a new one.
-   *
-   * @param partitionId Id of partition for which we are asking for message
-   *                    store
-   * @return Requested message store
-   */
-  private PartitionDiskBackedMessageStore<I, M> getMessageStore(
-      int partitionId) {
-    PartitionDiskBackedMessageStore<I, M> messageStore =
-        partitionMessageStores.get(partitionId);
-    if (messageStore != null) {
-      return messageStore;
-    }
-    messageStore = partitionStoreFactory.newStore(messageClasses);
-    PartitionDiskBackedMessageStore<I, M> store =
-        partitionMessageStores.putIfAbsent(partitionId, messageStore);
-    return (store == null) ? messageStore : store;
-  }
-
-  @Override
-  public void writePartition(DataOutput out,
-      int partitionId) throws IOException {
-    PartitionDiskBackedMessageStore<I, M> partitionStore =
-        partitionMessageStores.get(partitionId);
-    out.writeBoolean(partitionStore != null);
-    if (partitionStore != null) {
-      partitionStore.write(out);
-    }
-  }
-
-  @Override
-  public void readFieldsForPartition(DataInput in,
-      int partitionId) throws IOException {
-    if (in.readBoolean()) {
-      PartitionDiskBackedMessageStore<I, M> messageStore =
-          partitionStoreFactory.newStore(messageClasses);
-      messageStore.readFields(in);
-      partitionMessageStores.put(partitionId, messageStore);
-    }
-  }
-
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param service             Service worker
-   * @param maxMessagesInMemory Number of messages to keep in memory
-   * @param fileStoreFactory    Factory for creating file stores when
-   *                            flushing
-   * @param <I>                 Vertex id
-   * @param <V>                 Vertex data
-   * @param <E>                 Edge data
-   * @param <M>                 Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, V extends Writable,
-      E extends Writable, M extends Writable>
-  MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
-    CentralizedServiceWorker<I, V, E> service,
-      int maxMessagesInMemory,
-      MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
-          fileStoreFactory) {
-    return new DiskBackedMessageStoreFactory<I, V, E, M>(service,
-        maxMessagesInMemory,
-        fileStoreFactory);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
deleted file mode 100644
index 728a2ed..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.out_of_core;
-
-import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.MessageClasses;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message store factory which persist the messages on the disk.
- *
- * @param <I> vertex id
- * @param <V> vertex data
- * @param <E> edge data
- * @param <M> message data
- */
-public class DiskBackedMessageStoreFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements MessageStoreFactory<I, M, MessageStore<I, M>> {
-  /** Service worker */
-  private CentralizedServiceWorker<I, V, E> service;
-  /** Number of messages to keep in memory */
-  private int maxMessagesInMemory;
-  /** Factory for creating file stores when flushing */
-  private MessageStoreFactory<I, M,
-    PartitionDiskBackedMessageStore<I, M>> fileStoreFactory;
-
-  /**
-   * Default constructor class helps in class invocation via Reflection
-   */
-  public DiskBackedMessageStoreFactory() {
-  }
-
-  /**
-   * @param service Service worker
-   * @param maxMessagesInMemory Number of messages to keep in memory
-   * @param fileStoreFactory Factory for creating file stores when flushing
-   */
-  public DiskBackedMessageStoreFactory(
-      CentralizedServiceWorker<I, V, E> service,
-      int maxMessagesInMemory,
-      MessageStoreFactory<I, M,
-        PartitionDiskBackedMessageStore<I, M>> fileStoreFactory) {
-    this.service = service;
-    this.maxMessagesInMemory = maxMessagesInMemory;
-    this.fileStoreFactory = fileStoreFactory;
-  }
-
-  @Override
-  public MessageStore<I, M>
-  newStore(MessageClasses<I, M> messageClasses) {
-    return new DiskBackedMessageStore<I, V, E, M>(messageClasses,
-        service, maxMessagesInMemory, fileStoreFactory);
-  }
-
-  @Override
-  public void initialize(CentralizedServiceWorker service,
-      ImmutableClassesGiraphConfiguration conf) {
-    this.maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
-
-    MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>>
-      fileMessageStoreFactory =
-        SequentialFileMessageStore.newFactory(conf);
-    this.fileStoreFactory =
-        PartitionDiskBackedMessageStore.newFactory(conf,
-            fileMessageStoreFactory);
-
-    this.service = service;
-  }
-
-  @Override
-  public boolean shouldTraverseMessagesInOrder() {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
deleted file mode 100644
index 698281f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.out_of_core;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.MessageClasses;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Message storage with in-memory map of messages and with support for
- * flushing all the messages to the disk. Holds messages for a single partition.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class PartitionDiskBackedMessageStore<I extends WritableComparable,
-    M extends Writable> implements Writable {
-  /** Message classes */
-  private final MessageClasses<I, M> messageClasses;
-  /** Message value factory */
-  private final MessageValueFactory<M> messageValueFactory;
-  /**
-   * In-memory message map (must be sorted to insure that the ids are
-   * ordered)
-   */
-  private volatile ConcurrentNavigableMap<I, DataInputOutput>
-  inMemoryMessages;
-  /** Hadoop configuration */
-  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
-  /** Counter for number of messages in-memory */
-  private final AtomicInteger numberOfMessagesInMemory;
-  /** To keep vertex ids which we have messages for */
-  private final Set<I> destinationVertices;
-  /** File stores in which we keep flushed messages */
-  private final Collection<SequentialFileMessageStore<I, M>> fileStores;
-  /** Factory for creating file stores when flushing */
-  private final
-  MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> fileStoreFactory;
-  /** Lock for disk flushing */
-  private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
-
-  /**
-   * Constructor.
-   *
-   * @param messageClasses      Message classes information
-   * @param config              Hadoop configuration
-   * @param fileStoreFactory    Factory for creating file stores when flushing
-   */
-  public PartitionDiskBackedMessageStore(
-      MessageClasses<I, M> messageClasses,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-      MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
-          fileStoreFactory) {
-    inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
-    this.messageClasses = messageClasses;
-    this.messageValueFactory = messageClasses.createMessageValueFactory(config);
-    this.config = config;
-    numberOfMessagesInMemory = new AtomicInteger(0);
-    destinationVertices =
-        Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
-    fileStores = Lists.newArrayList();
-    this.fileStoreFactory = fileStoreFactory;
-  }
-
-  /**
-   * Add vertex messages
-   *
-   * @param vertexId Vertex id to use
-   * @param messages Messages to add (note that the lifetime of the messages)
-   *                 is only until next() is called again)
-   * @return True if the vertex id ownership is taken by this method,
-   *         false otherwise
-   * @throws IOException
-   */
-  boolean addVertexMessages(I vertexId,
-                            Iterable<M> messages) throws IOException {
-    boolean ownsVertexId = false;
-    destinationVertices.add(vertexId);
-    rwLock.readLock().lock();
-    try {
-      DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId);
-      if (dataInputOutput == null) {
-        DataInputOutput newDataInputOutput =
-            config.createMessagesInputOutput();
-        dataInputOutput =
-            inMemoryMessages.putIfAbsent(vertexId, newDataInputOutput);
-        if (dataInputOutput == null) {
-          ownsVertexId = true;
-          dataInputOutput = newDataInputOutput;
-        }
-      }
-
-      synchronized (dataInputOutput) {
-        for (M message : messages) {
-          message.write(dataInputOutput.getDataOutput());
-          numberOfMessagesInMemory.getAndIncrement();
-        }
-      }
-    } finally {
-      rwLock.readLock().unlock();
-    }
-
-    return ownsVertexId;
-  }
-
-  /**
-   * Get the messages for a vertex.
-   *
-   * @param vertexId Vertex id for which we want to get messages
-   * @return Iterable of messages for a vertex id
-   */
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
-    DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId);
-    if (dataInputOutput == null) {
-      dataInputOutput = config.createMessagesInputOutput();
-    }
-    Iterable<M> combinedIterable = new MessagesIterable<M>(
-        dataInputOutput, messageValueFactory);
-
-    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
-      combinedIterable = Iterables.concat(combinedIterable,
-          fileStore.getVertexMessages(vertexId));
-    }
-    return combinedIterable;
-  }
-
-  /**
-   * Get number of messages in memory
-   *
-   * @return Number of messages in memory
-   */
-  public int getNumberOfMessages() {
-    return numberOfMessagesInMemory.get();
-  }
-
-  /**
-   * Check if we have messages for some vertex
-   *
-   * @param vertexId Id of vertex which we want to check
-   * @return True iff we have messages for vertex with required id
-   */
-  public boolean hasMessagesForVertex(I vertexId) {
-    return destinationVertices.contains(vertexId);
-  }
-
-  /**
-   * Gets vertex ids which we have messages for
-   *
-   * @return Iterable over vertex ids which we have messages for
-   */
-  public Iterable<I> getDestinationVertices() {
-    return destinationVertices;
-  }
-
-  /**
-   * Clears messages for a vertex.
-   *
-   * @param vertexId Vertex id for which we want to clear messages
-   * @throws IOException
-   */
-  public void clearVertexMessages(I vertexId) throws IOException {
-    inMemoryMessages.remove(vertexId);
-  }
-
-  /**
-   * Clears all resources used by this store.
-   *
-   * @throws IOException
-   */
-  public void clearAll() throws IOException {
-    inMemoryMessages.clear();
-    destinationVertices.clear();
-    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
-      fileStore.clearAll();
-    }
-    fileStores.clear();
-  }
-
-  /**
-   * Flushes messages to the disk.
-   *
-   * @throws IOException
-   */
-  public void flush() throws IOException {
-    ConcurrentNavigableMap<I, DataInputOutput> messagesToFlush = null;
-    rwLock.writeLock().lock();
-    try {
-      messagesToFlush = inMemoryMessages;
-      inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
-      numberOfMessagesInMemory.set(0);
-    } finally {
-      rwLock.writeLock().unlock();
-    }
-    SequentialFileMessageStore<I, M> fileStore =
-        fileStoreFactory.newStore(messageClasses);
-    fileStore.addMessages(messagesToFlush);
-
-    synchronized (fileStores) {
-      fileStores.add(fileStore);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    // write destination vertices
-    out.writeInt(destinationVertices.size());
-    for (I vertexId : destinationVertices) {
-      vertexId.write(out);
-    }
-
-    // write of in-memory messages
-    out.writeInt(numberOfMessagesInMemory.get());
-
-    // write in-memory messages map
-    out.writeInt(inMemoryMessages.size());
-    for (Entry<I, DataInputOutput> entry : inMemoryMessages.entrySet()) {
-      entry.getKey().write(out);
-      entry.getValue().write(out);
-    }
-
-    // write file stores
-    out.writeInt(fileStores.size());
-    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
-      fileStore.write(out);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    // read destination vertices
-    int numVertices = in.readInt();
-    for (int v = 0; v < numVertices; v++) {
-      I vertexId = config.createVertexId();
-      vertexId.readFields(in);
-      destinationVertices.add(vertexId);
-    }
-
-    // read in-memory messages
-    numberOfMessagesInMemory.set(in.readInt());
-
-    // read in-memory map
-    int mapSize = in.readInt();
-    for (int m = 0; m < mapSize; m++) {
-      I vertexId = config.createVertexId();
-      vertexId.readFields(in);
-      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
-      dataInputOutput.readFields(in);
-      inMemoryMessages.put(vertexId, dataInputOutput);
-    }
-
-    // read file stores
-    int numFileStores = in.readInt();
-    for (int s = 0; s < numFileStores; s++) {
-      SequentialFileMessageStore<I, M> fileStore =
-          fileStoreFactory.newStore(messageClasses);
-      fileStore.readFields(in);
-      fileStores.add(fileStore);
-    }
-  }
-
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param config           Hadoop configuration
-   * @param fileStoreFactory Factory for creating message stores for
-   *                         partitions
-   * @param <I>              Vertex id
-   * @param <M>              Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> newFactory(
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-      MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
-          fileStoreFactory) {
-    return new Factory<I, M>(config, fileStoreFactory);
-  }
-
-  /**
-   * Factory for {@link PartitionDiskBackedMessageStore}
-   *
-   * @param <I> Vertex id
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      M extends Writable> implements MessageStoreFactory<I, M,
-      PartitionDiskBackedMessageStore<I, M>> {
-    /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
-    /** Factory for creating message stores for partitions */
-    private final MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
-    fileStoreFactory;
-
-    /**
-     * @param config           Hadoop configuration
-     * @param fileStoreFactory Factory for creating message stores for
-     *                         partitions
-     */
-    public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-        MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
-            fileStoreFactory) {
-      this.config = config;
-      this.fileStoreFactory = fileStoreFactory;
-    }
-
-    @Override
-    public PartitionDiskBackedMessageStore<I, M> newStore(
-        MessageClasses<I, M> messageClasses) {
-      return new PartitionDiskBackedMessageStore<I, M>(messageClasses,
-          config, fileStoreFactory);
-    }
-
-    @Override
-    public void initialize(CentralizedServiceWorker<I, ?, ?> service,
-        ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
-      /* Implementation of this method is required if the class is to
-       * be exposed publicly and allow instantiating the class via the
-       * configuration parameter MESSAGE_STORE_FACTORY_CLASS. As this is
-       * a private class, hence the implementation of this method is skipped
-       * as the caller knows the specific required constructor parameters
-       * for instantiation.
-      */
-    }
-
-    @Override
-    public boolean shouldTraverseMessagesInOrder() {
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
deleted file mode 100644
index 8f589bc..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.out_of_core;
-
-import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.MessageClasses;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- * Used for writing and reading collection of messages to the disk.
- * {@link SequentialFileMessageStore#addMessages(NavigableMap)}
- * should be called only once with the messages we want to store.
- * <p/>
- * It's optimized for retrieving messages in the natural order of vertex ids
- * they are sent to.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class SequentialFileMessageStore<I extends WritableComparable,
-    M extends Writable> implements Writable {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SequentialFileMessageStore.class);
-  /** Message class */
-  private final MessageValueFactory<M> messageValueFactory;
-  /** File in which we store data */
-  private final File file;
-  /** Configuration which we need for reading data */
-  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
-  /** Buffer size to use when reading and writing files */
-  private final int bufferSize;
-  /** File input stream */
-  private DataInputStream in;
-  /** How many vertices do we have left to read in the file */
-  private int verticesLeft;
-  /** Id of currently read vertex */
-  private I currentVertexId;
-
-  /**
-   * Stores message on the disk.
-   *
-   *
-   * @param messageValueFactory Used to create message values
-   * @param config       Configuration used later for reading
-   * @param bufferSize   Buffer size to use when reading and writing
-   * @param fileName     File in which we want to store messages
-   * @throws IOException
-   */
-  public SequentialFileMessageStore(
-      MessageValueFactory<M> messageValueFactory,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-      int bufferSize,
-      String fileName) {
-    this.messageValueFactory = messageValueFactory;
-    this.config = config;
-    this.bufferSize = bufferSize;
-    file = new File(fileName);
-  }
-
-  /**
-   * Adds messages from one message store to another
-   *
-   * @param messageMap Add the messages from this map to this store
-   * @throws java.io.IOException
-   */
-  public void addMessages(NavigableMap<I, DataInputOutput> messageMap)
-    throws IOException {
-    // Writes messages to its file
-    if (file.exists()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("addMessages: Deleting " + file);
-      }
-      if (!file.delete()) {
-        throw new IOException("Failed to delete existing file " + file);
-      }
-    }
-    if (!file.createNewFile()) {
-      throw new IOException("Failed to create file " + file);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("addMessages: Creating " + file);
-    }
-
-    DataOutputStream out = null;
-
-    try {
-      out = new DataOutputStream(
-          new BufferedOutputStream(new FileOutputStream(file), bufferSize));
-      int destinationVertexIdCount = messageMap.size();
-      out.writeInt(destinationVertexIdCount);
-
-      // Dump the vertices and their messages in a sorted order
-      for (Map.Entry<I, DataInputOutput> entry : messageMap.entrySet()) {
-        I destinationVertexId = entry.getKey();
-        destinationVertexId.write(out);
-        DataInputOutput dataInputOutput = entry.getValue();
-        Iterable<M> messages = new MessagesIterable<M>(
-            dataInputOutput, messageValueFactory);
-        int messageCount = Iterables.size(messages);
-        out.writeInt(messageCount);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("addMessages: For vertex id " + destinationVertexId +
-              ", messages = " + messageCount + " to file " + file);
-        }
-        for (M message : messages) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("addMessages: Wrote " + message + " to " + file);
-          }
-          message.write(out);
-        }
-      }
-    } finally {
-      if (out != null) {
-        out.close();
-      }
-    }
-  }
-
-  /**
-   * Reads messages for a vertex. It will find the messages only if all
-   * previous reads used smaller vertex ids than this one - messages should
-   * be retrieved in increasing order of vertex ids.
-   *
-   * @param vertexId Vertex id for which we want to get messages
-   * @return Messages for the selected vertex, or empty list if not used
-   *         correctly
-   * @throws IOException
-   */
-  public Iterable<M> getVertexMessages(I vertexId) throws
-      IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
-          " (currently " + currentVertexId + ") from " + file);
-    }
-    if (in == null) {
-      startReading();
-    }
-
-    I nextVertexId = getCurrentVertexId();
-    while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) {
-      nextVertexId = getNextVertexId();
-    }
-
-    if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
-      return EmptyIterable.get();
-    }
-
-    return readMessagesForCurrentVertex();
-  }
-
-  /**
-   * Clears all resources used by this store.
-   */
-  public void clearAll() throws IOException {
-    endReading();
-    if (!file.delete()) {
-      LOG.error("clearAll: Failed to delete file " + file);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(file.length());
-    FileInputStream input = new FileInputStream(file);
-    try {
-      byte[] buffer = new byte[bufferSize];
-      while (true) {
-        int length = input.read(buffer);
-        if (length < 0) {
-          break;
-        }
-        out.write(buffer, 0, length);
-      }
-    } finally {
-      input.close();
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    FileOutputStream output = new FileOutputStream(file);
-    try {
-      long fileLength = in.readLong();
-      byte[] buffer = new byte[bufferSize];
-      for (long position = 0; position < fileLength; position += bufferSize) {
-        int bytes = (int) Math.min(bufferSize, fileLength - position);
-        in.readFully(buffer, 0, bytes);
-        output.write(buffer);
-      }
-    } finally {
-      output.close();
-    }
-  }
-
-  /**
-   * Prepare for reading
-   *
-   * @throws IOException
-   */
-  private void startReading() throws IOException {
-    currentVertexId = null;
-    in = new DataInputStream(
-        new BufferedInputStream(new FileInputStream(file), bufferSize));
-    verticesLeft = in.readInt();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("startReading: File " + file + " with " +
-          verticesLeft + " vertices left");
-    }
-  }
-
-  /**
-   * Gets current vertex id.
-   * <p/>
-   * If there is a vertex id whose messages haven't been read yet it
-   * will return that vertex id, otherwise it will read and return the next
-   * one.
-   *
-   * @return Current vertex id
-   * @throws IOException
-   */
-  private I getCurrentVertexId() throws IOException {
-    if (currentVertexId != null) {
-      return currentVertexId;
-    } else {
-      return getNextVertexId();
-    }
-  }
-
-  /**
-   * Gets next vertex id.
-   * <p/>
-   * If there is a vertex whose messages haven't been read yet it
-   * will read and skip over its messages to get to the next vertex.
-   *
-   * @return Next vertex id
-   * @throws IOException
-   */
-  private I getNextVertexId() throws IOException {
-    if (currentVertexId != null) {
-      readMessagesForCurrentVertex();
-    }
-    if (verticesLeft == 0) {
-      return null;
-    }
-    currentVertexId = config.createVertexId();
-    currentVertexId.readFields(in);
-    return currentVertexId;
-  }
-
-  /**
-   * Reads messages for current vertex.
-   *
-   * @return Messages for current vertex
-   * @throws IOException
-   */
-  private Collection<M> readMessagesForCurrentVertex() throws IOException {
-    int messagesSize = in.readInt();
-    List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
-    for (int i = 0; i < messagesSize; i++) {
-      M message = messageValueFactory.newInstance();
-      try {
-        message.readFields(in);
-      } catch (IOException e) {
-        throw new IllegalStateException("readMessagesForCurrentVertex: " +
-            "Failed to read message from " + i + " of " +
-            messagesSize + " for vertex id " + currentVertexId + " from " +
-            file, e);
-      }
-      messages.add(message);
-    }
-    currentVertexDone();
-    return messages;
-  }
-
-  /**
-   * Release current vertex.
-   *
-   * @throws IOException
-   */
-  private void currentVertexDone() throws IOException {
-    currentVertexId = null;
-    verticesLeft--;
-    if (verticesLeft == 0) {
-      endReading();
-    }
-  }
-
-  /**
-   * Call when we are done reading, for closing files.
-   *
-   * @throws IOException
-   */
-  private void endReading() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("endReading: Stopped reading " + file);
-    }
-    if (in != null) {
-      in.close();
-      in = null;
-    }
-  }
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param config Hadoop configuration
-   * @param <I>    Vertex id
-   * @param <M>    Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> newFactory(
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
-    return new Factory<I, M>(config);
-  }
-
-  /**
-   * Factory for {@link SequentialFileMessageStore}
-   *
-   * @param <I> Vertex id
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      M extends Writable>
-      implements MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> {
-    /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
-    /** Directories in which we'll keep necessary files */
-    private final String[] directories;
-    /** Buffer size to use when reading and writing */
-    private final int bufferSize;
-    /** Counter for created message stores */
-    private final AtomicInteger storeCounter;
-
-    /**
-     * Constructor.
-     *
-     * @param config Hadoop configuration
-     */
-    public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
-      this.config = config;
-      String jobId = config.get("mapred.job.id", "Unknown Job");
-      int taskId   = config.getTaskPartition();
-      List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
-      Collections.shuffle(userPaths);
-      directories = new String[userPaths.size()];
-      int i = 0;
-      for (String path : userPaths) {
-        String directory = path + File.separator + jobId + File.separator +
-            taskId + File.separator;
-        directories[i++] = directory;
-        if (!new File(directory).mkdirs()) {
-          LOG.error("SequentialFileMessageStore$Factory: Failed to create " +
-              directory);
-        }
-      }
-      this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
-      storeCounter = new AtomicInteger();
-    }
-
-    @Override
-    public SequentialFileMessageStore<I, M> newStore(
-        MessageClasses<I, M> messageClasses) {
-      int idx = Math.abs(storeCounter.getAndIncrement());
-      String fileName =
-          directories[idx % directories.length] + "messages-" + idx;
-      return new SequentialFileMessageStore<I, M>(
-          messageClasses.createMessageValueFactory(config), config,
-          bufferSize, fileName);
-    }
-
-    @Override
-    public void initialize(CentralizedServiceWorker<I, ?, ?> service,
-        ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
-      /* Implementation of this method is required if the class is to
-       * be exposed publicly and allow instantiating the class via the
-       * configuration parameter MESSAGE_STORE_FACTORY_CLASS. As this is
-       * a private class, hence the implementation of this method is skipped
-       * as the caller knows the specific required constructor parameters
-       * for instantiation.
-       */
-    }
-
-    @Override
-    public boolean shouldTraverseMessagesInOrder() {
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
deleted file mode 100644
index 7039378..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of out-of-core messages related classes.
- */
-package org.apache.giraph.comm.messages.out_of_core;

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
index 2e39857..57f3ff6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
@@ -142,7 +142,7 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
 
   @Override
   public void addPartitionMessages(int partitionId,
-      VertexIdMessages<I, M> messages) throws IOException {
+      VertexIdMessages<I, M> messages) {
     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
     synchronized (partitionMap) {
       VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
@@ -161,22 +161,27 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
               dataInputOutput.getDataOutput());
         }
       } else {
-        VertexIdMessageIterator<I, M> iterator =
-            messages.getVertexIdMessageIterator();
-        while (iterator.hasNext()) {
-          iterator.next();
-          DataInputOutput dataInputOutput =
-              getDataInputOutput(partitionMap, iterator.getCurrentVertexId());
-
-          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
-              dataInputOutput.getDataOutput());
+        try {
+          VertexIdMessageIterator<I, M> iterator =
+              messages.getVertexIdMessageIterator();
+          while (iterator.hasNext()) {
+            iterator.next();
+            DataInputOutput dataInputOutput =
+                getDataInputOutput(partitionMap, iterator.getCurrentVertexId());
+
+            VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+                dataInputOutput.getDataOutput());
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("addPartitionMessages: IOException while" +
+              " adding message for a partition: " + e);
         }
       }
     }
   }
 
   @Override
-  public void clearPartition(int partitionId) throws IOException {
+  public void clearPartition(int partitionId) {
     map.get(partitionId).clear();
   }
 
@@ -193,7 +198,7 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+  public Iterable<M> getVertexMessages(I vertexId) {
     DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
     if (dataInputOutput == null) {
       return EmptyIterable.get();
@@ -203,12 +208,12 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
+  public void clearVertexMessages(I vertexId) {
     getPartitionMap(vertexId).remove(vertexId);
   }
 
   @Override
-  public void clearAll() throws IOException {
+  public void clearAll() {
     map.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index 42fe992..4463ddb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -125,7 +125,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   @Override
   public void addPartitionMessages(
       int partitionId,
-      VertexIdMessages<I, M> messages) throws IOException {
+      VertexIdMessages<I, M> messages) {
     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
     synchronized (partitionMap) {
       VertexIdMessageIterator<I, M>
@@ -152,7 +152,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   }
 
   @Override
-  public void clearPartition(int partitionId) throws IOException {
+  public void clearPartition(int partitionId) {
     map.get(partitionId).clear();
   }
 
@@ -168,8 +168,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   }
 
   @Override
-  public Iterable<M> getVertexMessages(
-      I vertexId) throws IOException {
+  public Iterable<M> getVertexMessages(I vertexId) {
     Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
     if (!partitionMap.containsKey(vertexId)) {
       return EmptyIterable.get();
@@ -179,12 +178,12 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   }
 
   @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
+  public void clearVertexMessages(I vertexId) {
     getPartitionMap(vertexId).remove(vertexId);
   }
 
   @Override
-  public void clearAll() throws IOException {
+  public void clearAll() {
     map.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index 4c363f3..4ef9e76 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -127,8 +127,7 @@ public class IntByteArrayMessageStore<M extends Writable>
 
   @Override
   public void addPartitionMessages(int partitionId,
-      VertexIdMessages<IntWritable, M> messages) throws
-      IOException {
+      VertexIdMessages<IntWritable, M> messages) {
     Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
         map.get(partitionId);
     synchronized (partitionMap) {
@@ -149,14 +148,19 @@ public class IntByteArrayMessageStore<M extends Writable>
               dataInputOutput.getDataOutput());
         }
       } else {
-        VertexIdMessageIterator<IntWritable, M>
-            iterator = messages.getVertexIdMessageIterator();
-        while (iterator.hasNext()) {
-          iterator.next();
-          DataInputOutput dataInputOutput =  getDataInputOutput(partitionMap,
-              iterator.getCurrentVertexId().get());
-          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
-              dataInputOutput.getDataOutput());
+        try {
+          VertexIdMessageIterator<IntWritable, M>
+              iterator = messages.getVertexIdMessageIterator();
+          while (iterator.hasNext()) {
+            iterator.next();
+            DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+                iterator.getCurrentVertexId().get());
+            VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+                dataInputOutput.getDataOutput());
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("addPartitionMessages: IOException while" +
+              " adding messages for a partition: " + e);
         }
       }
     }
@@ -167,7 +171,7 @@ public class IntByteArrayMessageStore<M extends Writable>
   }
 
   @Override
-  public void clearPartition(int partitionId) throws IOException {
+  public void clearPartition(int partitionId) {
     map.get(partitionId).clear();
   }
 
@@ -185,7 +189,7 @@ public class IntByteArrayMessageStore<M extends Writable>
 
   @Override
   public Iterable<M> getVertexMessages(
-      IntWritable vertexId) throws IOException {
+      IntWritable vertexId) {
     DataInputOutput dataInputOutput =
         getPartitionMap(vertexId).get(vertexId.get());
     if (dataInputOutput == null) {
@@ -196,12 +200,12 @@ public class IntByteArrayMessageStore<M extends Writable>
   }
 
   @Override
-  public void clearVertexMessages(IntWritable vertexId) throws IOException {
+  public void clearVertexMessages(IntWritable vertexId) {
     getPartitionMap(vertexId).remove(vertexId.get());
   }
 
   @Override
-  public void clearAll() throws IOException {
+  public void clearAll() {
     map.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 280f5b9..715bf45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -96,8 +96,7 @@ public class IntFloatMessageStore
 
   @Override
   public void addPartitionMessages(int partitionId,
-      VertexIdMessages<IntWritable, FloatWritable> messages) throws
-      IOException {
+      VertexIdMessages<IntWritable, FloatWritable> messages) {
     IntWritable reusableVertexId = new IntWritable();
     FloatWritable reusableMessage = new FloatWritable();
     FloatWritable reusableCurrentMessage = new FloatWritable();
@@ -128,7 +127,7 @@ public class IntFloatMessageStore
   }
 
   @Override
-  public void clearPartition(int partitionId) throws IOException {
+  public void clearPartition(int partitionId) {
     map.get(partitionId).clear();
   }
 
@@ -145,7 +144,7 @@ public class IntFloatMessageStore
 
   @Override
   public Iterable<FloatWritable> getVertexMessages(
-      IntWritable vertexId) throws IOException {
+      IntWritable vertexId) {
     Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
     if (!partitionMap.containsKey(vertexId.get())) {
       return EmptyIterable.get();
@@ -156,12 +155,12 @@ public class IntFloatMessageStore
   }
 
   @Override
-  public void clearVertexMessages(IntWritable vertexId) throws IOException {
+  public void clearVertexMessages(IntWritable vertexId) {
     getPartitionMap(vertexId).remove(vertexId.get());
   }
 
   @Override
-  public void clearAll() throws IOException {
+  public void clearAll() {
     map.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index d8a3fde..4fc4843 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -97,8 +97,7 @@ public class LongDoubleMessageStore
 
   @Override
   public void addPartitionMessages(int partitionId,
-      VertexIdMessages<LongWritable, DoubleWritable> messages) throws
-      IOException {
+      VertexIdMessages<LongWritable, DoubleWritable> messages) {
     LongWritable reusableVertexId = new LongWritable();
     DoubleWritable reusableMessage = new DoubleWritable();
     DoubleWritable reusableCurrentMessage = new DoubleWritable();
@@ -129,7 +128,7 @@ public class LongDoubleMessageStore
   }
 
   @Override
-  public void clearPartition(int partitionId) throws IOException {
+  public void clearPartition(int partitionId) {
     map.get(partitionId).clear();
   }
 
@@ -146,7 +145,7 @@ public class LongDoubleMessageStore
 
   @Override
   public Iterable<DoubleWritable> getVertexMessages(
-      LongWritable vertexId) throws IOException {
+      LongWritable vertexId) {
     Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
     if (!partitionMap.containsKey(vertexId.get())) {
       return EmptyIterable.get();
@@ -157,12 +156,12 @@ public class LongDoubleMessageStore
   }
 
   @Override
-  public void clearVertexMessages(LongWritable vertexId) throws IOException {
+  public void clearVertexMessages(LongWritable vertexId) {
     getPartitionMap(vertexId).remove(vertexId.get());
   }
 
   @Override
-  public void clearAll() throws IOException {
+  public void clearAll() {
     map.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
index a0c977e..b3ed4b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
@@ -29,7 +29,6 @@ import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
@@ -91,7 +90,7 @@ public abstract class LongAbstractMessageStore<M extends Writable, T>
   }
 
   @Override
-  public void clearPartition(int partitionId) throws IOException {
+  public void clearPartition(int partitionId) {
     map.get(partitionId).clear();
   }
 
@@ -107,13 +106,13 @@ public abstract class LongAbstractMessageStore<M extends Writable, T>
   }
 
   @Override
-  public void clearVertexMessages(LongWritable vertexId) throws IOException {
+  public void clearVertexMessages(LongWritable vertexId) {
     getPartitionMap(vertexId).remove(vertexId.get());
   }
 
 
   @Override
-  public void clearAll() throws IOException {
+  public void clearAll() {
     map.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
index 092d963..bcdab98 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
@@ -89,7 +89,7 @@ public class LongByteArrayMessageStore<M extends Writable>
 
   @Override
   public void addPartitionMessages(int partitionId,
-    VertexIdMessages<LongWritable, M> messages) throws IOException {
+    VertexIdMessages<LongWritable, M> messages) {
     Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId);
     synchronized (partitionMap) {
       VertexIdMessageBytesIterator<LongWritable, M>
@@ -109,14 +109,19 @@ public class LongByteArrayMessageStore<M extends Writable>
               dataInputOutput.getDataOutput());
         }
       } else {
-        VertexIdMessageIterator<LongWritable, M>
-            iterator = messages.getVertexIdMessageIterator();
-        while (iterator.hasNext()) {
-          iterator.next();
-          DataInputOutput dataInputOutput =  getDataInputOutput(partitionMap,
-              iterator.getCurrentVertexId().get());
-          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
-              dataInputOutput.getDataOutput());
+        try {
+          VertexIdMessageIterator<LongWritable, M>
+              iterator = messages.getVertexIdMessageIterator();
+          while (iterator.hasNext()) {
+            iterator.next();
+            DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+                iterator.getCurrentVertexId().get());
+            VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+                dataInputOutput.getDataOutput());
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("addPartitionMessages: IOException while" +
+              " adding messages for a partition: " + e);
         }
       }
     }
@@ -128,7 +133,7 @@ public class LongByteArrayMessageStore<M extends Writable>
 
   @Override
   public Iterable<M> getVertexMessages(
-    LongWritable vertexId) throws IOException {
+    LongWritable vertexId) {
     DataInputOutput dataInputOutput =
         getPartitionMap(vertexId).get(vertexId.get());
     if (dataInputOutput == null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
index 32296ad..eef75ba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
@@ -80,32 +80,37 @@ public class LongPointerListMessageStore<M extends Writable>
 
   @Override
   public void addPartitionMessages(int partitionId,
-    VertexIdMessages<LongWritable, M> messages) throws IOException {
-    VertexIdMessageIterator<LongWritable, M> iterator =
-        messages.getVertexIdMessageIterator();
-    long pointer = 0;
-    LongArrayList list;
-    while (iterator.hasNext()) {
-      iterator.next();
-      M msg = iterator.getCurrentMessage();
-      list = getList(iterator);
-      if (iterator.isNewMessage()) {
-        IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
-        pointer = indexAndDataOut.getIndex();
-        pointer <<= 32;
-        ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
-        pointer += dataOutput.getPos();
-        msg.write(dataOutput);
-      }
-      synchronized (list) { // TODO - any better way?
-        list.add(pointer);
+    VertexIdMessages<LongWritable, M> messages) {
+    try {
+      VertexIdMessageIterator<LongWritable, M> iterator =
+          messages.getVertexIdMessageIterator();
+      long pointer = 0;
+      LongArrayList list;
+      while (iterator.hasNext()) {
+        iterator.next();
+        M msg = iterator.getCurrentMessage();
+        list = getList(iterator);
+        if (iterator.isNewMessage()) {
+          IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+          pointer = indexAndDataOut.getIndex();
+          pointer <<= 32;
+          ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+          pointer += dataOutput.getPos();
+          msg.write(dataOutput);
+        }
+        synchronized (list) { // TODO - any better way?
+          list.add(pointer);
+        }
       }
+    } catch (IOException e) {
+      throw new RuntimeException("addPartitionMessages: IOException while" +
+          " adding messages for a partition: " + e);
     }
   }
 
   @Override
   public Iterable<M> getVertexMessages(
-    LongWritable vertexId) throws IOException {
+    LongWritable vertexId) {
     LongArrayList list = getPartitionMap(vertexId).get(
         vertexId.get());
     if (list == null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
index 04afba5..6273694 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -113,17 +113,17 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
   }
 
   @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+  public Iterable<M> getVertexMessages(I vertexId) {
     return store.getVertexMessages(vertexId);
   }
 
   @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
+  public void clearVertexMessages(I vertexId) {
     store.clearVertexMessages(vertexId);
   }
 
   @Override
-  public void clearAll() throws IOException {
+  public void clearAll() {
     try {
       for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
         queue.put(SHUTDOWN_QUEUE_MESSAGE);
@@ -147,7 +147,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
 
   @Override
   public void addPartitionMessages(
-      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+      int partitionId, VertexIdMessages<I, M> messages) {
     int hash = partition2Queue.get(partitionId);
     try {
       queues[hash].put(new PartitionMessage<>(partitionId, messages));
@@ -167,7 +167,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
   }
 
   @Override
-  public void clearPartition(int partitionId) throws IOException {
+  public void clearPartition(int partitionId) {
     store.clearPartition(partitionId);
   }
 
@@ -232,7 +232,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
               return;
             }
           }
-        } catch (IOException | InterruptedException e) {
+        } catch (InterruptedException e) {
           LOG.error("MessageStoreQueueWorker.run: " + message, e);
           return;
         }