You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/06/26 01:22:34 UTC

[2/2] git commit: updated refs/heads/trunk to 8944567

GIRAPH-697: Clean up message stores (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/89445670
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/89445670
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/89445670

Branch: refs/heads/trunk
Commit: 89445670eee21e38c0dadde4753310adae14b964
Parents: f8a3c77
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Tue Jun 25 16:05:44 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Tue Jun 25 16:05:44 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../java/org/apache/giraph/comm/ServerData.java |  23 +-
 .../giraph/comm/messages/BasicMessageStore.java |  67 ---
 .../ByteArrayMessagesPerVertexStore.java        |  77 +---
 .../comm/messages/DiskBackedMessageStore.java   | 418 -------------------
 .../DiskBackedMessageStoreByPartition.java      | 390 -----------------
 .../comm/messages/FlushableMessageStore.java    |  40 --
 .../messages/InMemoryMessageStoreFactory.java   |   4 +-
 .../giraph/comm/messages/MessageStore.java      |  75 +++-
 .../comm/messages/MessageStoreByPartition.java  |  82 ----
 .../comm/messages/MessageStoreFactory.java      |   6 +-
 .../giraph/comm/messages/MessagesIterable.java  |  56 +++
 .../comm/messages/OneMessagePerVertexStore.java |  65 +--
 .../messages/SequentialFileMessageStore.java    | 411 ------------------
 .../comm/messages/SimpleMessageStore.java       |  44 +-
 .../out_of_core/DiskBackedMessageStore.java     | 322 ++++++++++++++
 .../PartitionDiskBackedMessageStore.java        | 350 ++++++++++++++++
 .../out_of_core/SequentialFileMessageStore.java | 407 ++++++++++++++++++
 .../comm/messages/out_of_core/package-info.java |  21 +
 .../giraph/comm/messages/package-info.java      |   2 +-
 .../NettyWorkerClientRequestProcessor.java      |   4 +-
 .../giraph/comm/netty/NettyWorkerServer.java    |  21 +-
 .../apache/giraph/graph/ComputeCallable.java    |   6 +-
 .../apache/giraph/graph/GraphTaskManager.java   |   6 +-
 .../apache/giraph/comm/RequestFailureTest.java  |   2 +-
 .../org/apache/giraph/comm/RequestTest.java     |   2 +-
 .../apache/giraph/comm/TestMessageStores.java   | 140 +++----
 27 files changed, 1335 insertions(+), 1708 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 0f2758e..2a605d8 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-697: Clean up message stores (majakabiljo)
+
   GIRAPH-696: Should be able to spill giraph metrics to a specified 
   directory on HDFS (claudio)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 788be53..affc260 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -21,7 +21,7 @@ package org.apache.giraph.comm;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -54,21 +54,18 @@ public class ServerData<I extends WritableComparable,
   /** Edge store for this worker. */
   private final EdgeStore<I, V, E> edgeStore;
   /** Message store factory */
-  private final
-  MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+  private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
   messageStoreFactory;
   /**
    * Message store for incoming messages (messages which will be consumed
    * in the next super step)
    */
-  private volatile MessageStoreByPartition<I, Writable>
-  incomingMessageStore;
+  private volatile MessageStore<I, Writable> incomingMessageStore;
   /**
    * Message store for current messages (messages which we received in
    * previous super step and which will be consumed in current super step)
    */
-  private volatile MessageStoreByPartition<I, Writable>
-  currentMessageStore;
+  private volatile MessageStore<I, Writable> currentMessageStore;
   /**
    * Map of partition ids to incoming vertex mutations from other workers.
    * (Synchronized access to values)
@@ -95,7 +92,7 @@ public class ServerData<I extends WritableComparable,
   public ServerData(
       CentralizedServiceWorker<I, V, E> service,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+      MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
           messageStoreFactory,
       Mapper<?, ?, ?, ?>.Context context) {
     this.conf = conf;
@@ -136,9 +133,8 @@ public class ServerData<I extends WritableComparable,
    * @param <M> Message data
    * @return Incoming message store
    */
-  public <M extends Writable> MessageStoreByPartition<I, M>
-  getIncomingMessageStore() {
-    return (MessageStoreByPartition<I, M>) incomingMessageStore;
+  public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
+    return (MessageStore<I, M>) incomingMessageStore;
   }
 
   /**
@@ -148,9 +144,8 @@ public class ServerData<I extends WritableComparable,
    * @param <M> Message data
    * @return Current message store
    */
-  public <M extends Writable> MessageStoreByPartition<I, M>
-  getCurrentMessageStore() {
-    return (MessageStoreByPartition<I, M>) currentMessageStore;
+  public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
+    return (MessageStore<I, M>) currentMessageStore;
   }
 
   /** Prepare for next super step */

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
deleted file mode 100644
index dcb6223..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Most basic message store with just add, get and clear operations
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface BasicMessageStore<I extends WritableComparable,
-    M extends Writable> extends Writable {
-  /**
-   * Adds messages from one message store to another
-   *
-   * @param messageStore Add the messages from this message store to this
-   *                     object
-   * @throws java.io.IOException
-   */
-  void addMessages(MessageStore<I, M> messageStore) throws IOException;
-
-  /**
-   * Gets messages for a vertex.  The lifetime of every message is only
-   * guaranteed until the iterator's next() method is called.  Do not re-use
-   * the messages.
-   *
-   * @param vertexId Vertex id for which we want to get messages
-   * @return Iterable of messages for a vertex id
-   * @throws IOException
-   */
-  Iterable<M> getVertexMessages(I vertexId) throws IOException;
-
-  /**
-   * Clears messages for a vertex.
-   *
-   * @param vertexId Vertex id for which we want to clear messages
-   * @throws IOException
-   */
-  void clearVertexMessages(I vertexId) throws IOException;
-
-  /**
-   * Clears all resources used by this store.
-   *
-   * @throws IOException
-   */
-  void clearAll() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 97c8a35..fecd7ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -24,7 +24,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
 import org.apache.giraph.utils.RepresentativeByteArrayIterator;
 import org.apache.giraph.utils.VertexIdIterator;
 import org.apache.hadoop.io.Writable;
@@ -33,7 +32,6 @@ import org.apache.hadoop.io.WritableComparable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -128,33 +126,11 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     }
   }
 
