You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/06/26 01:22:33 UTC

[1/2] GIRAPH-697: Clean up message stores (majakabiljo)

Updated Branches:
  refs/heads/trunk f8a3c777e -> 89445670e


http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
new file mode 100644
index 0000000..1cda1d9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Message store which separates data by partitions,
+ * and submits them to underlying message store.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class DiskBackedMessageStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MessageStore<I, M> {
+  /** Message class */
+  private final Class<M> messageClass;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E> service;
+  /** Number of messages to keep in memory */
+  private final int maxNumberOfMessagesInMemory;
+  /** Factory for creating file stores when flushing */
+  private final MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+  partitionStoreFactory;
+  /** Map from partition id to its message store */
+  private final ConcurrentMap<Integer, PartitionDiskBackedMessageStore<I, M>>
+  partitionMessageStores;
+
+  /**
+   * @param messageClass                Message class held in the store
+   * @param service                     Service worker
+   * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
+   * @param partitionStoreFactory       Factory for creating stores for a
+   *                                    partition
+   */
+  public DiskBackedMessageStore(
+      Class<M> messageClass,
+      CentralizedServiceWorker<I, V, E> service,
+      int maxNumberOfMessagesInMemory,
+      MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I,
+          M>> partitionStoreFactory) {
+    this.messageClass = messageClass;
+    this.service = service;
+    this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
+    this.partitionStoreFactory = partitionStoreFactory;
+    partitionMessageStores = Maps.newConcurrentMap();
+  }
+
+  @Override
+  public void addPartitionMessages(
+      int partitionId,
+      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> partitionMessageStore =
+        getMessageStore(partitionId);
+    ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+        vertexIdMessageIterator =
+        messages.getVertexIdMessageIterator();
+    while (vertexIdMessageIterator.hasNext()) {
+      vertexIdMessageIterator.next();
+      boolean ownsVertexId =
+          partitionMessageStore.addVertexMessages(
+              vertexIdMessageIterator.getCurrentVertexId(),
+              Collections.singleton(
+                  vertexIdMessageIterator.getCurrentMessage()));
+      if (ownsVertexId) {
+        vertexIdMessageIterator.releaseCurrentVertexId();
+      }
+    }
+    checkMemory();
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    if (hasMessagesForVertex(vertexId)) {
+      return getMessageStore(vertexId).getVertexMessages(vertexId);
+    } else {
+      return EmptyIterable.get();
+    }
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    return getMessageStore(vertexId).hasMessagesForVertex(vertexId);
+  }
+
+  @Override
+  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore == null) {
+      return Collections.emptyList();
+    } else {
+      return messageStore.getDestinationVertices();
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    if (hasMessagesForVertex(vertexId)) {
+      getMessageStore(vertexId).clearVertexMessages(vertexId);
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore != null) {
+      messageStore.clearAll();
+    }
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    for (PartitionDiskBackedMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      messageStore.clearAll();
+    }
+    partitionMessageStores.clear();
+  }
+
+  /**
+   * Checks the memory status, flushes if necessary
+   *
+   * @throws IOException
+   */
+  private void checkMemory() throws IOException {
+    while (memoryFull()) {
+      flushOnePartition();
+    }
+  }
+
+  /**
+   * Check if memory is full
+   *
+   * @return True iff memory is full
+   */
+  private boolean memoryFull() {
+    int totalMessages = 0;
+    for (PartitionDiskBackedMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      totalMessages += messageStore.getNumberOfMessages();
+    }
+    return totalMessages > maxNumberOfMessagesInMemory;
+  }
+
+  /**
+   * Finds biggest partition and flushes it to the disk
+   *
+   * @throws IOException
+   */
+  private void flushOnePartition() throws IOException {
+    int maxMessages = 0;
+    PartitionDiskBackedMessageStore<I, M> biggestStore = null;
+    for (PartitionDiskBackedMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      int numMessages = messageStore.getNumberOfMessages();
+      if (numMessages > maxMessages) {
+        maxMessages = numMessages;
+        biggestStore = messageStore;
+      }
+    }
+    if (biggestStore != null) {
+      biggestStore.flush();
+    }
+  }
+
+  /**
+   * Get message store for partition which holds vertex with required vertex
+   * id
+   *
+   * @param vertexId Id of vertex for which we are asking for message store
+   * @return Requested message store
+   */
+  private PartitionDiskBackedMessageStore<I, M> getMessageStore(I vertexId) {
+    int partitionId =
+        service.getVertexPartitionOwner(vertexId).getPartitionId();
+    return getMessageStore(partitionId);
+  }
+
+  /**
+   * Get message store for partition id. It it doesn't exist yet,
+   * creates a new one.
+   *
+   * @param partitionId Id of partition for which we are asking for message
+   *                    store
+   * @return Requested message store
+   */
+  private PartitionDiskBackedMessageStore<I, M> getMessageStore(
+      int partitionId) {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore != null) {
+      return messageStore;
+    }
+    messageStore = partitionStoreFactory.newStore(messageClass);
+    PartitionDiskBackedMessageStore<I, M> store =
+        partitionMessageStores.putIfAbsent(partitionId, messageStore);
+    return (store == null) ? messageStore : store;
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> partitionStore =
+        partitionMessageStores.get(partitionId);
+    out.writeBoolean(partitionStore != null);
+    if (partitionStore != null) {
+      partitionStore.write(out);
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    if (in.readBoolean()) {
+      PartitionDiskBackedMessageStore<I, M> messageStore =
+          partitionStoreFactory.newStore(messageClass);
+      messageStore.readFields(in);
+      partitionMessageStores.put(partitionId, messageStore);
+    }
+  }
+
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param service             Service worker
+   * @param maxMessagesInMemory Number of messages to keep in memory
+   * @param fileStoreFactory    Factory for creating file stores when
+   *                            flushing
+   * @param <I>                 Vertex id
+   * @param <V>                 Vertex data
+   * @param <E>                 Edge data
+   * @param <M>                 Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
+      CentralizedServiceWorker<I, V, E> service,
+      int maxMessagesInMemory,
+      MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+          fileStoreFactory) {
+    return new Factory<I, V, E, M>(service, maxMessagesInMemory,
+        fileStoreFactory);
+  }
+
+  /**
+   * Factory for {@link DiskBackedMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
+    /** Service worker */
+    private final CentralizedServiceWorker<I, V, E> service;
+    /** Number of messages to keep in memory */
+    private final int maxMessagesInMemory;
+    /** Factory for creating file stores when flushing */
+    private final
+    MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+    fileStoreFactory;
+
+    /**
+     * @param service             Service worker
+     * @param maxMessagesInMemory Number of messages to keep in memory
+     * @param fileStoreFactory    Factory for creating file stores when
+     *                            flushing
+     */
+    public Factory(CentralizedServiceWorker<I, V, E> service,
+        int maxMessagesInMemory,
+        MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+            fileStoreFactory) {
+      this.service = service;
+      this.maxMessagesInMemory = maxMessagesInMemory;
+      this.fileStoreFactory = fileStoreFactory;
+    }
+
+    @Override
+    public MessageStore<I, M> newStore(Class<M> messageClass) {
+      return new DiskBackedMessageStore<I, V, E, M>(messageClass,
+          service, maxMessagesInMemory, fileStoreFactory);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
new file mode 100644
index 0000000..4ae805a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Message storage with in-memory map of messages and with support for
+ * flushing all the messages to the disk. Holds messages for a single partition.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class PartitionDiskBackedMessageStore<I extends WritableComparable,
+    M extends Writable> implements Writable {
+  /** Message class */
+  private final Class<M> messageClass;
+  /**
+   * In-memory message map (must be sorted to insure that the ids are
+   * ordered)
+   */
+  private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
+  inMemoryMessages;
+  /** Hadoop configuration */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+  /** Counter for number of messages in-memory */
+  private final AtomicInteger numberOfMessagesInMemory;
+  /** To keep vertex ids which we have messages for */
+  private final Set<I> destinationVertices;
+  /** File stores in which we keep flushed messages */
+  private final Collection<SequentialFileMessageStore<I, M>> fileStores;
+  /** Factory for creating file stores when flushing */
+  private final
+  MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> fileStoreFactory;
+  /** Lock for disk flushing */
+  private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
+  /**
+   * Constructor.
+   *
+   * @param messageClass     Message class held in the store
+   * @param config           Hadoop configuration
+   * @param fileStoreFactory Factory for creating file stores when flushing
+   */
+  public PartitionDiskBackedMessageStore(
+      Class<M> messageClass,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+      MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+          fileStoreFactory) {
+    inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+    this.messageClass = messageClass;
+    this.config = config;
+    numberOfMessagesInMemory = new AtomicInteger(0);
+    destinationVertices =
+        Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
+    fileStores = Lists.newArrayList();
+    this.fileStoreFactory = fileStoreFactory;
+  }
+
+  /**
+   * Add vertex messages
+   *
+   * @param vertexId Vertex id to use
+   * @param messages Messages to add (note that the lifetime of the messages)
+   *                 is only until next() is called again)
+   * @return True if the vertex id ownership is taken by this method,
+   *         false otherwise
+   * @throws IOException
+   */
+  boolean addVertexMessages(I vertexId,
+                            Iterable<M> messages) throws IOException {
+    boolean ownsVertexId = false;
+    destinationVertices.add(vertexId);
+    rwLock.readLock().lock();
+    try {
+      ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+      if (extendedDataOutput == null) {
+        ExtendedDataOutput newExtendedDataOutput =
+            config.createExtendedDataOutput();
+        extendedDataOutput =
+            inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
+        if (extendedDataOutput == null) {
+          ownsVertexId = true;
+          extendedDataOutput = newExtendedDataOutput;
+        }
+      }
+
+      synchronized (extendedDataOutput) {
+        for (M message : messages) {
+          message.write(extendedDataOutput);
+          numberOfMessagesInMemory.getAndIncrement();
+        }
+      }
+    } finally {
+      rwLock.readLock().unlock();
+    }
+
+    return ownsVertexId;
+  }
+
+  /**
+   * Get the messages for a vertex.
+   *
+   * @param vertexId Vertex id for which we want to get messages
+   * @return Iterable of messages for a vertex id
+   */
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+    if (extendedDataOutput == null) {
+      extendedDataOutput = config.createExtendedDataOutput();
+    }
+    Iterable<M> combinedIterable = new MessagesIterable<M>(
+        config, messageClass,
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+
+    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
+      combinedIterable = Iterables.concat(combinedIterable,
+          fileStore.getVertexMessages(vertexId));
+    }
+    return combinedIterable;
+  }
+
+  /**
+   * Get number of messages in memory
+   *
+   * @return Number of messages in memory
+   */
+  public int getNumberOfMessages() {
+    return numberOfMessagesInMemory.get();
+  }
+
+  /**
+   * Check if we have messages for some vertex
+   *
+   * @param vertexId Id of vertex which we want to check
+   * @return True iff we have messages for vertex with required id
+   */
+  public boolean hasMessagesForVertex(I vertexId) {
+    return destinationVertices.contains(vertexId);
+  }
+
+  /**
+   * Gets vertex ids which we have messages for
+   *
+   * @return Iterable over vertex ids which we have messages for
+   */
+  public Iterable<I> getDestinationVertices() {
+    return destinationVertices;
+  }
+
+  /**
+   * Clears messages for a vertex.
+   *
+   * @param vertexId Vertex id for which we want to clear messages
+   * @throws IOException
+   */
+  public void clearVertexMessages(I vertexId) throws IOException {
+    inMemoryMessages.remove(vertexId);
+  }
+
+  /**
+   * Clears all resources used by this store.
+   *
+   * @throws IOException
+   */
+  public void clearAll() throws IOException {
+    inMemoryMessages.clear();
+    destinationVertices.clear();
+    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
+      fileStore.clearAll();
+    }
+    fileStores.clear();
+  }
+
+  /**
+   * Flushes messages to the disk.
+   *
+   * @throws IOException
+   */
+  public void flush() throws IOException {
+    ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush = null;
+    rwLock.writeLock().lock();
+    try {
+      messagesToFlush = inMemoryMessages;
+      inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+      numberOfMessagesInMemory.set(0);
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+    SequentialFileMessageStore<I, M> fileStore =
+        fileStoreFactory.newStore(messageClass);
+    fileStore.addMessages(messagesToFlush);
+
+    synchronized (fileStores) {
+      fileStores.add(fileStore);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write destination vertices
+    out.writeInt(destinationVertices.size());
+    for (I vertexId : destinationVertices) {
+      vertexId.write(out);
+    }
+
+    // write of in-memory messages
+    out.writeInt(numberOfMessagesInMemory.get());
+
+    // write in-memory messages map
+    out.writeInt(inMemoryMessages.size());
+    for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
+      entry.getKey().write(out);
+      out.writeInt(entry.getValue().getPos());
+      out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
+    }
+
+    // write file stores
+    out.writeInt(fileStores.size());
+    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
+      fileStore.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // read destination vertices
+    int numVertices = in.readInt();
+    for (int v = 0; v < numVertices; v++) {
+      I vertexId = (I) config.createVertexId();
+      vertexId.readFields(in);
+      destinationVertices.add(vertexId);
+    }
+
+    // read in-memory messages
+    numberOfMessagesInMemory.set(in.readInt());
+
+    // read in-memory map
+    int mapSize = in.readInt();
+    for (int m = 0; m < mapSize; m++) {
+      I vertexId = config.createVertexId();
+      vertexId.readFields(in);
+      int messageBytes = in.readInt();
+      byte[] buf = new byte[messageBytes];
+      ExtendedDataOutput extendedDataOutput =
+          config.createExtendedDataOutput(buf, messageBytes);
+      inMemoryMessages.put(vertexId, extendedDataOutput);
+    }
+
+    // read file stores
+    int numFileStores = in.readInt();
+    for (int s = 0; s < numFileStores; s++) {
+      SequentialFileMessageStore<I, M> fileStore =
+          fileStoreFactory.newStore(messageClass);
+      fileStore.readFields(in);
+      fileStores.add(fileStore);
+    }
+  }
+
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param config           Hadoop configuration
+   * @param fileStoreFactory Factory for creating message stores for
+   *                         partitions
+   * @param <I>              Vertex id
+   * @param <M>              Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> newFactory(
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+      MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+          fileStoreFactory) {
+    return new Factory<I, M>(config, fileStoreFactory);
+  }
+
+  /**
+   * Factory for {@link PartitionDiskBackedMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      M extends Writable> implements MessageStoreFactory<I, M,
+      PartitionDiskBackedMessageStore<I, M>> {
+    /** Hadoop configuration */
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+    /** Factory for creating message stores for partitions */
+    private final MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+    fileStoreFactory;
+
+    /**
+     * @param config           Hadoop configuration
+     * @param fileStoreFactory Factory for creating message stores for
+     *                         partitions
+     */
+    public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+        MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+            fileStoreFactory) {
+      this.config = config;
+      this.fileStoreFactory = fileStoreFactory;
+    }
+
+    @Override
+    public PartitionDiskBackedMessageStore<I, M> newStore(
+        Class<M> messageClass) {
+      return new PartitionDiskBackedMessageStore<I, M>(messageClass, config,
+          fileStoreFactory);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
new file mode 100644
index 0000000..e5931c6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
+
+/**
+ * Used for writing and reading collection of messages to the disk.
+ * {@link SequentialFileMessageStore#addMessages(NavigableMap)}
+ * should be called only once with the messages we want to store.
+ * <p/>
+ * It's optimized for retrieving messages in the natural order of vertex ids
+ * they are sent to.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class SequentialFileMessageStore<I extends WritableComparable,
+    M extends Writable> implements Writable {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SequentialFileMessageStore.class);
+  /** Message class */
+  private final Class<M> messageClass;
+  /** File in which we store data */
+  private final File file;
+  /** Configuration which we need for reading data */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+  /** Buffer size to use when reading and writing files */
+  private final int bufferSize;
+  /** File input stream */
+  private DataInputStream in;
+  /** How many vertices do we have left to read in the file */
+  private int verticesLeft;
+  /** Id of currently read vertex */
+  private I currentVertexId;
+
+  /**
+   * Stores message on the disk.
+   *
+   * @param messageClass Message class held in the store
+   * @param config       Configuration used later for reading
+   * @param bufferSize   Buffer size to use when reading and writing
+   * @param fileName     File in which we want to store messages
+   * @throws IOException
+   */
+  public SequentialFileMessageStore(
+      Class<M> messageClass,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+      int bufferSize,
+      String fileName) {
+    this.messageClass = messageClass;
+    this.config = config;
+    this.bufferSize = bufferSize;
+    file = new File(fileName);
+  }
+
+  /**
+   * Adds messages from one message store to another
+   *
+   * @param messageMap Add the messages from this map to this store
+   * @throws java.io.IOException
+   */
+  public void addMessages(NavigableMap<I, ExtendedDataOutput> messageMap)
+    throws IOException {
+    // Writes messages to its file
+    if (file.exists()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("addMessages: Deleting " + file);
+      }
+      file.delete();
+    }
+    file.createNewFile();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addMessages: Creating " + file);
+    }
+
+    DataOutputStream out = null;
+
+    try {
+      out = new DataOutputStream(
+          new BufferedOutputStream(new FileOutputStream(file), bufferSize));
+      int destinationVertexIdCount = messageMap.size();
+      out.writeInt(destinationVertexIdCount);
+
+      // Dump the vertices and their messages in a sorted order
+      for (Map.Entry<I, ExtendedDataOutput> entry : messageMap.entrySet()) {
+        I destinationVertexId = entry.getKey();
+        destinationVertexId.write(out);
+        ExtendedDataOutput extendedDataOutput = entry.getValue();
+        Iterable<M> messages = new MessagesIterable<M>(
+            config, messageClass, extendedDataOutput.getByteArray(), 0,
+            extendedDataOutput.getPos());
+        int messageCount = Iterables.size(messages);
+        out.writeInt(messageCount);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("addMessages: For vertex id " + destinationVertexId +
+              ", messages = " + messageCount + " to file " + file);
+        }
+        for (M message : messages) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("addMessages: Wrote " + message + " to " + file);
+          }
+          message.write(out);
+        }
+      }
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  /**
+   * Reads messages for a vertex. It will find the messages only if all
+   * previous reads used smaller vertex ids than this one - messages should
+   * be retrieved in increasing order of vertex ids.
+   *
+   * @param vertexId Vertex id for which we want to get messages
+   * @return Messages for the selected vertex, or empty list if not used
+   *         correctly
+   * @throws IOException
+   */
+  public Iterable<M> getVertexMessages(I vertexId) throws
+      IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
+          " (currently " + currentVertexId + ") from " + file);
+    }
+    if (in == null) {
+      startReading();
+    }
+
+    I nextVertexId = getCurrentVertexId();
+    while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) {
+      nextVertexId = getNextVertexId();
+    }
+
+    if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
+      return EmptyIterable.get();
+    }
+
+    return readMessagesForCurrentVertex();
+  }
+
+  /**
+   * Clears all resources used by this store.
+   */
+  public void clearAll() throws IOException {
+    endReading();
+    file.delete();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(file.length());
+    FileInputStream input = new FileInputStream(file);
+    try {
+      byte[] buffer = new byte[bufferSize];
+      while (true) {
+        int length = input.read(buffer);
+        if (length < 0) {
+          break;
+        }
+        out.write(buffer, 0, length);
+      }
+    } finally {
+      input.close();
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    FileOutputStream output = new FileOutputStream(file);
+    try {
+      long fileLength = in.readLong();
+      byte[] buffer = new byte[bufferSize];
+      for (long position = 0; position < fileLength; position += bufferSize) {
+        int bytes = (int) Math.min(bufferSize, fileLength - position);
+        in.readFully(buffer, 0, bytes);
+        output.write(buffer);
+      }
+    } finally {
+      output.close();
+    }
+  }
+
+  /**
+   * Prepare for reading
+   *
+   * @throws IOException
+   */
+  private void startReading() throws IOException {
+    currentVertexId = null;
+    in = new DataInputStream(
+        new BufferedInputStream(new FileInputStream(file), bufferSize));
+    verticesLeft = in.readInt();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("startReading: File " + file + " with " +
+          verticesLeft + " vertices left");
+    }
+  }
+
+  /**
+   * Gets current vertex id.
+   * <p/>
+   * If there is a vertex id whose messages haven't been read yet it
+   * will return that vertex id, otherwise it will read and return the next
+   * one.
+   *
+   * @return Current vertex id
+   * @throws IOException
+   */
+  private I getCurrentVertexId() throws IOException {
+    if (currentVertexId != null) {
+      return currentVertexId;
+    } else {
+      return getNextVertexId();
+    }
+  }
+
+  /**
+   * Gets next vertex id.
+   * <p/>
+   * If there is a vertex whose messages haven't been read yet it
+   * will read and skip over its messages to get to the next vertex.
+   *
+   * @return Next vertex id
+   * @throws IOException
+   */
+  private I getNextVertexId() throws IOException {
+    if (currentVertexId != null) {
+      readMessagesForCurrentVertex();
+    }
+    if (verticesLeft == 0) {
+      return null;
+    }
+    currentVertexId = config.createVertexId();
+    currentVertexId.readFields(in);
+    return currentVertexId;
+  }
+
+  /**
+   * Reads messages for current vertex.
+   *
+   * @return Messages for current vertex
+   * @throws IOException
+   */
+  private Collection<M> readMessagesForCurrentVertex() throws IOException {
+    int messagesSize = in.readInt();
+    List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
+    for (int i = 0; i < messagesSize; i++) {
+      M message = ReflectionUtils.newInstance(messageClass);
+      try {
+        message.readFields(in);
+      } catch (IOException e) {
+        throw new IllegalStateException("readMessagesForCurrentVertex: " +
+            "Failed to read message from " + i + " of " +
+            messagesSize + " for vertex id " + currentVertexId + " from " +
+            file, e);
+      }
+      messages.add(message);
+    }
+    currentVertexDone();
+    return messages;
+  }
+
+  /**
+   * Release current vertex.
+   *
+   * @throws IOException
+   */
+  private void currentVertexDone() throws IOException {
+    currentVertexId = null;
+    verticesLeft--;
+    if (verticesLeft == 0) {
+      endReading();
+    }
+  }
+
+  /**
+   * Call when we are done reading, for closing files.
+   *
+   * @throws IOException
+   */
+  private void endReading() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("endReading: Stopped reading " + file);
+    }
+    if (in != null) {
+      in.close();
+      in = null;
+    }
+  }
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param config Hadoop configuration
+   * @param <I>    Vertex id
+   * @param <M>    Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> newFactory(
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    return new Factory<I, M>(config);
+  }
+
+  /**
+   * Factory for {@link SequentialFileMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      M extends Writable>
+      implements MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> {
+    /** Hadoop configuration */
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+    /** Directories in which we'll keep necessary files */
+    private final String[] directories;
+    /** Buffer size to use when reading and writing */
+    private final int bufferSize;
+    /** Counter for created message stores */
+    private final AtomicInteger storeCounter;
+
+    /**
+     * Constructor.
+     *
+     * @param config Hadoop configuration
+     */
+    public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+      this.config = config;
+      String jobId = config.get("mapred.job.id", "Unknown Job");
+      int taskId   = config.getTaskPartition();
+      List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
+      Collections.shuffle(userPaths);
+      directories = new String[userPaths.size()];
+      int i = 0;
+      for (String path : userPaths) {
+        String directory = path + File.separator + jobId + File.separator +
+            taskId + File.separator;
+        directories[i++] = directory;
+        new File(directory).mkdirs();
+      }
+      this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
+      storeCounter = new AtomicInteger();
+    }
+
+    @Override
+    public SequentialFileMessageStore<I, M> newStore(Class<M> messageClass) {
+      int idx = Math.abs(storeCounter.getAndIncrement());
+      String fileName =
+          directories[idx % directories.length] + "messages-" + idx;
+      return new SequentialFileMessageStore<I, M>(messageClass, config,
+          bufferSize, fileName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
new file mode 100644
index 0000000..7039378
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of out-of-core messages related classes.
+ */
+package org.apache.giraph.comm.messages.out_of_core;

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
index 3c798a9..8721756 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * Package of communication related objects, IPC service.
+ * Package of classes for storing messages.
  */
 package org.apache.giraph.comm.messages;

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 2adf19d..d786db5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -26,7 +26,7 @@ import org.apache.giraph.comm.SendPartitionCache;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
@@ -215,7 +215,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   private void sendPartitionMessages(WorkerInfo workerInfo,
                                      Partition<I, V, E> partition) {
     final int partitionId = partition.getId();
-    MessageStoreByPartition<I, Writable> messageStore =
+    MessageStore<I, Writable> messageStore =
         serverData.getCurrentMessageStore();
     ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
         new ByteArrayVertexIdMessages<I, Writable>(

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index b457038..3473de1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -21,14 +21,12 @@ package org.apache.giraph.comm.netty;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.messages.BasicMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
-import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
 import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
@@ -107,7 +105,7 @@ public class NettyWorkerServer<I extends WritableComparable,
    *
    * @return Message store factory
    */
-  private MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+  private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
   createMessageStoreFactory() {
     boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf);
     if (!useOutOfCoreMessaging) {
@@ -118,12 +116,13 @@ public class NettyWorkerServer<I extends WritableComparable,
         LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
             "maxMessagesInMemory = " + maxMessagesInMemory);
       }
-      MessageStoreFactory<I, Writable, BasicMessageStore<I, Writable>>
+      MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>>
           fileStoreFactory = SequentialFileMessageStore.newFactory(conf);
-      MessageStoreFactory<I, Writable, FlushableMessageStore<I, Writable>>
+      MessageStoreFactory<I, Writable,
+          PartitionDiskBackedMessageStore<I, Writable>>
           partitionStoreFactory =
-          DiskBackedMessageStore.newFactory(conf, fileStoreFactory);
-      return DiskBackedMessageStoreByPartition.newFactory(service,
+          PartitionDiskBackedMessageStore.newFactory(conf, fileStoreFactory);
+      return DiskBackedMessageStore.newFactory(service,
           maxMessagesInMemory, partitionStoreFactory);
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 6fdcfb0..a9bf3fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -19,7 +19,7 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.SimpleVertexWriter;
@@ -81,7 +81,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   /** Thread-safe queue of all partition ids */
   private final BlockingQueue<Integer> partitionIdQueue;
   /** Message store */
-  private final MessageStoreByPartition<I, M1> messageStore;
+  private final MessageStore<I, M1> messageStore;
   /** Configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** Worker (for NettyWorkerClientRequestProcessor) */
@@ -111,7 +111,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
    */
   public ComputeCallable(
       Mapper<?, ?, ?, ?>.Context context, GraphState graphState,
-      MessageStoreByPartition<I, M1> messageStore,
+      MessageStore<I, M1> messageStore,
       BlockingQueue<Integer> partitionIdQueue,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       CentralizedServiceWorker<I, V, E> serviceWorker) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 435dd87..b32c21b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -21,7 +21,7 @@ package org.apache.giraph.graph;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.master.BspServiceMaster;
@@ -253,7 +253,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       graphState = checkSuperstepRestarted(superstep, graphState);
       prepareForSuperstep(graphState);
       context.progress();
-      MessageStoreByPartition<I, Writable> messageStore =
+      MessageStore<I, Writable> messageStore =
         serviceWorker.getServerData().getCurrentMessageStore();
       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
       int numThreads = Math.min(numComputeThreads, numPartitions);
@@ -691,7 +691,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
       List<PartitionStats> partitionStatsList,
       final GraphState graphState,
-      final MessageStoreByPartition<I, Writable> messageStore,
+      final MessageStore<I, Writable> messageStore,
       int numPartitions,
       int numThreads) {
     final BlockingQueue<Integer> computePartitionIdQueue =

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 5c69161..35e6362 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -102,7 +102,7 @@ public class RequestFailureTest {
   private void checkResult(int numRequests) throws IOException {
     // Check the output
     Iterable<IntWritable> vertices =
-        serverData.getIncomingMessageStore().getDestinationVertices();
+        serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 7016572..c8c09df 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -168,7 +168,7 @@ public class RequestTest {
 
     // Check the output
     Iterable<IntWritable> vertices =
-        serverData.getIncomingMessageStore().getDestinationVertices();
+        serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
index 4e8041a..e270816 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
@@ -24,14 +24,12 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.BasicMessageStore;
 import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
-import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -138,42 +136,25 @@ public class TestMessageStores {
     return allMessages;
   }
 
-  /**
-   * Used for testing only
-   */
-  private static class InputMessageStore extends
-      ByteArrayMessagesPerVertexStore<IntWritable, IntWritable> {
-
-    /**
-     * Constructor
-     *
-     * @param service Service worker
-     * @param config  Hadoop configuration
-     */
-    InputMessageStore(
-        CentralizedServiceWorker<IntWritable, ?, ?> service,
-        ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config,
-        Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException {
-      super(IntWritable.class, service, config);
-      // Adds all the messages to the store
-      for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
-          inputMap.entrySet()) {
-        int partitionId = getPartitionId(entry.getKey());
-        ByteArrayVertexIdMessages<IntWritable, IntWritable>
-            byteArrayVertexIdMessages =
-            new ByteArrayVertexIdMessages<IntWritable,
-                IntWritable>(IntWritable.class);
-        byteArrayVertexIdMessages.setConf(config);
-        byteArrayVertexIdMessages.initialize();
-        for (IntWritable message : entry.getValue()) {
-          byteArrayVertexIdMessages.add(entry.getKey(), message);
-        }
-        try {
-          addPartitionMessages(partitionId, byteArrayVertexIdMessages);
-        } catch (IOException e) {
-          throw new IllegalStateException("Got IOException", e);
-        }
+  private static void addMessages(
+      MessageStore<IntWritable, IntWritable> messageStore,
+      CentralizedServiceWorker<IntWritable, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config,
+      Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException {
+    for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
+        inputMap.entrySet()) {
+      int partitionId =
+          service.getVertexPartitionOwner(entry.getKey()).getPartitionId();
+      ByteArrayVertexIdMessages<IntWritable, IntWritable>
+          byteArrayVertexIdMessages =
+          new ByteArrayVertexIdMessages<IntWritable,
+              IntWritable>(IntWritable.class);
+      byteArrayVertexIdMessages.setConf(config);
+      byteArrayVertexIdMessages.initialize();
+      for (IntWritable message : entry.getValue()) {
+        byteArrayVertexIdMessages.add(entry.getKey(), message);
       }
+      messageStore.addPartitionMessages(partitionId, byteArrayVertexIdMessages);
     }
   }
 
@@ -184,8 +165,7 @@ public class TestMessageStores {
     for (int n = 0; n < testData.numTimes; n++) {
       SortedMap<IntWritable, Collection<IntWritable>> batch =
           createRandomMessages(testData);
-      messageStore.addMessages(new InputMessageStore(service, config,
-          batch));
+      addMessages(messageStore, service, config, batch);
       for (Entry<IntWritable, Collection<IntWritable>> entry :
           batch.entrySet()) {
         if (messages.containsKey(entry.getKey())) {
@@ -200,19 +180,24 @@ public class TestMessageStores {
   private <I extends WritableComparable, M extends Writable> boolean
   equalMessages(
       MessageStore<I, M> messageStore,
-      Map<I, Collection<M>> expectedMessages) throws IOException {
-    TreeSet<I> vertexIds = Sets.newTreeSet();
-    Iterables.addAll(vertexIds, messageStore.getDestinationVertices());
-    for (I vertexId : vertexIds) {
-      Iterable<M> expected = expectedMessages.get(vertexId);
-      if (expected == null) {
-        return false;
-      }
-      Iterable<M> actual = messageStore.getVertexMessages(vertexId);
-      if (!CollectionUtils.isEqual(expected, actual)) {
-        System.err.println("equalMessages: For vertexId " + vertexId +
-            " expected " + expected + ", but got " + actual);
-        return false;
+      Map<I, Collection<M>> expectedMessages,
+      TestData testData) throws IOException {
+    for (int partitionId = 0; partitionId < testData.numOfPartitions;
+         partitionId++) {
+      TreeSet<I> vertexIds = Sets.newTreeSet();
+      Iterables.addAll(vertexIds,
+          messageStore.getPartitionDestinationVertices(partitionId));
+      for (I vertexId : vertexIds) {
+        Iterable<M> expected = expectedMessages.get(vertexId);
+        if (expected == null) {
+          return false;
+        }
+        Iterable<M> actual = messageStore.getVertexMessages(vertexId);
+        if (!CollectionUtils.isEqual(expected, actual)) {
+          System.err.println("equalMessages: For vertexId " + vertexId +
+              " expected " + expected + ", but got " + actual);
+          return false;
+        }
       }
     }
     return true;
@@ -220,7 +205,7 @@ public class TestMessageStores {
 
   private <S extends MessageStore<IntWritable, IntWritable>> S doCheckpoint(
       MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
-      S messageStore) throws IOException {
+      S messageStore, TestData testData) throws IOException {
     File file = new File(directory, "messageStoreTest");
     if (file.exists()) {
       file.delete();
@@ -228,14 +213,20 @@ public class TestMessageStores {
     file.createNewFile();
     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         (new FileOutputStream(file))));
-    messageStore.write(out);
+    for (int partitionId = 0; partitionId < testData.numOfPartitions;
+         partitionId++) {
+      messageStore.writePartition(out, partitionId);
+    }
     out.close();
 
     messageStore = messageStoreFactory.newStore(IntWritable.class);
 
     DataInputStream in = new DataInputStream(new BufferedInputStream(
         (new FileInputStream(file))));
-    messageStore.readFields(in);
+    for (int partitionId = 0; partitionId < testData.numOfPartitions;
+         partitionId++) {
+      messageStore.readFieldsForPartition(in, partitionId);
+    }
     in.close();
     file.delete();
 
@@ -250,10 +241,10 @@ public class TestMessageStores {
         new TreeMap<IntWritable, Collection<IntWritable>>();
     S messageStore = messageStoreFactory.newStore(IntWritable.class);
     putNTimes(messageStore, messages, testData);
-    assertTrue(equalMessages(messageStore, messages));
+    assertTrue(equalMessages(messageStore, messages, testData));
     messageStore.clearAll();
-    messageStore = doCheckpoint(messageStoreFactory, messageStore);
-    assertTrue(equalMessages(messageStore, messages));
+    messageStore = doCheckpoint(messageStoreFactory, messageStore, testData);
+    assertTrue(equalMessages(messageStore, messages, testData));
     messageStore.clearAll();
   }
 
@@ -270,31 +261,18 @@ public class TestMessageStores {
   }
 
   @Test
-  public void testDiskBackedMessageStore() {
-    try {
-      MessageStoreFactory<IntWritable, IntWritable,
-          BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
-          SequentialFileMessageStore.newFactory(config);
-      MessageStoreFactory<IntWritable, IntWritable,
-          FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
-          DiskBackedMessageStore.newFactory(config, fileStoreFactory);
-      testMessageStore(diskStoreFactory, testData);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Test
   public void testDiskBackedMessageStoreByPartition() {
     try {
       MessageStoreFactory<IntWritable, IntWritable,
-          BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
+          SequentialFileMessageStore<IntWritable, IntWritable>>
+          fileStoreFactory =
           SequentialFileMessageStore.newFactory(config);
       MessageStoreFactory<IntWritable, IntWritable,
-          FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
-          DiskBackedMessageStore.newFactory(config, fileStoreFactory);
-      testMessageStore(DiskBackedMessageStoreByPartition.newFactory(service,
-          testData.maxMessagesInMemory, diskStoreFactory), testData);
+          PartitionDiskBackedMessageStore<IntWritable, IntWritable>>
+          partitionStoreFactory =
+          PartitionDiskBackedMessageStore.newFactory(config, fileStoreFactory);
+      testMessageStore(DiskBackedMessageStore.newFactory(service,
+          testData.maxMessagesInMemory, partitionStoreFactory), testData);
     } catch (IOException e) {
       e.printStackTrace();
     }


[2/2] git commit: updated refs/heads/trunk to 8944567

Posted by ma...@apache.org.
GIRAPH-697: Clean up message stores (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/89445670
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/89445670
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/89445670

Branch: refs/heads/trunk
Commit: 89445670eee21e38c0dadde4753310adae14b964
Parents: f8a3c77
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Tue Jun 25 16:05:44 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Tue Jun 25 16:05:44 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../java/org/apache/giraph/comm/ServerData.java |  23 +-
 .../giraph/comm/messages/BasicMessageStore.java |  67 ---
 .../ByteArrayMessagesPerVertexStore.java        |  77 +---
 .../comm/messages/DiskBackedMessageStore.java   | 418 -------------------
 .../DiskBackedMessageStoreByPartition.java      | 390 -----------------
 .../comm/messages/FlushableMessageStore.java    |  40 --
 .../messages/InMemoryMessageStoreFactory.java   |   4 +-
 .../giraph/comm/messages/MessageStore.java      |  75 +++-
 .../comm/messages/MessageStoreByPartition.java  |  82 ----
 .../comm/messages/MessageStoreFactory.java      |   6 +-
 .../giraph/comm/messages/MessagesIterable.java  |  56 +++
 .../comm/messages/OneMessagePerVertexStore.java |  65 +--
 .../messages/SequentialFileMessageStore.java    | 411 ------------------
 .../comm/messages/SimpleMessageStore.java       |  44 +-
 .../out_of_core/DiskBackedMessageStore.java     | 322 ++++++++++++++
 .../PartitionDiskBackedMessageStore.java        | 350 ++++++++++++++++
 .../out_of_core/SequentialFileMessageStore.java | 407 ++++++++++++++++++
 .../comm/messages/out_of_core/package-info.java |  21 +
 .../giraph/comm/messages/package-info.java      |   2 +-
 .../NettyWorkerClientRequestProcessor.java      |   4 +-
 .../giraph/comm/netty/NettyWorkerServer.java    |  21 +-
 .../apache/giraph/graph/ComputeCallable.java    |   6 +-
 .../apache/giraph/graph/GraphTaskManager.java   |   6 +-
 .../apache/giraph/comm/RequestFailureTest.java  |   2 +-
 .../org/apache/giraph/comm/RequestTest.java     |   2 +-
 .../apache/giraph/comm/TestMessageStores.java   | 140 +++----
 27 files changed, 1335 insertions(+), 1708 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 0f2758e..2a605d8 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-697: Clean up message stores (majakabiljo)
+
   GIRAPH-696: Should be able to spill giraph metrics to a specified 
   directory on HDFS (claudio)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 788be53..affc260 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -21,7 +21,7 @@ package org.apache.giraph.comm;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -54,21 +54,18 @@ public class ServerData<I extends WritableComparable,
   /** Edge store for this worker. */
   private final EdgeStore<I, V, E> edgeStore;
   /** Message store factory */
-  private final
-  MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+  private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
   messageStoreFactory;
   /**
    * Message store for incoming messages (messages which will be consumed
    * in the next super step)
    */
-  private volatile MessageStoreByPartition<I, Writable>
-  incomingMessageStore;
+  private volatile MessageStore<I, Writable> incomingMessageStore;
   /**
    * Message store for current messages (messages which we received in
    * previous super step and which will be consumed in current super step)
    */
-  private volatile MessageStoreByPartition<I, Writable>
-  currentMessageStore;
+  private volatile MessageStore<I, Writable> currentMessageStore;
   /**
    * Map of partition ids to incoming vertex mutations from other workers.
    * (Synchronized access to values)
@@ -95,7 +92,7 @@ public class ServerData<I extends WritableComparable,
   public ServerData(
       CentralizedServiceWorker<I, V, E> service,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+      MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
           messageStoreFactory,
       Mapper<?, ?, ?, ?>.Context context) {
     this.conf = conf;
@@ -136,9 +133,8 @@ public class ServerData<I extends WritableComparable,
    * @param <M> Message data
    * @return Incoming message store
    */
-  public <M extends Writable> MessageStoreByPartition<I, M>
-  getIncomingMessageStore() {
-    return (MessageStoreByPartition<I, M>) incomingMessageStore;
+  public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
+    return (MessageStore<I, M>) incomingMessageStore;
   }
 
   /**
@@ -148,9 +144,8 @@ public class ServerData<I extends WritableComparable,
    * @param <M> Message data
    * @return Current message store
    */
-  public <M extends Writable> MessageStoreByPartition<I, M>
-  getCurrentMessageStore() {
-    return (MessageStoreByPartition<I, M>) currentMessageStore;
+  public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
+    return (MessageStore<I, M>) currentMessageStore;
   }
 
   /** Prepare for next super step */

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
deleted file mode 100644
index dcb6223..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Most basic message store with just add, get and clear operations
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface BasicMessageStore<I extends WritableComparable,
-    M extends Writable> extends Writable {
-  /**
-   * Adds messages from one message store to another
-   *
-   * @param messageStore Add the messages from this message store to this
-   *                     object
-   * @throws java.io.IOException
-   */
-  void addMessages(MessageStore<I, M> messageStore) throws IOException;
-
-  /**
-   * Gets messages for a vertex.  The lifetime of every message is only
-   * guaranteed until the iterator's next() method is called.  Do not re-use
-   * the messages.
-   *
-   * @param vertexId Vertex id for which we want to get messages
-   * @return Iterable of messages for a vertex id
-   * @throws IOException
-   */
-  Iterable<M> getVertexMessages(I vertexId) throws IOException;
-
-  /**
-   * Clears messages for a vertex.
-   *
-   * @param vertexId Vertex id for which we want to clear messages
-   * @throws IOException
-   */
-  void clearVertexMessages(I vertexId) throws IOException;
-
-  /**
-   * Clears all resources used by this store.
-   *
-   * @throws IOException
-   */
-  void clearAll() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 97c8a35..fecd7ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -24,7 +24,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
 import org.apache.giraph.utils.RepresentativeByteArrayIterator;
 import org.apache.giraph.utils.VertexIdIterator;
 import org.apache.hadoop.io.Writable;
@@ -33,7 +32,6 @@ import org.apache.hadoop.io.WritableComparable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -128,33 +126,11 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     }
   }
 
-  /**
-   * Special iterable that recycles the message
-   */
-  private class MessagesIterable extends RepresentativeByteArrayIterable<M> {
-    /**
-     * Constructor
-     *
-     * @param buf Buffer
-     * @param off Offset to start in the buffer
-     * @param length Length of the buffer
-     */
-    private MessagesIterable(byte[] buf, int off, int length) {
-      super(config, buf, off, length);
-    }
-
-    @Override
-    protected M createWritable() {
-      return ReflectionUtils.newInstance(messageClass);
-    }
-  }
-
   @Override
   protected Iterable<M> getMessagesAsIterable(
       ExtendedDataOutput extendedDataOutput) {
-
-    return new MessagesIterable(extendedDataOutput.getByteArray(), 0,
-        extendedDataOutput.getPos());
+    return new MessagesIterable<M>(config, messageClass,
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
   }
 
   /**
@@ -209,9 +185,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     int byteArraySize = in.readInt();
     byte[] messages = new byte[byteArraySize];
     in.readFully(messages);
-    ExtendedDataOutput extendedDataOutput =
-        config.createExtendedDataOutput(messages, 0);
-    return extendedDataOutput;
+    return config.createExtendedDataOutput(messages, 0);
   }
 
   /**
@@ -224,48 +198,12 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    * @return Factory
    */
   public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+  MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
       CentralizedServiceWorker<I, ?, ?> service,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     return new Factory<I, M>(service, config);
   }
 
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
-    if (messageStore instanceof ByteArrayMessagesPerVertexStore) {
-      ByteArrayMessagesPerVertexStore<I, M>
-          byteArrayMessagesPerVertexStore =
-          (ByteArrayMessagesPerVertexStore<I, M>) messageStore;
-      for (Map.Entry<Integer, ConcurrentMap<I, ExtendedDataOutput>>
-           partitionEntry : byteArrayMessagesPerVertexStore.map.entrySet()) {
-        for (Map.Entry<I, ExtendedDataOutput> vertexEntry :
-            partitionEntry.getValue().entrySet()) {
-          ConcurrentMap<I, ExtendedDataOutput> partitionMap =
-              getOrCreatePartitionMap(partitionEntry.getKey());
-          ExtendedDataOutput extendedDataOutput =
-              partitionMap.get(vertexEntry.getKey());
-          if (extendedDataOutput == null) {
-            ExtendedDataOutput newExtendedDataOutput =
-                config.createExtendedDataOutput();
-            extendedDataOutput =
-                partitionMap.putIfAbsent(vertexEntry.getKey(),
-                    newExtendedDataOutput);
-            if (extendedDataOutput == null) {
-              extendedDataOutput = newExtendedDataOutput;
-            }
-          }
-
-          // Add the messages
-          extendedDataOutput.write(vertexEntry.getValue().getByteArray(), 0,
-              vertexEntry.getValue().getPos());
-        }
-      }
-    } else {
-      throw new IllegalArgumentException("addMessages: Illegal argument " +
-          messageStore.getClass());
-    }
-  }
-
   /**
    * Factory for {@link ByteArrayMessagesPerVertexStore}
    *
@@ -273,7 +211,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    * @param <M> Message data
    */
   private static class Factory<I extends WritableComparable, M extends Writable>
-      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
     /** Service worker */
     private final CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */
@@ -290,8 +228,9 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     }
 
     @Override
-    public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
-      return new ByteArrayMessagesPerVertexStore(messageClass, service, config);
+    public MessageStore<I, M> newStore(Class<M> messageClass) {
+      return new ByteArrayMessagesPerVertexStore<I, M>(
+          messageClass, service, config);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
deleted file mode 100644
index 2712edd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message storage with in-memory map of messages and with support for
- * flushing all the messages to the disk.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class DiskBackedMessageStore<I extends WritableComparable,
-    M extends Writable> implements FlushableMessageStore<I, M> {
-  /** Message class */
-  private final Class<M> messageClass;
-  /**
-   * In-memory message map (must be sorted to insure that the ids are
-   * ordered)
-   */
-  private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
-  inMemoryMessages;
-  /** Hadoop configuration */
-  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
-  /** Counter for number of messages in-memory */
-  private final AtomicInteger numberOfMessagesInMemory;
-  /** To keep vertex ids which we have messages for */
-  private final Set<I> destinationVertices;
-  /** File stores in which we keep flushed messages */
-  private final Collection<BasicMessageStore<I, M>> fileStores;
-  /** Factory for creating file stores when flushing */
-  private final
-  MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
-  /** Lock for disk flushing */
-  private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
-
-  /**
-   * Constructor.
-   *
-   * @param messageClass     Message class held in the store
-   * @param config           Hadoop configuration
-   * @param fileStoreFactory Factory for creating file stores when flushing
-   */
-  public DiskBackedMessageStore(
-      Class<M> messageClass,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-      MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
-    inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
-    this.messageClass = messageClass;
-    this.config = config;
-    numberOfMessagesInMemory = new AtomicInteger(0);
-    destinationVertices =
-        Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
-    fileStores = Lists.newArrayList();
-    this.fileStoreFactory = fileStoreFactory;
-  }
-
-  /**
-   * Add vertex messages
-   *
-   * @param vertexId Vertex id to use
-   * @param messages Messages to add (note that the lifetime of the messages)
-   *                 is only until next() is called again)
-   * @return True if the vertex id ownership is taken by this method,
-   *         false otherwise
-   * @throws IOException
-   */
-  boolean addVertexMessages(I vertexId,
-                            Iterable<M> messages) throws IOException {
-    boolean ownsVertexId = false;
-    destinationVertices.add(vertexId);
-    rwLock.readLock().lock();
-    try {
-      ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
-      if (extendedDataOutput == null) {
-        ExtendedDataOutput newExtendedDataOutput =
-            config.createExtendedDataOutput();
-        extendedDataOutput =
-            inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
-        if (extendedDataOutput == null) {
-          ownsVertexId = true;
-          extendedDataOutput = newExtendedDataOutput;
-        }
-      }
-
-      synchronized (extendedDataOutput) {
-        for (M message : messages) {
-          message.write(extendedDataOutput);
-          numberOfMessagesInMemory.getAndIncrement();
-        }
-      }
-    } finally {
-      rwLock.readLock().unlock();
-    }
-
-    return ownsVertexId;
-  }
-
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws
-      IOException {
-    for (I destinationVertex : messageStore.getDestinationVertices()) {
-      addVertexMessages(destinationVertex,
-          messageStore.getVertexMessages(destinationVertex));
-    }
-  }
-
-  /**
-   * Special iterable that recycles the message
-   */
-  private class MessageIterable extends RepresentativeByteArrayIterable<M> {
-    /**
-     * Constructor
-     *
-     * @param buf Buffer
-     * @param off Offset to start in the buffer
-     * @param length Length of the buffer
-     */
-    public MessageIterable(
-        byte[] buf, int off, int length) {
-      super(config, buf, off, length);
-    }
-
-    @Override
-    protected M createWritable() {
-      return ReflectionUtils.newInstance(messageClass);
-    }
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
-    ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
-    if (extendedDataOutput == null) {
-      extendedDataOutput = config.createExtendedDataOutput();
-    }
-    Iterable<M> combinedIterable = new MessageIterable(
-        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
-
-    for (BasicMessageStore<I, M> fileStore : fileStores) {
-      combinedIterable = Iterables.concat(combinedIterable,
-          fileStore.getVertexMessages(vertexId));
-    }
-    return combinedIterable;
-  }
-
-  @Override
-  public int getNumberOfMessages() {
-    return numberOfMessagesInMemory.get();
-  }
-
-  @Override
-  public boolean hasMessagesForVertex(I vertexId) {
-    return destinationVertices.contains(vertexId);
-  }
-
-  @Override
-  public Iterable<I> getDestinationVertices() {
-    return destinationVertices;
-  }
-
-  @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
-    inMemoryMessages.remove(vertexId);
-  }
-
-  @Override
-  public void clearAll() throws IOException {
-    inMemoryMessages.clear();
-    destinationVertices.clear();
-    for (BasicMessageStore<I, M> fileStore : fileStores) {
-      fileStore.clearAll();
-    }
-    fileStores.clear();
-  }
-
-  /**
-   * Special temporary message store for passing along in-memory messages
-   */
-  private class TemporaryMessageStore implements MessageStore<I, M> {
-    /**
-     * In-memory message map (must be sorted to insure that the ids are
-     * ordered)
-     */
-    private final ConcurrentNavigableMap<I, ExtendedDataOutput>
-    temporaryMessages;
-
-    /**
-     * Constructor.
-     *
-     * @param temporaryMessages Messages to be owned by this object
-     */
-    private TemporaryMessageStore(
-        ConcurrentNavigableMap<I, ExtendedDataOutput>
-            temporaryMessages) {
-      this.temporaryMessages = temporaryMessages;
-    }
-
-    @Override
-    public int getNumberOfMessages() {
-      throw new IllegalAccessError("getNumberOfMessages: Not supported");
-    }
-
-    @Override
-    public boolean hasMessagesForVertex(I vertexId) {
-      return temporaryMessages.containsKey(vertexId);
-    }
-
-    @Override
-    public Iterable<I> getDestinationVertices() {
-      return temporaryMessages.keySet();
-    }
-
-    @Override
-    public void addMessages(MessageStore<I, M> messageStore)
-      throws IOException {
-      throw new IllegalAccessError("addMessages: Not supported");
-    }
-
-    @Override
-    public Iterable<M> getVertexMessages(I vertexId) throws IOException {
-      ExtendedDataOutput extendedDataOutput = temporaryMessages.get(vertexId);
-      if (extendedDataOutput == null) {
-        extendedDataOutput = config.createExtendedDataOutput();
-      }
-      return new MessageIterable(extendedDataOutput.getByteArray(), 0,
-          extendedDataOutput.getPos());
-    }
-
-    @Override
-    public void clearVertexMessages(I vertexId) throws IOException {
-      temporaryMessages.remove(vertexId);
-    }
-
-    @Override
-    public void clearAll() throws IOException {
-      temporaryMessages.clear();
-    }
-
-    @Override
-    public void write(DataOutput dataOutput) throws IOException {
-      throw new IllegalAccessError("write: Not supported");
-    }
-
-    @Override
-    public void readFields(DataInput dataInput) throws IOException {
-      throw new IllegalAccessError("readFields: Not supported");
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush = null;
-    rwLock.writeLock().lock();
-    try {
-      messagesToFlush = inMemoryMessages;
-      inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
-      numberOfMessagesInMemory.set(0);
-    } finally {
-      rwLock.writeLock().unlock();
-    }
-    BasicMessageStore<I, M> fileStore =
-        fileStoreFactory.newStore(messageClass);
-    fileStore.addMessages(new TemporaryMessageStore(messagesToFlush));
-
-    synchronized (fileStores) {
-      fileStores.add(fileStore);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    // write destination vertices
-    out.writeInt(destinationVertices.size());
-    for (I vertexId : destinationVertices) {
-      vertexId.write(out);
-    }
-
-    // write of in-memory messages
-    out.writeInt(numberOfMessagesInMemory.get());
-
-    // write in-memory messages map
-    out.writeInt(inMemoryMessages.size());
-    for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
-      entry.getKey().write(out);
-      out.writeInt(entry.getValue().getPos());
-      out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
-    }
-
-    // write file stores
-    out.writeInt(fileStores.size());
-    for (BasicMessageStore<I, M> fileStore : fileStores) {
-      fileStore.write(out);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    // read destination vertices
-    int numVertices = in.readInt();
-    for (int v = 0; v < numVertices; v++) {
-      I vertexId = (I) config.createVertexId();
-      vertexId.readFields(in);
-      destinationVertices.add(vertexId);
-    }
-
-    // read in-memory messages
-    numberOfMessagesInMemory.set(in.readInt());
-
-    // read in-memory map
-    int mapSize = in.readInt();
-    for (int m = 0; m < mapSize; m++) {
-      I vertexId = config.createVertexId();
-      vertexId.readFields(in);
-      int messageBytes = in.readInt();
-      byte[] buf = new byte[messageBytes];
-      ExtendedDataOutput extendedDataOutput =
-          config.createExtendedDataOutput(buf, messageBytes);
-      inMemoryMessages.put(vertexId, extendedDataOutput);
-    }
-
-    // read file stores
-    int numFileStores = in.readInt();
-    for (int s = 0; s < numFileStores; s++) {
-      BasicMessageStore<I, M> fileStore =
-          fileStoreFactory.newStore(messageClass);
-      fileStore.readFields(in);
-      fileStores.add(fileStore);
-    }
-  }
-
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param config           Hadoop configuration
-   * @param fileStoreFactory Factory for creating message stores for
-   *                         partitions
-   * @param <I>              Vertex id
-   * @param <M>              Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-      MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
-    return new Factory<I, M>(config, fileStoreFactory);
-  }
-
-  /**
-   * Factory for {@link DiskBackedMessageStore}
-   *
-   * @param <I> Vertex id
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      M extends Writable> implements MessageStoreFactory<I, M,
-      FlushableMessageStore<I, M>> {
-    /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration config;
-    /** Factory for creating message stores for partitions */
-    private final
-    MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
-
-    /**
-     * @param config           Hadoop configuration
-     * @param fileStoreFactory Factory for creating message stores for
-     *                         partitions
-     */
-    public Factory(ImmutableClassesGiraphConfiguration config,
-        MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
-      this.config = config;
-      this.fileStoreFactory = fileStoreFactory;
-    }
-
-    @Override
-    public FlushableMessageStore<I, M> newStore(Class<M> messageClass) {
-      return new DiskBackedMessageStore<I, M>(messageClass, config,
-          fileStoreFactory);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
deleted file mode 100644
index 4a28949..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Message store which separates data by partitions,
- * and submits them to underlying message store.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    MessageStoreByPartition<I, M> {
-  /** Message class */
-  private final Class<M> messageClass;
-  /** Service worker */
-  private final CentralizedServiceWorker<I, V, E> service;
-  /** Number of messages to keep in memory */
-  private final int maxNumberOfMessagesInMemory;
-  /** Factory for creating file stores when flushing */
-  private final
-  MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
-  /** Map from partition id to its message store */
-  private final
-  ConcurrentMap<Integer, FlushableMessageStore<I, M>> partitionMessageStores;
-  /**
-   * @param messageClass                Message class held in the store
-   * @param service                     Service worker
-   * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
-   * @param fileStoreFactory            Factory for creating file stores
-   *                                    when flushing
-   */
-  public DiskBackedMessageStoreByPartition(
-      Class<M> messageClass,
-      CentralizedServiceWorker<I, V, E> service,
-      int maxNumberOfMessagesInMemory,
-      MessageStoreFactory<I, M, FlushableMessageStore<I,
-          M>> fileStoreFactory) {
-    this.messageClass = messageClass;
-    this.service = service;
-    this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
-    this.fileStoreFactory = fileStoreFactory;
-    partitionMessageStores = Maps.newConcurrentMap();
-  }
-
-  @Override
-  public void addPartitionMessages(
-      int partitionId,
-      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
-    FlushableMessageStore<I, M> flushableMessageStore =
-        getMessageStore(partitionId);
-    if (flushableMessageStore instanceof DiskBackedMessageStore) {
-      DiskBackedMessageStore<I, M> diskBackedMessageStore =
-          (DiskBackedMessageStore<I, M>) flushableMessageStore;
-      ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
-          vertexIdMessageIterator =
-          messages.getVertexIdMessageIterator();
-      while (vertexIdMessageIterator.hasNext()) {
-        vertexIdMessageIterator.next();
-        boolean ownsVertexId =
-            diskBackedMessageStore.addVertexMessages(
-                vertexIdMessageIterator.getCurrentVertexId(),
-                Collections.singleton(
-                    vertexIdMessageIterator.getCurrentMessage()));
-        if (ownsVertexId) {
-          vertexIdMessageIterator.releaseCurrentVertexId();
-        }
-      }
-    } else {
-      throw new IllegalStateException("addPartitionMessages: Doesn't support " +
-          "class " + flushableMessageStore.getClass());
-    }
-    checkMemory();
-  }
-
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
-    for (I destinationVertex : messageStore.getDestinationVertices()) {
-      FlushableMessageStore<I, M> flushableMessageStore =
-          getMessageStore(destinationVertex);
-      if (flushableMessageStore instanceof DiskBackedMessageStore) {
-        DiskBackedMessageStore<I, M> diskBackedMessageStore =
-            (DiskBackedMessageStore<I, M>) flushableMessageStore;
-        Iterable<M> messages =
-            messageStore.getVertexMessages(destinationVertex);
-        diskBackedMessageStore.addVertexMessages(destinationVertex, messages);
-      } else {
-        throw new IllegalStateException("addMessages: Doesn't support " +
-            "class " + flushableMessageStore.getClass());
-      }
-    }
-    checkMemory();
-  }
-
-  @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
-    if (hasMessagesForVertex(vertexId)) {
-      return getMessageStore(vertexId).getVertexMessages(vertexId);
-    } else {
-      return EmptyIterable.get();
-    }
-  }
-
-  @Override
-  public int getNumberOfMessages() {
-    int numOfMessages = 0;
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      numOfMessages += messageStore.getNumberOfMessages();
-    }
-    return numOfMessages;
-  }
-
-  @Override
-  public boolean hasMessagesForVertex(I vertexId) {
-    return getMessageStore(vertexId).hasMessagesForVertex(vertexId);
-  }
-
-  @Override
-  public Iterable<I> getDestinationVertices() {
-    List<I> vertices = Lists.newArrayList();
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      Iterables.addAll(vertices, messageStore.getDestinationVertices());
-    }
-    return vertices;
-  }
-
-  @Override
-  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
-    FlushableMessageStore<I, M> messageStore =
-        partitionMessageStores.get(partitionId);
-    if (messageStore == null) {
-      return Collections.emptyList();
-    } else {
-      return messageStore.getDestinationVertices();
-    }
-  }
-
-  @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
-    if (hasMessagesForVertex(vertexId)) {
-      getMessageStore(vertexId).clearVertexMessages(vertexId);
-    }
-  }
-
-  @Override
-  public void clearPartition(int partitionId) throws IOException {
-    FlushableMessageStore<I, M> messageStore =
-        partitionMessageStores.get(partitionId);
-    if (messageStore != null) {
-      messageStore.clearAll();
-    }
-  }
-
-  @Override
-  public void clearAll() throws IOException {
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      messageStore.clearAll();
-    }
-    partitionMessageStores.clear();
-  }
-
-  /**
-   * Checks the memory status, flushes if necessary
-   *
-   * @throws IOException
-   */
-  private void checkMemory() throws IOException {
-    while (memoryFull()) {
-      flushOnePartition();
-    }
-  }
-
-  /**
-   * Check if memory is full
-   *
-   * @return True iff memory is full
-   */
-  private boolean memoryFull() {
-    int totalMessages = 0;
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      totalMessages += messageStore.getNumberOfMessages();
-    }
-    return totalMessages > maxNumberOfMessagesInMemory;
-  }
-
-  /**
-   * Finds biggest partition and flushes it to the disk
-   *
-   * @throws IOException
-   */
-  private void flushOnePartition() throws IOException {
-    int maxMessages = 0;
-    FlushableMessageStore<I, M> biggestStore = null;
-    for (FlushableMessageStore<I, M> messageStore :
-        partitionMessageStores.values()) {
-      int numMessages = messageStore.getNumberOfMessages();
-      if (numMessages > maxMessages) {
-        maxMessages = numMessages;
-        biggestStore = messageStore;
-      }
-    }
-    if (biggestStore != null) {
-      biggestStore.flush();
-    }
-  }
-
-  /**
-   * Get message store for partition which holds vertex with required vertex
-   * id
-   *
-   * @param vertexId Id of vertex for which we are asking for message store
-   * @return Requested message store
-   */
-  private FlushableMessageStore<I, M> getMessageStore(I vertexId) {
-    int partitionId =
-        service.getVertexPartitionOwner(vertexId).getPartitionId();
-    return getMessageStore(partitionId);
-  }
-
-  /**
-   * Get message store for partition id. It it doesn't exist yet,
-   * creates a new one.
-   *
-   * @param partitionId Id of partition for which we are asking for message
-   *                    store
-   * @return Requested message store
-   */
-  private FlushableMessageStore<I, M> getMessageStore(int partitionId) {
-    FlushableMessageStore<I, M> messageStore =
-        partitionMessageStores.get(partitionId);
-    if (messageStore != null) {
-      return messageStore;
-    }
-    messageStore = fileStoreFactory.newStore(messageClass);
-    FlushableMessageStore<I, M> store =
-        partitionMessageStores.putIfAbsent(partitionId, messageStore);
-    return (store == null) ? messageStore : store;
-  }
-
-  @Override
-  public void writePartition(DataOutput out,
-      int partitionId) throws IOException {
-    FlushableMessageStore<I, M> partitionStore =
-        partitionMessageStores.get(partitionId);
-    out.writeBoolean(partitionStore != null);
-    if (partitionStore != null) {
-      partitionStore.write(out);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(partitionMessageStores.size());
-    for (Entry<Integer, FlushableMessageStore<I, M>> entry :
-        partitionMessageStores.entrySet()) {
-      out.writeInt(entry.getKey());
-      entry.getValue().write(out);
-    }
-  }
-
-  @Override
-  public void readFieldsForPartition(DataInput in,
-      int partitionId) throws IOException {
-    if (in.readBoolean()) {
-      FlushableMessageStore<I, M> messageStore =
-          fileStoreFactory.newStore(messageClass);
-      messageStore.readFields(in);
-      partitionMessageStores.put(partitionId, messageStore);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int numStores = in.readInt();
-    for (int s = 0; s < numStores; s++) {
-      int partitionId = in.readInt();
-      FlushableMessageStore<I, M> messageStore =
-          fileStoreFactory.newStore(messageClass);
-      messageStore.readFields(in);
-      partitionMessageStores.put(partitionId, messageStore);
-    }
-  }
-
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param service             Service worker
-   * @param maxMessagesInMemory Number of messages to keep in memory
-   * @param fileStoreFactory    Factory for creating file stores when
-   *                            flushing
-   * @param <I>                 Vertex id
-   * @param <V>                 Vertex data
-   * @param <E>                 Edge data
-   * @param <M>                 Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, V extends Writable,
-      E extends Writable, M extends Writable>
-  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
-      CentralizedServiceWorker<I, V, E> service,
-      int maxMessagesInMemory,
-      MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
-          fileStoreFactory) {
-    return new Factory<I, V, E, M>(service, maxMessagesInMemory,
-        fileStoreFactory);
-  }
-
-  /**
-   * Factory for {@link DiskBackedMessageStoreByPartition}
-   *
-   * @param <I> Vertex id
-   * @param <V> Vertex data
-   * @param <E> Edge data
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable>
-      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
-    /** Service worker */
-    private final CentralizedServiceWorker<I, V, E> service;
-    /** Number of messages to keep in memory */
-    private final int maxMessagesInMemory;
-    /** Factory for creating file stores when flushing */
-    private final
-    MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
-
-    /**
-     * @param service             Service worker
-     * @param maxMessagesInMemory Number of messages to keep in memory
-     * @param fileStoreFactory    Factory for creating file stores when
-     *                            flushing
-     */
-    public Factory(CentralizedServiceWorker<I, V, E> service,
-        int maxMessagesInMemory,
-        MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
-            fileStoreFactory) {
-      this.service = service;
-      this.maxMessagesInMemory = maxMessagesInMemory;
-      this.fileStoreFactory = fileStoreFactory;
-    }
-
-    @Override
-    public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
-      return new DiskBackedMessageStoreByPartition<I, V, E, M>(messageClass,
-          service, maxMessagesInMemory, fileStoreFactory);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
deleted file mode 100644
index 6e7fe55..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-
-/**
- * Message stores which has flush operation
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface FlushableMessageStore<I extends WritableComparable,
-    M extends Writable> extends MessageStore<I, M> {
-  /**
-   * Flushes messages to the disk.
-   *
-   * @throws IOException
-   */
-  void flush() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 9086d78..ba8a005 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
  */
 public class InMemoryMessageStoreFactory<I extends WritableComparable,
     M extends Writable>
-    implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+    implements MessageStoreFactory<I, M, MessageStore<I, M>> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(InMemoryMessageStoreFactory.class);
@@ -56,7 +56,7 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   }
 
   @Override
-  public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+  public MessageStore<I, M> newStore(Class<M> messageClass) {
     if (conf.useCombiner()) {
       if (LOG.isInfoEnabled()) {
         LOG.info("newStore: " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index a6f174d..2af7642 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.comm.messages;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -28,13 +32,32 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <M> Message data
  */
 public interface MessageStore<I extends WritableComparable,
-    M extends Writable> extends BasicMessageStore<I, M> {
+    M extends Writable> {
   /**
-   * Get number of messages in memory
+   * Gets messages for a vertex.  The lifetime of every message is only
+   * guaranteed until the iterator's next() method is called. Do not hold
+   * references to objects returned by this iterator.
    *
-   * @return Number of messages in memory
+   * @param vertexId Vertex id for which we want to get messages
+   * @return Iterable of messages for a vertex id
+   * @throws java.io.IOException
    */
-  int getNumberOfMessages();
+  Iterable<M> getVertexMessages(I vertexId) throws IOException;
+
+  /**
+   * Clears messages for a vertex.
+   *
+   * @param vertexId Vertex id for which we want to clear messages
+   * @throws IOException
+   */
+  void clearVertexMessages(I vertexId) throws IOException;
+
+  /**
+   * Clears all resources used by this store.
+   *
+   * @throws IOException
+   */
+  void clearAll() throws IOException;
 
   /**
    * Check if we have messages for some vertex
@@ -45,9 +68,49 @@ public interface MessageStore<I extends WritableComparable,
   boolean hasMessagesForVertex(I vertexId);
 
   /**
-   * Gets vertex ids which we have messages for
+   * Adds messages for partition
+   *
+   * @param partitionId Id of partition
+   * @param messages    Collection of vertex ids and messages we want to add
+   * @throws IOException
+   */
+  void addPartitionMessages(
+      int partitionId, ByteArrayVertexIdMessages<I, M> messages)
+    throws IOException;
+
+  /**
+   * Gets vertex ids from selected partition which we have messages for
    *
+   * @param partitionId Id of partition
    * @return Iterable over vertex ids which we have messages for
    */
-  Iterable<I> getDestinationVertices();
+  Iterable<I> getPartitionDestinationVertices(int partitionId);
+
+  /**
+   * Clears messages for a partition.
+   *
+   * @param partitionId Partition id for which we want to clear messages
+   * @throws IOException
+   */
+  void clearPartition(int partitionId) throws IOException;
+
+  /**
+   * Serialize messages for one partition.
+   *
+   * @param out         {@link DataOutput} to serialize this object into
+   * @param partitionId Id of partition
+   * @throws IOException
+   */
+  void writePartition(DataOutput out, int partitionId) throws IOException;
+
+  /**
+   * Deserialize messages for one partition
+   *
+   * @param in          {@link DataInput} to deserialize this object
+   *                    from.
+   * @param partitionId Id of partition
+   * @throws IOException
+   */
+  void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
deleted file mode 100644
index d2143c6..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message store which stores data by partition
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface MessageStoreByPartition<I extends WritableComparable,
-    M extends Writable> extends MessageStore<I, M> {
-  /**
-   * Adds messages for partition
-   *
-   * @param partitionId Id of partition
-   * @param messages    Collection of vertex ids and messages we want to add
-   * @throws IOException
-   */
-  void addPartitionMessages(
-      int partitionId, ByteArrayVertexIdMessages<I, M> messages)
-    throws IOException;
-
-  /**
-   * Gets vertex ids from selected partition which we have messages for
-   *
-   * @param partitionId Id of partition
-   * @return Iterable over vertex ids which we have messages for
-   */
-  Iterable<I> getPartitionDestinationVertices(int partitionId);
-
-  /**
-   * Clears messages for a partition.
-   *
-   * @param partitionId Partition id for which we want to clear messages
-   * @throws IOException
-   */
-  void clearPartition(int partitionId) throws IOException;
-
-  /**
-   * Serialize messages for one partition.
-   *
-   * @param out         {@link DataOutput} to serialize this object into
-   * @param partitionId Id of partition
-   * @throws IOException
-   */
-  void writePartition(DataOutput out, int partitionId) throws IOException;
-
-  /**
-   * Deserialize messages for one partition
-   *
-   * @param in          {@link DataInput} to deserialize this object
-   *                    from.
-   * @param partitionId Id of partition
-   * @throws IOException
-   */
-  void readFieldsForPartition(DataInput in,
-      int partitionId) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index dec9a92..aa6a63e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.io.WritableComparable;
  *
  * @param <I> Vertex id
  * @param <M> Message data
- * @param <S> Message store
+ * @param <MS> Message store
  */
 public interface MessageStoreFactory<I extends WritableComparable,
-    M extends Writable, S extends BasicMessageStore<I, M>> {
+    M extends Writable, MS> {
   /**
    * Creates new message store.
    *
@@ -39,5 +39,5 @@ public interface MessageStoreFactory<I extends WritableComparable,
    * @param messageClass Message class held in the store
    * @return New message store
    */
-  S newStore(Class<M> messageClass);
+  MS newStore(Class<M> messageClass);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
new file mode 100644
index 0000000..970d76d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Special iterable that recycles the message
+ *
+ * @param <M> Message data
+ */
+public class MessagesIterable<M extends Writable>
+    extends RepresentativeByteArrayIterable<M> {
+  /** Message class */
+  private final Class<M> messageClass;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   * @param messageClass Message class
+   * @param buf Buffer
+   * @param off Offset to start in the buffer
+   * @param length Length of the buffer
+   */
+  public MessagesIterable(
+      ImmutableClassesGiraphConfiguration conf, Class<M> messageClass,
+      byte[] buf, int off, int length) {
+    super(conf, buf, off, length);
+    this.messageClass = messageClass;
+  }
+
+  @Override
+  protected M createWritable() {
+    return ReflectionUtils.newInstance(messageClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 8710dac..f18af5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -22,7 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -60,44 +59,6 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
     this.combiner = combiner;
   }
 
-  /**
-   * If there is already a message related to the vertex id in the
-   * partition map return that message, otherwise create a new one,
-   * put it in the map and return it
-   *
-   * @param vertexId Id of vertex
-   * @param partitionMap Partition map
-   * @return Message for this vertex
-   */
-  private M getOrCreateCurrentMessage(I vertexId,
-      ConcurrentMap<I, M> partitionMap) {
-    M currentMessage = partitionMap.get(vertexId);
-    if (currentMessage == null) {
-      M newMessage = combiner.createInitialMessage();
-      currentMessage = partitionMap.putIfAbsent(vertexId, newMessage);
-      if (currentMessage == null) {
-        currentMessage = newMessage;
-      }
-    }
-    return currentMessage;
-  }
-
-  /**
-   * Add a single message for vertex to a partition map
-   *
-   * @param vertexId Id of vertex which received message
-   * @param message Message to add
-   * @param partitionMap Partition map to add the message to
-   * @throws IOException
-   */
-  private void addVertexMessageToPartition(I vertexId, M message,
-      ConcurrentMap<I, M> partitionMap) throws IOException {
-    M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
-    synchronized (currentMessage) {
-      combiner.combine(vertexId, currentMessage, message);
-    }
-  }
-
   @Override
   public void addPartitionMessages(
       int partitionId,
@@ -149,26 +110,6 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
     return message;
   }
 
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
-    if (messageStore instanceof OneMessagePerVertexStore) {
-      OneMessagePerVertexStore<I, M> oneMessagePerVertexStore =
-          (OneMessagePerVertexStore<I, M>) messageStore;
-      for (Map.Entry<Integer, ConcurrentMap<I, M>>
-          partitionEntry : oneMessagePerVertexStore.map.entrySet()) {
-        ConcurrentMap<I, M> partitionMap =
-              getOrCreatePartitionMap(partitionEntry.getKey());
-        for (Map.Entry<I, M> vertexEntry :
-            partitionEntry.getValue().entrySet()) {
-          addVertexMessageToPartition(vertexEntry.getKey(),
-              vertexEntry.getValue(), partitionMap);
-        }
-      }
-    } else {
-      throw new IllegalArgumentException("addMessages: Illegal argument " +
-          messageStore.getClass());
-    }
-  }
 
   /**
    * Create new factory for this message store
@@ -180,7 +121,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
    * @return Factory
    */
   public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+  MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
       CentralizedServiceWorker<I, ?, ?> service,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     return new Factory<I, M>(service, config);
@@ -194,7 +135,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
    */
   private static class Factory<I extends WritableComparable,
       M extends Writable>
-      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
     /** Service worker */
     private final CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */
@@ -211,7 +152,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
     }
 
     @Override
-    public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+    public MessageStore<I, M> newStore(Class<M> messageClass) {
       return new OneMessagePerVertexStore<I, M>(messageClass, service,
           config.<M>createCombiner(), config);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
deleted file mode 100644
index 23bbbc5..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
-
-/**
- * Used for writing and reading collection of messages to the disk.
- * {@link SequentialFileMessageStore#addMessages(MessageStore)}
- * should be called only once with the messages we want to store.
- * <p/>
- * It's optimized for retrieving messages in the natural order of vertex ids
- * they are sent to.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class SequentialFileMessageStore<I extends WritableComparable,
-    M extends Writable> implements BasicMessageStore<I, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SequentialFileMessageStore.class);
-  /** Message class */
-  private final Class<M> messageClass;
-  /** File in which we store data */
-  private final File file;
-  /** Configuration which we need for reading data */
-  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
-  /** Buffer size to use when reading and writing files */
-  private final int bufferSize;
-  /** File input stream */
-  private DataInputStream in;
-  /** How many vertices do we have left to read in the file */
-  private int verticesLeft;
-  /** Id of currently read vertex */
-  private I currentVertexId;
-
-  /**
-   * Stores message on the disk.
-   *
-   * @param messageClass Message class held in the store
-   * @param config       Configuration used later for reading
-   * @param bufferSize   Buffer size to use when reading and writing
-   * @param fileName     File in which we want to store messages
-   * @throws IOException
-   */
-  public SequentialFileMessageStore(
-      Class<M> messageClass,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
-      int bufferSize,
-      String fileName) {
-    this.messageClass = messageClass;
-    this.config = config;
-    this.bufferSize = bufferSize;
-    file = new File(fileName);
-  }
-
-  @Override
-  public void addMessages(MessageStore<I, M> messageStore) throws IOException {
-    // Writes messages to its file
-    if (file.exists()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("addMessages: Deleting " + file);
-      }
-      file.delete();
-    }
-    file.createNewFile();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("addMessages: Creating " + file);
-    }
-
-    DataOutputStream out = null;
-
-    try {
-      out = new DataOutputStream(
-          new BufferedOutputStream(new FileOutputStream(file), bufferSize));
-      int destinationVertexIdCount =
-          Iterables.size(messageStore.getDestinationVertices());
-      out.writeInt(destinationVertexIdCount);
-
-      // Since the message store messages might not be sorted, sort them if
-      // necessary
-      SortedSet<I> sortedSet;
-      if (messageStore.getDestinationVertices() instanceof SortedSet) {
-        sortedSet = (SortedSet<I>) messageStore.getDestinationVertices();
-      } else {
-        sortedSet =
-            Sets.newTreeSet(messageStore.getDestinationVertices());
-        for (I destinationVertexId : messageStore.getDestinationVertices()) {
-          sortedSet.add(destinationVertexId);
-        }
-      }
-
-      // Dump the vertices and their messages in a sorted order
-      for (I destinationVertexId : sortedSet) {
-        destinationVertexId.write(out);
-        Iterable<M> messages =
-            messageStore.getVertexMessages(destinationVertexId);
-        int messageCount = Iterables.size(messages);
-        out.writeInt(messageCount);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("addMessages: For vertex id " + destinationVertexId +
-              ", messages = " + messageCount + " to file " + file);
-        }
-        for (M message : messages) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("addMessages: Wrote " + message + " to " + file);
-          }
-          message.write(out);
-        }
-      }
-    } finally {
-      if (out != null) {
-        out.close();
-      }
-    }
-  }
-
-  /**
-   * Reads messages for a vertex. It will find the messages only if all
-   * previous reads used smaller vertex ids than this one - messages should
-   * be retrieved in increasing order of vertex ids.
-   *
-   * @param vertexId Vertex id for which we want to get messages
-   * @return Messages for the selected vertex, or empty list if not used
-   *         correctly
-   * @throws IOException
-   */
-  @Override
-  public Iterable<M> getVertexMessages(I vertexId) throws
-      IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
-          " (currently " + currentVertexId + ") from " + file);
-    }
-    if (in == null) {
-      startReading();
-    }
-
-    I nextVertexId = getCurrentVertexId();
-    while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) {
-      nextVertexId = getNextVertexId();
-    }
-
-    if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
-      return EmptyIterable.get();
-    }
-
-    return readMessagesForCurrentVertex();
-  }
-
-  @Override
-  public void clearVertexMessages(I vertexId) throws IOException { }
-
-  @Override
-  public void clearAll() throws IOException {
-    endReading();
-    file.delete();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(file.length());
-    FileInputStream input = new FileInputStream(file);
-    try {
-      byte[] buffer = new byte[bufferSize];
-      while (true) {
-        int length = input.read(buffer);
-        if (length < 0) {
-          break;
-        }
-        out.write(buffer, 0, length);
-      }
-    } finally {
-      input.close();
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    FileOutputStream output = new FileOutputStream(file);
-    try {
-      long fileLength = in.readLong();
-      byte[] buffer = new byte[bufferSize];
-      for (long position = 0; position < fileLength; position += bufferSize) {
-        int bytes = (int) Math.min(bufferSize, fileLength - position);
-        in.readFully(buffer, 0, bytes);
-        output.write(buffer);
-      }
-    } finally {
-      output.close();
-    }
-  }
-
-  /**
-   * Prepare for reading
-   *
-   * @throws IOException
-   */
-  private void startReading() throws IOException {
-    currentVertexId = null;
-    in = new DataInputStream(
-        new BufferedInputStream(new FileInputStream(file), bufferSize));
-    verticesLeft = in.readInt();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("startReading: File " + file + " with " +
-          verticesLeft + " vertices left");
-    }
-  }
-
-  /**
-   * Gets current vertex id.
-   * <p/>
-   * If there is a vertex id whose messages haven't been read yet it
-   * will return that vertex id, otherwise it will read and return the next
-   * one.
-   *
-   * @return Current vertex id
-   * @throws IOException
-   */
-  private I getCurrentVertexId() throws IOException {
-    if (currentVertexId != null) {
-      return currentVertexId;
-    } else {
-      return getNextVertexId();
-    }
-  }
-
-  /**
-   * Gets next vertex id.
-   * <p/>
-   * If there is a vertex whose messages haven't been read yet it
-   * will read and skip over its messages to get to the next vertex.
-   *
-   * @return Next vertex id
-   * @throws IOException
-   */
-  private I getNextVertexId() throws IOException {
-    if (currentVertexId != null) {
-      readMessagesForCurrentVertex();
-    }
-    if (verticesLeft == 0) {
-      return null;
-    }
-    currentVertexId = config.createVertexId();
-    currentVertexId.readFields(in);
-    return currentVertexId;
-  }
-
-  /**
-   * Reads messages for current vertex.
-   *
-   * @return Messages for current vertex
-   * @throws IOException
-   */
-  private Collection<M> readMessagesForCurrentVertex() throws IOException {
-    int messagesSize = in.readInt();
-    List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
-    for (int i = 0; i < messagesSize; i++) {
-      M message = ReflectionUtils.newInstance(messageClass);
-      try {
-        message.readFields(in);
-      } catch (IOException e) {
-        throw new IllegalStateException("readMessagesForCurrentVertex: " +
-            "Failed to read message from " + i + " of " +
-            messagesSize + " for vertex id " + currentVertexId + " from " +
-            file, e);
-      }
-      messages.add(message);
-    }
-    currentVertexDone();
-    return messages;
-  }
-
-  /**
-   * Release current vertex.
-   *
-   * @throws IOException
-   */
-  private void currentVertexDone() throws IOException {
-    currentVertexId = null;
-    verticesLeft--;
-    if (verticesLeft == 0) {
-      endReading();
-    }
-  }
-
-  /**
-   * Call when we are done reading, for closing files.
-   *
-   * @throws IOException
-   */
-  private void endReading() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("endReading: Stopped reading " + file);
-    }
-    if (in != null) {
-      in.close();
-      in = null;
-    }
-  }
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param config Hadoop configuration
-   * @param <I>    Vertex id
-   * @param <M>    Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, M extends Writable>
-  MessageStoreFactory<I, M, BasicMessageStore<I, M>> newFactory(
-      ImmutableClassesGiraphConfiguration config) {
-    return new Factory<I, M>(config);
-  }
-
-  /**
-   * Factory for {@link SequentialFileMessageStore}
-   *
-   * @param <I> Vertex id
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      M extends Writable>
-      implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
-    /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration config;
-    /** Directories in which we'll keep necessary files */
-    private final String[] directories;
-    /** Buffer size to use when reading and writing */
-    private final int bufferSize;
-    /** Counter for created message stores */
-    private final AtomicInteger storeCounter;
-
-    /**
-     * Constructor.
-     *
-     * @param config Hadoop configuration
-     */
-    public Factory(ImmutableClassesGiraphConfiguration config) {
-      this.config = config;
-      String jobId = config.get("mapred.job.id", "Unknown Job");
-      int taskId   = config.getTaskPartition();
-      List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
-      Collections.shuffle(userPaths);
-      directories = new String[userPaths.size()];
-      int i = 0;
-      for (String path : userPaths) {
-        String directory = path + File.separator + jobId + File.separator +
-            taskId + File.separator;
-        directories[i++] = directory;
-        new File(directory).mkdirs();
-      }
-      this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
-      storeCounter = new AtomicInteger();
-    }
-
-    @Override
-    public BasicMessageStore<I, M> newStore(Class<M> messageClass) {
-      int idx = Math.abs(storeCounter.getAndIncrement());
-      String fileName =
-          directories[idx % directories.length] + "messages-" + idx;
-      return new SequentialFileMessageStore<I, M>(messageClass, config,
-          bufferSize, fileName);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 1a91dfb..c9d869c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -18,14 +18,12 @@
 
 package org.apache.giraph.comm.messages;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -34,7 +32,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
- * Abstract class for {@link MessageStoreByPartition} which allows any kind
+ * Abstract class for {@link MessageStore} which allows any kind
  * of object to hold messages for one vertex.
  * Simple in memory message store implemented with a two level concurrent
  * hash map.
@@ -44,7 +42,7 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <T> Type of object which holds messages for one vertex
  */
 public abstract class SimpleMessageStore<I extends WritableComparable,
-    M extends Writable, T> implements MessageStoreByPartition<I, M>  {
+    M extends Writable, T> implements MessageStore<I, M>  {
   /** Message class */
   protected final Class<M> messageClass;
   /** Service worker */
@@ -150,16 +148,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   public boolean hasMessagesForVertex(I vertexId) {
     ConcurrentMap<I, ?> partitionMap =
         map.get(getPartitionId(vertexId));
-    return (partitionMap == null) ? false : partitionMap.containsKey(vertexId);
-  }
-
-  @Override
-  public Iterable<I> getDestinationVertices() {
-    List<I> vertices = Lists.newArrayList();
-    for (ConcurrentMap<I, ?> partitionMap : map.values()) {
-      vertices.addAll(partitionMap.keySet());
-    }
-    return vertices;
+    return partitionMap != null && partitionMap.containsKey(vertexId);
   }
 
   @Override
@@ -174,15 +163,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public int getNumberOfMessages() {
-    int numberOfMessages = 0;
-    for (ConcurrentMap<I, T> partitionMap : map.values()) {
-      numberOfMessages += getNumberOfMessagesIn(partitionMap);
-    }
-    return numberOfMessages;
-  }
-
-  @Override
   public void writePartition(DataOutput out,
       int partitionId) throws IOException {
     ConcurrentMap<I, T> partitionMap = map.get(partitionId);
@@ -197,15 +177,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(map.size());
-    for (int partitionId : map.keySet()) {
-      out.writeInt(partitionId);
-      writePartition(out, partitionId);
-    }
-  }
-
-  @Override
   public void readFieldsForPartition(DataInput in,
       int partitionId) throws IOException {
     if (in.readBoolean()) {
@@ -221,15 +192,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
-    int numPartitions = in.readInt();
-    for (int p = 0; p < numPartitions; p++) {
-      int partitionId = in.readInt();
-      readFieldsForPartition(in, partitionId);
-    }
-  }
-
-  @Override
   public void clearVertexMessages(I vertexId) throws IOException {
     ConcurrentMap<I, ?> partitionMap =
         map.get(getPartitionId(vertexId));