-  /**
-   * Special iterable that recycles the message
-   */
-  private class MessagesIterable extends RepresentativeByteArrayIterable<M> {
-    /**
-     * Constructor
-     *
-     * @param buf Buffer
-     * @param off Offset to start in the buffer
-     * @param length Length of the buffer
-     */
-    private MessagesIterable(byte[] buf, int off, int length) {
-      super(config, buf, off, length);
-    }
-
-    @Override
-    protected M createWritable() {
-      return ReflectionUtils.newInstance(messageClass);
-    }
-  }
-
   @Override
   protected Iterable<M> getMessagesAsIterable(
       ExtendedDataOutput extendedDataOutput) {
-
-    return new MessagesIterable(extendedDataOutput.getByteArray(), 0,
-        extendedDataOutput.getPos());
+    return new MessagesIterable<M>(config, messageClass,
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
   }
 
   /**
@@ -209,9 +185,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     int byteArraySize = in.readInt();
     byte[] messages = new byte[byteArraySize];
     in.readFully(messages);
-    ExtendedDataOutput extendedDataOutput =
-        config.createExtendedDataOutput(messages, 0);
-    return extendedDataOutput;
+    return config.createExtendedDataOutput(messages, 0);
   }
 
   /**
@@ -224,48 +198,12 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    * @return Factory
    */
   public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+  MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
       CentralizedServiceWorker<I, ?, ?> service,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     return new Factory<I, M>(service, config);
   }
 
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
-    if (messageStore instanceof ByteArrayMessagesPerVertexStore) {
-      ByteArrayMessagesPerVertexStore<I, M>
-          byteArrayMessagesPerVertexStore =
-          (ByteArrayMessagesPerVertexStore<I, M>) messageStore;
-      for (Map.Entry<Integer, ConcurrentMap<I, ExtendedDataOutput>>
-           partitionEntry : byteArrayMessagesPerVertexStore.map.entrySet()) {
-        for (Map.Entry<I, ExtendedDataOutput> vertexEntry :
-            partitionEntry.getValue().entrySet()) {
-          ConcurrentMap<I, ExtendedDataOutput> partitionMap =
-              getOrCreatePartitionMap(partitionEntry.getKey());
-          ExtendedDataOutput extendedDataOutput =
-              partitionMap.get(vertexEntry.getKey());
-          if (extendedDataOutput == null) {
-            ExtendedDataOutput newExtendedDataOutput =
-                config.createExtendedDataOutput();
-            extendedDataOutput =
-                partitionMap.putIfAbsent(vertexEntry.getKey(),
-                    newExtendedDataOutput);
-            if (extendedDataOutput == null) {
-              extendedDataOutput = newExtendedDataOutput;
-            }
-          }
-
-          // Add the messages
-          extendedDataOutput.write(vertexEntry.getValue().getByteArray(), 0,
-              vertexEntry.getValue().getPos());
-        }
-      }
-    } else {
-      throw new IllegalArgumentException("addMessages: Illegal argument " +
-          messageStore.getClass());
-    }
-  }
-
   /**
    * Factory for {@link ByteArrayMessagesPerVertexStore}
    *
@@ -273,7 +211,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    * @param <M> Message data
    */
   private static class Factory<I extends WritableComparable, M extends Writable>
-      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
     /** Service worker */
     private final CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */
@@ -290,8 +228,9 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     }
 
     @Override
-    public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
-      return new ByteArrayMessagesPerVertexStore(messageClass, service, config);
+    public MessageStore<I, M> newStore(Class<M> messageClass) {
+      return new ByteArrayMessagesPerVertexStore<I, M>(
+          messageClass, service, config);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
deleted file mode 100644
index 2712edd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message storage with in-memory map of messages and with support for
- * flushing all the messages to the disk.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class DiskBackedMessageStore<I extends WritableComparable,
-    M extends Writable> implements FlushableMessageStore<I, M> {
-  /** Message class */
-  private final Class<M> messageClass;
-  /**
-   * In-memory message map (must be sorted to insure that the ids are
-   * ordered)
-   */
-  private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
-  inMemoryMessages;
-  /** Hadoop configuration */
-  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
-  /** Counter for number of messages in-memory */
-  private final AtomicInteger numberOfMessagesInMemory;
-  /** To keep vertex ids which we have messages for */
-  private final Set<I> destinationVertices;
-  /** File stores in which we keep flushed messages */
-  private final Collection<BasicMessageStore<I, M>> fileStores;
-  /** Factory for creating file stores when flushing */
-  private final
-  MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
-  /** Lock for disk flushing */
-  private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
-
-  /**
-   * Constructor.
-   *
-   * @param messageClass     Message class held in the store
-   * @param config           Hadoop configuration
-   * @param fileStoreFactory Factory for creating file stores when flushing
-   */
-  public DiskBackedMessageStore(
-      Class<M> messageClass,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-      MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
-    inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
-    this.messageClass = messageClass;
-    this.config = config;
-    numberOfMessagesInMemory = new AtomicInteger(0);
-    destinationVertices =
-        Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
-    fileStores = Lists.newArrayList();
-    this.fileStoreFactory = fileStoreFactory;
-  }
-
-  /**
-   * Add vertex messages
-   *
-   * @param vertexId Vertex id to use
-   * @param messages Messages to add (note that the lifetime of the messages)
-   *                 is only until next() is called again)
-   * @return True if the vertex id ownership is taken by this method,
-   *         false otherwise
-   * @throws IOException
-   */
-  boolean addVertexMessages(I vertexId,
-                            Iterable<M> messages) throws IOException {
-    boolean ownsVertexId = false;
-    destinationVertices.add(vertexId);
-    rwLock.readLock().lock();
-    try {
-      ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
-      if (extendedDataOutput == null) {
-        ExtendedDataOutput newExtendedDataOutput =
-            config.createExtendedDataOutput();
-        extendedDataOutput =
-            inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
-        if (extendedDataOutput == null) {
-          ownsVertexId = true;
-          extendedDataOutput = newExtendedDataOutput;
-        }
-      }
-
-      synchronized (extendedDataOutput) {
-        for (M message : messages) {
-          message.write(extendedDataOutput);
-          numberOfMessagesInMemory.getAndIncrement();
-        }
-      }
-    } finally {
-      rwLock.readLock().unlock();
-    }
-
-    return ownsVertexId;
-  }
-
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws
-      IOException {
-    for (I destinationVertex : messageStore.getDestinationVertices()) {
-      addVertexMessages(destinationVertex,
-          messageStore.getVertexMessages(destinationVertex));
-    }
-  }
-
-  /**
-   * Special iterable that recycles the message
-   */
-  private class MessageIterable extends RepresentativeByteArrayIterable<M> {
-    /**
-     * Constructor
-     *
-     * @param buf Buffer
-     * @param off Offset to start in the buffer
-     * @param length Length of the buffer
-     */
-    public MessageIterable(
-        byte[] buf, int off, int length) {
-      super(config, buf, off, length);
-    }
-
-    @Override
-    protected M createWritable() {
-      return ReflectionUtils.newInstance(messageClass);
-    }
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
-    ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
-    if (extendedDataOutput == null) {
-      extendedDataOutput = config.createExtendedDataOutput();
-    }
-    Iterable<M> combinedIterable = new MessageIterable(
-        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
-
-    for (BasicMessageStore<I, M> fileStore : fileStores) {
-      combinedIterable = Iterables.concat(combinedIterable,
-          fileStore.getVertexMessages(vertexId));
-    }
-    return combinedIterable;
-  }
-
-  @Override
-  public int getNumberOfMessages() {
-    return numberOfMessagesInMemory.get();
-  }
-
-  @Override
-  public boolean hasMessagesForVertex(I vertexId) {
-    return destinationVertices.contains(vertexId);
-  }
-
-  @Override
-  public Iterable<I> getDestinationVertices() {
-    return destinationVertices;
-  }
-
-  @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
-    inMemoryMessages.remove(vertexId);
-  }
-
-  @Override
-  public void clearAll() throws IOException {
-    inMemoryMessages.clear();
-    destinationVertices.clear();
-    for (BasicMessageStore<I, M> fileStore : fileStores) {
-      fileStore.clearAll();
-    }
-    fileStores.clear();
-  }
-
-  /**
-   * Special temporary message store for passing along in-memory messages
-   */
-  private class TemporaryMessageStore implements MessageStore<I, M> {
-    /**
-     * In-memory message map (must be sorted to insure that the ids are
-     * ordered)
-     */
-    private final ConcurrentNavigableMap<I, ExtendedDataOutput>
-    temporaryMessages;
-
-    /**
-     * Constructor.
-     *
-     * @param temporaryMessages Messages to be owned by this object
-     */
-    private TemporaryMessageStore(
-        ConcurrentNavigableMap<I, ExtendedDataOutput>
-            temporaryMessages) {
-      this.temporaryMessages = temporaryMessages;
-    }
-
-    @Override
-    public int getNumberOfMessages() {
-      throw new IllegalAccessError("getNumberOfMessages: Not supported");
-    }
-
-    @Override
-    public boolean hasMessagesForVertex(I vertexId) {
-      return temporaryMessages.containsKey(vertexId);
-    }
-
-    @Override
-    public Iterable<I> getDestinationVertices() {
-      return temporaryMessages.keySet();
-    }
-
-    @Override
-    public void addMessages(MessageStore<I, M> messageStore)
-      throws IOException {
-      throw new IllegalAccessError("addMessages: Not supported");
-    }
-
-    @Override
-    public Iterable<M> getVertexMessages(I vertexId) throws IOException {
-      ExtendedDataOutput extendedDataOutput = temporaryMessages.get(vertexId);
-      if (extendedDataOutput == null) {
-        extendedDataOutput = config.createExtendedDataOutput();
-      }
-      return new MessageIterable(extendedDataOutput.getByteArray(), 0,
-          extendedDataOutput.getPos());
-    }
-
-    @Override
-    public void clearVertexMessages(I vertexId) throws IOException {
-      temporaryMessages.remove(vertexId);
-    }
-
-    @Override
-    public void clearAll() throws IOException {
-      temporaryMessages.clear();
-    }
-
-    @Override
-    public void write(DataOutput dataOutput) throws IOException {
-      throw new IllegalAccessError("write: Not supported");
-    }
-
-    @Override
-    public void readFields(DataInput dataInput) throws IOException {
-      throw new IllegalAccessError("readFields: Not supported");
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush = null;
-    rwLock.writeLock().lock();
-    try {
-      messagesToFlush = inMemoryMessages;
-      inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
-      numberOfMessagesInMemory.set(0);
-    } finally {
-      rwLock.writeLock().unlock();
-    }
-    BasicMessageStore<I, M> fileStore =
-        fileStoreFactory.newStore(messageClass);
-    fileStore.addMessages(new TemporaryMessageStore(messagesToFlush));
-
-    synchronized (fileStores) {
-      fileStores.add(fileStore);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    // write destination vertices
-    out.writeInt(destinationVertices.size());
-    for (I vertexId : destinationVertices) {
-      vertexId.write(out);
-    }
-
-    // write of in-memory messages
-    out.writeInt(numberOfMessagesInMemory.get());
-
-    // write in-memory messages map
-    out.writeInt(inMemoryMessages.size());
-    for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
-      entry.getKey().write(out);
-      out.writeInt(entry.getValue().getPos());
-      out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
-    }
-
-    // write file stores
-    out.writeInt(fileStores.size());
-    for (BasicMessageStore<I, M> fileStore : fileStores) {
-      fileStore.write(out);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    // read destination vertices
-    int numVertices = in.readInt();
-    for (int v = 0; v < numVertices; v++) {
-      I vertexId = (I) config.createVertexId();
-      vertexId.readFields(in);
-      destinationVertices.add(vertexId);
-    }
-
-    // read in-memory messages
-    numberOfMessagesInMemory.set(in.readInt());
-
-    // read in-memory map
-    int mapSize = in.readInt();
-    for (int m = 0; m < mapSize; m++) {
-      I vertexId = config.createVertexId();
-      vertexId.readFields(in);
-      int messageBytes = in.readInt();
-      byte[] buf = new byte[messageBytes];
-      ExtendedDataOutput extendedDataOutput =
-          config.createExtendedDataOutput(buf, messageBytes);
-      inMemoryMessages.put(vertexId, extendedDataOutput);
-    }
-
-    // read file stores
-    int numFileStores = in.readInt();
-    for (int s = 0; s < numFileStores; s++) {
-      BasicMessageStore<I, M> fileStore =
-          fileStoreFactory.newStore(messageClass);
-      fileStore.readFields(in);
-      fileStores.add(fileStore);
-    }
-  }
-
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param config           Hadoop configuration
-   * @param fileStoreFactory Factory for creating message stores for
-   *                         partitions
-   * @param <I>              Vertex id
-   * @param <M>              Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-      MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
-    return new Factory<I, M>(config, fileStoreFactory);
-  }
-
-  /**
-   * Factory for {@link DiskBackedMessageStore}
-   *
-   * @param <I> Vertex id
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      M extends Writable> implements MessageStoreFactory<I, M,
-      FlushableMessageStore<I, M>> {
-    /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration config;
-    /** Factory for creating message stores for partitions */
-    private final
-    MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
-
-    /**
-     * @param config           Hadoop configuration
-     * @param fileStoreFactory Factory for creating message stores for
-     *                         partitions
-     */
-    public Factory(ImmutableClassesGiraphConfiguration config,
-        MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
-      this.config = config;
-      this.fileStoreFactory = fileStoreFactory;
-    }
-
-    @Override
-    public FlushableMessageStore<I, M> newStore(Class<M> messageClass) {
-      return new DiskBackedMessageStore<I, M>(messageClass, config,
-          fileStoreFactory);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
deleted file mode 100644
index 4a28949..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Message store which separates data by partitions,
- * and submits them to underlying message store.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    MessageStoreByPartition<I, M> {
-  /** Message class */
-  private final Class<M> messageClass;
-  /** Service worker */
-  private final CentralizedServiceWorker<I, V, E> service;
-  /** Number of messages to keep in memory */
-  private final int maxNumberOfMessagesInMemory;
-  /** Factory for creating file stores when flushing */
-  private final
-  MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
-  /** Map from partition id to its message store */
-  private final
-  ConcurrentMap<Integer, FlushableMessageStore<I, M>> partitionMessageStores;
-  /**
-   * @param messageClass                Message class held in the store
-   * @param service                     Service worker
-   * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
-   * @param fileStoreFactory            Factory for creating file stores
-   *                                    when flushing
-   */
-  public DiskBackedMessageStoreByPartition(
-      Class<M> messageClass,
-      CentralizedServiceWorker<I, V, E> service,
-      int maxNumberOfMessagesInMemory,
-      MessageStoreFactory<I, M, FlushableMessageStore<I,
-          M>> fileStoreFactory) {
-    this.messageClass = messageClass;
-    this.service = service;
-    this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
-    this.fileStoreFactory = fileStoreFactory;
-    partitionMessageStores = Maps.newConcurrentMap();
-  }
-
-  @Override
-  public void addPartitionMessages(
-      int partitionId,
-      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
-    FlushableMessageStore<I, M> flushableMessageStore =
-        getMessageStore(partitionId);
-    if (flushableMessageStore instanceof DiskBackedMessageStore) {
-      DiskBackedMessageStore<I, M> diskBackedMessageStore =
-          (DiskBackedMessageStore<I, M>) flushableMessageStore;
-      ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
-          vertexIdMessageIterator =
-          messages.getVertexIdMessageIterator();
-      while (vertexIdMessageIterator.hasNext()) {
-        vertexIdMessageIterator.next();
-        boolean ownsVertexId =
-            diskBackedMessageStore.addVertexMessages(
-                vertexIdMessageIterator.getCurrentVertexId(),
-                Collections.singleton(
-                    vertexIdMessageIterator.getCurrentMessage()));
-        if (ownsVertexId) {
-          vertexIdMessageIterator.releaseCurrentVertexId();
-        }
-      }
-    } else {
-      throw new IllegalStateException("addPartitionMessages: Doesn't support " +
-          "class " + flushableMessageStore.getClass());
-    }
-    checkMemory();
-  }
-
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
-    for (I destinationVertex : messageStore.getDestinationVertices()) {
-      FlushableMessageStore<I, M> flushableMessageStore =
-          getMessageStore(destinationVertex);
-      if (flushableMessageStore instanceof DiskBackedMessageStore) {
-        DiskBackedMessageStore<I, M> diskBackedMessageStore =
-            (DiskBackedMessageStore<I, M>) flushableMessageStore;
-        Iterable<M> messages =
-            messageStore.getVertexMessages(destinationVertex);
-        diskBackedMessageStore.addVertexMessages(destinationVertex, messages);
-      } else {
-        throw new IllegalStateException("addMessages: Doesn't support " +
-            "class " + flushableMessageStore.getClass());
-      }
-    }
-    checkMemory();
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
-    if (hasMessagesForVertex(vertexId)) {
-      return getMessageStore(vertexId).getVertexMessages(vertexId);
-    } else {
-      return EmptyIterable.get();
-    }
-  }
-
-  @Override
-  public int getNumberOfMessages() {
-    int numOfMessages = 0;
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      numOfMessages += messageStore.getNumberOfMessages();
-    }
-    return numOfMessages;
-  }
-
-  @Override
-  public boolean hasMessagesForVertex(I vertexId) {
-    return getMessageStore(vertexId).hasMessagesForVertex(vertexId);
-  }
-
-  @Override
-  public Iterable<I> getDestinationVertices() {
-    List<I> vertices = Lists.newArrayList();
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      Iterables.addAll(vertices, messageStore.getDestinationVertices());
-    }
-    return vertices;
-  }
-
-  @Override
-  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
-    FlushableMessageStore<I, M> messageStore =
-        partitionMessageStores.get(partitionId);
-    if (messageStore == null) {
-      return Collections.emptyList();
-    } else {
-      return messageStore.getDestinationVertices();
-    }
-  }
-
-  @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
-    if (hasMessagesForVertex(vertexId)) {
-      getMessageStore(vertexId).clearVertexMessages(vertexId);
-    }
-  }
-
-  @Override
-  public void clearPartition(int partitionId) throws IOException {
-    FlushableMessageStore<I, M> messageStore =
-        partitionMessageStores.get(partitionId);
-    if (messageStore != null) {
-      messageStore.clearAll();
-    }
-  }
-
-  @Override
-  public void clearAll() throws IOException {
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      messageStore.clearAll();
-    }
-    partitionMessageStores.clear();
-  }
-
-  /**
-   * Checks the memory status, flushes if necessary
-   *
-   * @throws IOException
-   */
-  private void checkMemory() throws IOException {
-    while (memoryFull()) {
-      flushOnePartition();
-    }
-  }
-
-  /**
-   * Check if memory is full
-   *
-   * @return True iff memory is full
-   */
-  private boolean memoryFull() {
-    int totalMessages = 0;
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      totalMessages += messageStore.getNumberOfMessages();
-    }
-    return totalMessages > maxNumberOfMessagesInMemory;
-  }
-
-  /**
-   * Finds biggest partition and flushes it to the disk
-   *
-   * @throws IOException
-   */
-  private void flushOnePartition() throws IOException {
-    int maxMessages = 0;
-    FlushableMessageStore<I, M> biggestStore = null;
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      int numMessages = messageStore.getNumberOfMessages();
-      if (numMessages > maxMessages) {
-        maxMessages = numMessages;
-        biggestStore = messageStore;
-      }
-    }
-    if (biggestStore != null) {
-      biggestStore.flush();
-    }
-  }
-
-  /**
-   * Get message store for partition which holds vertex with required vertex
-   * id
-   *
-   * @param vertexId Id of vertex for which we are asking for message store
-   * @return Requested message store
-   */
-  private FlushableMessageStore<I, M> getMessageStore(I vertexId) {
-    int partitionId =
-        service.getVertexPartitionOwner(vertexId).getPartitionId();
-    return getMessageStore(partitionId);
-  }
-
-  /**
-   * Get message store for partition id. It it doesn't exist yet,
-   * creates a new one.
-   *
-   * @param partitionId Id of partition for which we are asking for message
-   *                    store
-   * @return Requested message store
-   */
-  private FlushableMessageStore<I, M> getMessageStore(int partitionId) {
-    FlushableMessageStore<I, M> messageStore =
-        partitionMessageStores.get(partitionId);
-    if (messageStore != null) {
-      return messageStore;
-    }
-    messageStore = fileStoreFactory.newStore(messageClass);
-    FlushableMessageStore<I, M> store =
-        partitionMessageStores.putIfAbsent(partitionId, messageStore);
-    return (store == null) ? messageStore : store;
-  }
-
-  @Override
-  public void writePartition(DataOutput out,
-      int partitionId) throws IOException {
-    FlushableMessageStore<I, M> partitionStore =
-        partitionMessageStores.get(partitionId);
-    out.writeBoolean(partitionStore != null);
-    if (partitionStore != null) {
-      partitionStore.write(out);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(partitionMessageStores.size());
-    for (Entry<Integer, FlushableMessageStore<I, M>> entry :
-        partitionMessageStores.entrySet()) {
-      out.writeInt(entry.getKey());
-      entry.getValue().write(out);
-    }
-  }
-
-  @Override
-  public void readFieldsForPartition(DataInput in,
-      int partitionId) throws IOException {
-    if (in.readBoolean()) {
-      FlushableMessageStore<I, M> messageStore =
-          fileStoreFactory.newStore(messageClass);
-      messageStore.readFields(in);
-      partitionMessageStores.put(partitionId, messageStore);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int numStores = in.readInt();
-    for (int s = 0; s < numStores; s++) {
-      int partitionId = in.readInt();
-      FlushableMessageStore<I, M> messageStore =
-          fileStoreFactory.newStore(messageClass);
-      messageStore.readFields(in);
-      partitionMessageStores.put(partitionId, messageStore);
-    }
-  }
-
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param service             Service worker
-   * @param maxMessagesInMemory Number of messages to keep in memory
-   * @param fileStoreFactory    Factory for creating file stores when
-   *                            flushing
-   * @param <I>                 Vertex id
-   * @param <V>                 Vertex data
-   * @param <E>                 Edge data
-   * @param <M>                 Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, V extends Writable,
-      E extends Writable, M extends Writable>
-  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
-      CentralizedServiceWorker<I, V, E> service,
-      int maxMessagesInMemory,
-      MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
-          fileStoreFactory) {
-    return new Factory<I, V, E, M>(service, maxMessagesInMemory,
-        fileStoreFactory);
-  }
-
-  /**
-   * Factory for {@link DiskBackedMessageStoreByPartition}
-   *
-   * @param <I> Vertex id
-   * @param <V> Vertex data
-   * @param <E> Edge data
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable>
-      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
-    /** Service worker */
-    private final CentralizedServiceWorker<I, V, E> service;
-    /** Number of messages to keep in memory */
-    private final int maxMessagesInMemory;
-    /** Factory for creating file stores when flushing */
-    private final
-    MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
-
-    /**
-     * @param service             Service worker
-     * @param maxMessagesInMemory Number of messages to keep in memory
-     * @param fileStoreFactory    Factory for creating file stores when
-     *                            flushing
-     */
-    public Factory(CentralizedServiceWorker<I, V, E> service,
-        int maxMessagesInMemory,
-        MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
-            fileStoreFactory) {
-      this.service = service;
-      this.maxMessagesInMemory = maxMessagesInMemory;
-      this.fileStoreFactory = fileStoreFactory;
-    }
-
-    @Override
-    public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
-      return new DiskBackedMessageStoreByPartition<I, V, E, M>(messageClass,
-          service, maxMessagesInMemory, fileStoreFactory);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
deleted file mode 100644
index 6e7fe55..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-
-/**
- * Message stores which has flush operation
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface FlushableMessageStore<I extends WritableComparable,
-    M extends Writable> extends MessageStore<I, M> {
-  /**
-   * Flushes messages to the disk.
-   *
-   * @throws IOException
-   */
-  void flush() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 9086d78..ba8a005 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
  */
 public class InMemoryMessageStoreFactory<I extends WritableComparable,
     M extends Writable>
-    implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+    implements MessageStoreFactory<I, M, MessageStore<I, M>> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(InMemoryMessageStoreFactory.class);
@@ -56,7 +56,7 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   }
 
   @Override
-  public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+  public MessageStore<I, M> newStore(Class<M> messageClass) {
     if (conf.useCombiner()) {
       if (LOG.isInfoEnabled()) {
         LOG.info("newStore: " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index a6f174d..2af7642 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.comm.messages;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -28,13 +32,32 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <M> Message data
  */
 public interface MessageStore<I extends WritableComparable,
-    M extends Writable> extends BasicMessageStore<I, M> {
+    M extends Writable> {
   /**
-   * Get number of messages in memory
+   * Gets messages for a vertex.  The lifetime of every message is only
+   * guaranteed until the iterator's next() method is called. Do not hold
+   * references to objects returned by this iterator.
    *
-   * @return Number of messages in memory
+   * @param vertexId Vertex id for which we want to get messages
+   * @return Iterable of messages for a vertex id
+   * @throws java.io.IOException
    */
-  int getNumberOfMessages();
+  Iterable<M> getVertexMessages(I vertexId) throws IOException;
+
+  /**
+   * Clears messages for a vertex.
+   *
+   * @param vertexId Vertex id for which we want to clear messages
+   * @throws IOException
+   */
+  void clearVertexMessages(I vertexId) throws IOException;
+
+  /**
+   * Clears all resources used by this store.
+   *
+   * @throws IOException
+   */
+  void clearAll() throws IOException;
 
   /**
    * Check if we have messages for some vertex
@@ -45,9 +68,49 @@ public interface MessageStore<I extends WritableComparable,
   boolean hasMessagesForVertex(I vertexId);
 
   /**
-   * Gets vertex ids which we have messages for
+   * Adds messages for partition
+   *
+   * @param partitionId Id of partition
+   * @param messages    Collection of vertex ids and messages we want to add
+   * @throws IOException
+   */
+  void addPartitionMessages(
+      int partitionId, ByteArrayVertexIdMessages<I, M> messages)
+    throws IOException;
+
+  /**
+   * Gets vertex ids from selected partition which we have messages for
    *
+   * @param partitionId Id of partition
    * @return Iterable over vertex ids which we have messages for
    */
-  Iterable<I> getDestinationVertices();
+  Iterable<I> getPartitionDestinationVertices(int partitionId);
+
+  /**
+   * Clears messages for a partition.
+   *
+   * @param partitionId Partition id for which we want to clear messages
+   * @throws IOException
+   */
+  void clearPartition(int partitionId) throws IOException;
+
+  /**
+   * Serialize messages for one partition.
+   *
+   * @param out         {@link DataOutput} to serialize this object into
+   * @param partitionId Id of partition
+   * @throws IOException
+   */
+  void writePartition(DataOutput out, int partitionId) throws IOException;
+
+  /**
+   * Deserialize messages for one partition
+   *
+   * @param in          {@link DataInput} to deserialize this object
+   *                    from.
+   * @param partitionId Id of partition
+   * @throws IOException
+   */
+  void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
deleted file mode 100644
index d2143c6..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message store which stores data by partition
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface MessageStoreByPartition<I extends WritableComparable,
-    M extends Writable> extends MessageStore<I, M> {
-  /**
-   * Adds messages for partition
-   *
-   * @param partitionId Id of partition
-   * @param messages    Collection of vertex ids and messages we want to add
-   * @throws IOException
-   */
-  void addPartitionMessages(
-      int partitionId, ByteArrayVertexIdMessages<I, M> messages)
-    throws IOException;
-
-  /**
-   * Gets vertex ids from selected partition which we have messages for
-   *
-   * @param partitionId Id of partition
-   * @return Iterable over vertex ids which we have messages for
-   */
-  Iterable<I> getPartitionDestinationVertices(int partitionId);
-
-  /**
-   * Clears messages for a partition.
-   *
-   * @param partitionId Partition id for which we want to clear messages
-   * @throws IOException
-   */
-  void clearPartition(int partitionId) throws IOException;
-
-  /**
-   * Serialize messages for one partition.
-   *
-   * @param out         {@link DataOutput} to serialize this object into
-   * @param partitionId Id of partition
-   * @throws IOException
-   */
-  void writePartition(DataOutput out, int partitionId) throws IOException;
-
-  /**
-   * Deserialize messages for one partition
-   *
-   * @param in          {@link DataInput} to deserialize this object
-   *                    from.
-   * @param partitionId Id of partition
-   * @throws IOException
-   */
-  void readFieldsForPartition(DataInput in,
-      int partitionId) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index dec9a92..aa6a63e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.io.WritableComparable;
  *
  * @param <I> Vertex id
  * @param <M> Message data
- * @param <S> Message store
+ * @param <MS> Message store
  */
 public interface MessageStoreFactory<I extends WritableComparable,
-    M extends Writable, S extends BasicMessageStore<I, M>> {
+    M extends Writable, MS> {
   /**
    * Creates new message store.
    *
@@ -39,5 +39,5 @@ public interface MessageStoreFactory<I extends WritableComparable,
    * @param messageClass Message class held in the store
    * @return New message store
    */
-  S newStore(Class<M> messageClass);
+  MS newStore(Class<M> messageClass);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
new file mode 100644
index 0000000..970d76d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Special iterable that recycles the message
+ *
+ * @param <M> Message data
+ */
+public class MessagesIterable<M extends Writable>
+    extends RepresentativeByteArrayIterable<M> {
+  /** Message class */
+  private final Class<M> messageClass;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   * @param messageClass Message class
+   * @param buf Buffer
+   * @param off Offset to start in the buffer
+   * @param length Length of the buffer
+   */
+  public MessagesIterable(
+      ImmutableClassesGiraphConfiguration conf, Class<M> messageClass,
+      byte[] buf, int off, int length) {
+    super(conf, buf, off, length);
+    this.messageClass = messageClass;
+  }
+
+  @Override
+  protected M createWritable() {
+    return ReflectionUtils.newInstance(messageClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 8710dac..f18af5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -22,7 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -60,44 +59,6 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
     this.combiner = combiner;
   }
 
-  /**
-   * If there is already a message related to the vertex id in the
-   * partition map return that message, otherwise create a new one,
-   * put it in the map and return it
-   *
-   * @param vertexId Id of vertex
-   * @param partitionMap Partition map
-   * @return Message for this vertex
-   */
-  private M getOrCreateCurrentMessage(I vertexId,
-      ConcurrentMap<I, M> partitionMap) {
-    M currentMessage = partitionMap.get(vertexId);
-    if (currentMessage == null) {
-      M newMessage = combiner.createInitialMessage();
-      currentMessage = partitionMap.putIfAbsent(vertexId, newMessage);
-      if (currentMessage == null) {
-        currentMessage = newMessage;
-      }
-    }
-    return currentMessage;
-  }
-
-  /**
-   * Add a single message for vertex to a partition map
-   *
-   * @param vertexId Id of vertex which received message
-   * @param message Message to add
-   * @param partitionMap Partition map to add the message to
-   * @throws IOException
-   */
-  private void addVertexMessageToPartition(I vertexId, M message,
-      ConcurrentMap<I, M> partitionMap) throws IOException {
-    M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
-    synchronized (currentMessage) {
-      combiner.combine(vertexId, currentMessage, message);
-    }
-  }
-
   @Override
   public void addPartitionMessages(
       int partitionId,
@@ -149,26 +110,6 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
     return message;
   }
 
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
-    if (messageStore instanceof OneMessagePerVertexStore) {
-      OneMessagePerVertexStore<I, M> oneMessagePerVertexStore =
-          (OneMessagePerVertexStore<I, M>) messageStore;
-      for (Map.Entry<Integer, ConcurrentMap<I, M>>
-          partitionEntry : oneMessagePerVertexStore.map.entrySet()) {
-        ConcurrentMap<I, M> partitionMap =
-              getOrCreatePartitionMap(partitionEntry.getKey());
-        for (Map.Entry<I, M> vertexEntry :
-            partitionEntry.getValue().entrySet()) {
-          addVertexMessageToPartition(vertexEntry.getKey(),
-              vertexEntry.getValue(), partitionMap);
-        }
-      }
-    } else {
-      throw new IllegalArgumentException("addMessages: Illegal argument " +
-          messageStore.getClass());
-    }
-  }
 
   /**
    * Create new factory for this message store
@@ -180,7 +121,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
    * @return Factory
    */
   public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+  MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
       CentralizedServiceWorker<I, ?, ?> service,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     return new Factory<I, M>(service, config);
@@ -194,7 +135,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
    */
   private static class Factory<I extends WritableComparable,
       M extends Writable>
-      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
     /** Service worker */
     private final CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */
@@ -211,7 +152,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
     }
 
     @Override
-    public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+    public MessageStore<I, M> newStore(Class<M> messageClass) {
       return new OneMessagePerVertexStore<I, M>(messageClass, service,
           config.<M>createCombiner(), config);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
deleted file mode 100644
index 23bbbc5..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
-
-/**
- * Used for writing and reading collection of messages to the disk.
- * {@link SequentialFileMessageStore#addMessages(MessageStore)}
- * should be called only once with the messages we want to store.
- * <p/>
- * It's optimized for retrieving messages in the natural order of vertex ids
- * they are sent to.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class SequentialFileMessageStore<I extends WritableComparable,
-    M extends Writable> implements BasicMessageStore<I, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SequentialFileMessageStore.class);
-  /** Message class */
-  private final Class<M> messageClass;
-  /** File in which we store data */
-  private final File file;
-  /** Configuration which we need for reading data */
-  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
-  /** Buffer size to use when reading and writing files */
-  private final int bufferSize;
-  /** File input stream */
-  private DataInputStream in;
-  /** How many vertices do we have left to read in the file */
-  private int verticesLeft;
-  /** Id of currently read vertex */
-  private I currentVertexId;
-
-  /**
-   * Stores message on the disk.
-   *
-   * @param messageClass Message class held in the store
-   * @param config       Configuration used later for reading
-   * @param bufferSize   Buffer size to use when reading and writing
-   * @param fileName     File in which we want to store messages
-   * @throws IOException
-   */
-  public SequentialFileMessageStore(
-      Class<M> messageClass,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-      int bufferSize,
-      String fileName) {
-    this.messageClass = messageClass;
-    this.config = config;
-    this.bufferSize = bufferSize;
-    file = new File(fileName);
-  }
-
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
-    // Writes messages to its file
-    if (file.exists()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("addMessages: Deleting " + file);
-      }
-      file.delete();
-    }
-    file.createNewFile();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("addMessages: Creating " + file);
-    }
-
-    DataOutputStream out = null;
-
-    try {
-      out = new DataOutputStream(
-          new BufferedOutputStream(new FileOutputStream(file), bufferSize));
-      int destinationVertexIdCount =
-          Iterables.size(messageStore.getDestinationVertices());
-      out.writeInt(destinationVertexIdCount);
-
-      // Since the message store messages might not be sorted, sort them if
-      // necessary
-      SortedSet<I> sortedSet;
-      if (messageStore.getDestinationVertices() instanceof SortedSet) {
-        sortedSet = (SortedSet<I>) messageStore.getDestinationVertices();
-      } else {
-        sortedSet =
-            Sets.newTreeSet(messageStore.getDestinationVertices());
-        for (I destinationVertexId : messageStore.getDestinationVertices()) {
-          sortedSet.add(destinationVertexId);
-        }
-      }
-
-      // Dump the vertices and their messages in a sorted order
-      for (I destinationVertexId : sortedSet) {
-        destinationVertexId.write(out);
-        Iterable<M> messages =
-            messageStore.getVertexMessages(destinationVertexId);
-        int messageCount = Iterables.size(messages);
-        out.writeInt(messageCount);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("addMessages: For vertex id " + destinationVertexId +
-              ", messages = " + messageCount + " to file " + file);
-        }
-        for (M message : messages) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("addMessages: Wrote " + message + " to " + file);
-          }
-          message.write(out);
-        }
-      }
-    } finally {
-      if (out != null) {
-        out.close();
-      }
-    }
-  }
-
-  /**
-   * Reads messages for a vertex. It will find the messages only if all
-   * previous reads used smaller vertex ids than this one - messages should
-   * be retrieved in increasing order of vertex ids.
-   *
-   * @param vertexId Vertex id for which we want to get messages
-   * @return Messages for the selected vertex, or empty list if not used
-   *         correctly
-   * @throws IOException
-   */
-  @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws
-      IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
-          " (currently " + currentVertexId + ") from " + file);
-    }
-    if (in == null) {
-      startReading();
-    }
-
-    I nextVertexId = getCurrentVertexId();
-    while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) {
-      nextVertexId = getNextVertexId();
-    }
-
-    if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
-      return EmptyIterable.get();
-    }
-
-    return readMessagesForCurrentVertex();
-  }
-
-  @Override
-  public void clearVertexMessages(I vertexId) throws IOException { }
-
-  @Override
-  public void clearAll() throws IOException {
-    endReading();
-    file.delete();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(file.length());
-    FileInputStream input = new FileInputStream(file);
-    try {
-      byte[] buffer = new byte[bufferSize];
-      while (true) {
-        int length = input.read(buffer);
-        if (length < 0) {
-          break;
-        }
-        out.write(buffer, 0, length);
-      }
-    } finally {
-      input.close();
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    FileOutputStream output = new FileOutputStream(file);
-    try {
-      long fileLength = in.readLong();
-      byte[] buffer = new byte[bufferSize];
-      for (long position = 0; position < fileLength; position += bufferSize) {
-        int bytes = (int) Math.min(bufferSize, fileLength - position);
-        in.readFully(buffer, 0, bytes);
-        output.write(buffer);
-      }
-    } finally {
-      output.close();
-    }
-  }
-
-  /**
-   * Prepare for reading
-   *
-   * @throws IOException
-   */
-  private void startReading() throws IOException {
-    currentVertexId = null;
-    in = new DataInputStream(
-        new BufferedInputStream(new FileInputStream(file), bufferSize));
-    verticesLeft = in.readInt();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("startReading: File " + file + " with " +
-          verticesLeft + " vertices left");
-    }
-  }
-
-  /**
-   * Gets current vertex id.
-   * <p/>
-   * If there is a vertex id whose messages haven't been read yet it
-   * will return that vertex id, otherwise it will read and return the next
-   * one.
-   *
-   * @return Current vertex id
-   * @throws IOException
-   */
-  private I getCurrentVertexId() throws IOException {
-    if (currentVertexId != null) {
-      return currentVertexId;
-    } else {
-      return getNextVertexId();
-    }
-  }
-
-  /**
-   * Gets next vertex id.
-   * <p/>
-   * If there is a vertex whose messages haven't been read yet it
-   * will read and skip over its messages to get to the next vertex.
-   *
-   * @return Next vertex id
-   * @throws IOException
-   */
-  private I getNextVertexId() throws IOException {
-    if (currentVertexId != null) {
-      readMessagesForCurrentVertex();
-    }
-    if (verticesLeft == 0) {
-      return null;
-    }
-    currentVertexId = config.createVertexId();
-    currentVertexId.readFields(in);
-    return currentVertexId;
-  }
-
-  /**
-   * Reads messages for current vertex.
-   *
-   * @return Messages for current vertex
-   * @throws IOException
-   */
-  private Collection<M> readMessagesForCurrentVertex() throws IOException {
-    int messagesSize = in.readInt();
-    List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
-    for (int i = 0; i < messagesSize; i++) {
-      M message = ReflectionUtils.newInstance(messageClass);
-      try {
-        message.readFields(in);
-      } catch (IOException e) {
-        throw new IllegalStateException("readMessagesForCurrentVertex: " +
-            "Failed to read message from " + i + " of " +
-            messagesSize + " for vertex id " + currentVertexId + " from " +
-            file, e);
-      }
-      messages.add(message);
-    }
-    currentVertexDone();
-    return messages;
-  }
-
-  /**
-   * Release current vertex.
-   *
-   * @throws IOException
-   */
-  private void currentVertexDone() throws IOException {
-    currentVertexId = null;
-    verticesLeft--;
-    if (verticesLeft == 0) {
-      endReading();
-    }
-  }
-
-  /**
-   * Call when we are done reading, for closing files.
-   *
-   * @throws IOException
-   */
-  private void endReading() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("endReading: Stopped reading " + file);
-    }
-    if (in != null) {
-      in.close();
-      in = null;
-    }
-  }
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param config Hadoop configuration
-   * @param <I>    Vertex id
-   * @param <M>    Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, BasicMessageStore<I, M>> newFactory(
-      ImmutableClassesGiraphConfiguration config) {
-    return new Factory<I, M>(config);
-  }
-
-  /**
-   * Factory for {@link SequentialFileMessageStore}
-   *
-   * @param <I> Vertex id
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      M extends Writable>
-      implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
-    /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration config;
-    /** Directories in which we'll keep necessary files */
-    private final String[] directories;
-    /** Buffer size to use when reading and writing */
-    private final int bufferSize;
-    /** Counter for created message stores */
-    private final AtomicInteger storeCounter;
-
-    /**
-     * Constructor.
-     *
-     * @param config Hadoop configuration
-     */
-    public Factory(ImmutableClassesGiraphConfiguration config) {
-      this.config = config;
-      String jobId = config.get("mapred.job.id", "Unknown Job");
-      int taskId   = config.getTaskPartition();
-      List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
-      Collections.shuffle(userPaths);
-      directories = new String[userPaths.size()];
-      int i = 0;
-      for (String path : userPaths) {
-        String directory = path + File.separator + jobId + File.separator +
-            taskId + File.separator;
-        directories[i++] = directory;
-        new File(directory).mkdirs();
-      }
-      this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
-      storeCounter = new AtomicInteger();
-    }
-
-    @Override
-    public BasicMessageStore<I, M> newStore(Class<M> messageClass) {
-      int idx = Math.abs(storeCounter.getAndIncrement());
-      String fileName =
-          directories[idx % directories.length] + "messages-" + idx;
-      return new SequentialFileMessageStore<I, M>(messageClass, config,
-          bufferSize, fileName);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 1a91dfb..c9d869c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -18,14 +18,12 @@
 
 package org.apache.giraph.comm.messages;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -34,7 +32,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
- * Abstract class for {@link MessageStoreByPartition} which allows any kind
+ * Abstract class for {@link MessageStore} which allows any kind
  * of object to hold messages for one vertex.
  * Simple in memory message store implemented with a two level concurrent
  * hash map.
@@ -44,7 +42,7 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <T> Type of object which holds messages for one vertex
  */
 public abstract class SimpleMessageStore<I extends WritableComparable,
-    M extends Writable, T> implements MessageStoreByPartition<I, M>  {
+    M extends Writable, T> implements MessageStore<I, M>  {
   /** Message class */
   protected final Class<M> messageClass;
   /** Service worker */
@@ -150,16 +148,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   public boolean hasMessagesForVertex(I vertexId) {
     ConcurrentMap<I, ?> partitionMap =
         map.get(getPartitionId(vertexId));
-    return (partitionMap == null) ? false : partitionMap.containsKey(vertexId);
-  }
-
-  @Override
-  public Iterable<I> getDestinationVertices() {
-    List<I> vertices = Lists.newArrayList();
-    for (ConcurrentMap<I, ?> partitionMap : map.values()) {
-      vertices.addAll(partitionMap.keySet());
-    }
-    return vertices;
+    return partitionMap != null && partitionMap.containsKey(vertexId);
   }
 
   @Override
@@ -174,15 +163,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public int getNumberOfMessages() {
-    int numberOfMessages = 0;
-    for (ConcurrentMap<I, T> partitionMap : map.values()) {
-      numberOfMessages += getNumberOfMessagesIn(partitionMap);
-    }
-    return numberOfMessages;
-  }
-
-  @Override
   public void writePartition(DataOutput out,
       int partitionId) throws IOException {
     ConcurrentMap<I, T> partitionMap = map.get(partitionId);
@@ -197,15 +177,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(map.size());
-    for (int partitionId : map.keySet()) {
-      out.writeInt(partitionId);
-      writePartition(out, partitionId);
-    }
-  }
-
-  @Override
   public void readFieldsForPartition(DataInput in,
       int partitionId) throws IOException {
     if (in.readBoolean()) {
@@ -221,15 +192,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
-    int numPartitions = in.readInt();
-    for (int p = 0; p < numPartitions; p++) {
-      int partitionId = in.readInt();
-      readFieldsForPartition(in, partitionId);
-    }
-  }
-
-  @Override
   public void clearVertexMessages(I vertexId) throws IOException {
     ConcurrentMap<I, ?> partitionMap =
         map.get(getPartitionId(vertexId));