You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/05/02 23:51:34 UTC

git commit: updated refs/heads/trunk to dd47ce6

Repository: giraph
Updated Branches:
  refs/heads/trunk f732f300f -> dd47ce6ab


GIRAPH-891: Make MessageStoreFactory configurable (rohankarwa via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/dd47ce6a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/dd47ce6a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/dd47ce6a

Branch: refs/heads/trunk
Commit: dd47ce6abdf01adc3584d6e7e0d41c6749d02d30
Parents: f732f30
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri May 2 14:50:22 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri May 2 14:50:22 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../ByteArrayMessagesPerVertexStore.java        | 28 ++++--
 .../messages/InMemoryMessageStoreFactory.java   | 26 ++++--
 .../comm/messages/MessageStoreFactory.java      | 22 +++++
 .../comm/messages/OneMessagePerVertexStore.java | 28 ++++--
 .../out_of_core/DiskBackedMessageStore.java     | 51 +---------
 .../DiskBackedMessageStoreFactory.java          | 97 ++++++++++++++++++++
 .../PartitionDiskBackedMessageStore.java        | 27 +++++-
 .../out_of_core/SequentialFileMessageStore.java | 46 +++++++---
 .../giraph/comm/netty/NettyWorkerServer.java    | 34 ++-----
 .../org/apache/giraph/conf/GiraphConstants.java | 13 ++-
 .../giraph/partition/SimplePartition.java       | 42 ++++++---
 .../apache/giraph/graph/TestVertexAndEdges.java |  2 +
 .../giraph/jython/TestJythonComputation.java    | 19 +++-
 14 files changed, 308 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index a3412bb..2b4db33 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-891: Make MessageStoreFactory configurable (rohankarwa via majakabiljo)
+
   GIRAPH-825: Fix DiskBackedPartitionStore to work with current trunk (armax00 via claudio)
 
   GIRAPH-864: 'mvn clean test' fails for rexster (armax00 via claudio)

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 2381078..e8b3b30 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -18,25 +18,25 @@
 
 package org.apache.giraph.comm.messages;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataInput;
 import org.apache.giraph.utils.RepresentativeByteArrayIterator;
-import org.apache.giraph.utils.VertexIdIterator;
 import org.apache.giraph.utils.VerboseByteArrayMessageWrite;
+import org.apache.giraph.utils.VertexIdIterator;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.collect.Iterators;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-
 /**
  * Implementation of {@link SimpleMessageStore} where multiple messages are
  * stored per vertex as byte arrays.  Used when there is no combiner provided.
@@ -202,9 +202,9 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
   private static class Factory<I extends WritableComparable, M extends Writable>
       implements MessageStoreFactory<I, M, MessageStore<I, M>> {
     /** Service worker */
-    private final CentralizedServiceWorker<I, ?, ?> service;
+    private CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+    private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 
     /**
      * @param service Worker service
@@ -222,5 +222,17 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
       return new ByteArrayMessagesPerVertexStore<I, M>(messageValueFactory,
           service, config);
     }
+
+    @Override
+    public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+        ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+      this.service = service;
+      this.config = conf;
+    }
+
+    @Override
+    public boolean shouldTraverseMessagesInOrder() {
+      return false;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 22a41cd..f691d3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -51,18 +51,14 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
       Logger.getLogger(InMemoryMessageStoreFactory.class);
 
   /** Service worker */
-  private final CentralizedServiceWorker<I, ?, ?> service;
+  private CentralizedServiceWorker<I, ?, ?> service;
   /** Hadoop configuration */
-  private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
+  private ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
 
   /**
-   * @param service Worker service
-   * @param conf    Configuration
+   * Default constructor allowing class invocation via Reflection.
    */
-  public InMemoryMessageStoreFactory(CentralizedServiceWorker<I, ?, ?> service,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
-    this.service = service;
-    this.conf = conf;
+  public InMemoryMessageStoreFactory() {
   }
 
   @Override
@@ -113,6 +109,18 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
           (conf.useMessageCombiner() ? " message combiner " +
               conf.getMessageCombinerClass() : " no combiner"));
     }
-    return (MessageStore<I, M>) messageStore;
+    return messageStore;
+  }
+
+  @Override
+  public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+    this.service = service;
+    this.conf = conf;
+  }
+
+  @Override
+  public boolean shouldTraverseMessagesInOrder() {
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index f582ea2..6149a9c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm.messages;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -42,4 +44,24 @@ public interface MessageStoreFactory<I extends WritableComparable,
    * @return New message store
    */
   MS newStore(MessageValueFactory<M> messageValueFactory);
+
+  /**
+   * Implementing classes should use this method to initialize
+   * any required internal state.
+   *
+   * @param service Service to get partition mappings
+   * @param conf Configuration
+   */
+  void initialize(CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> conf);
+
+  /**
+   * This method exists mainly as a performance optimization. If messages
+   * will be traversed in order, a data structure which is optimized
+   * for such traversal can be used.
+   *
+   * @return true if the messages will be traversed in order,
+   * false otherwise
+   */
+  boolean shouldTraverseMessagesInOrder();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index acf68ea..bb581c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -18,6 +18,12 @@
 
 package org.apache.giraph.comm.messages;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -26,12 +32,6 @@ import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentMap;
-
 /**
  * Implementation of {@link SimpleMessageStore} where we have a single
  * message per vertex.
@@ -140,9 +140,9 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
       M extends Writable>
       implements MessageStoreFactory<I, M, MessageStore<I, M>> {
     /** Service worker */
-    private final CentralizedServiceWorker<I, ?, ?> service;
+    private CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+    private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 
     /**
      * @param service Worker service
@@ -160,5 +160,17 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
       return new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
           config.<M>createMessageCombiner(), config);
     }
+
+    @Override
+    public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+        ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+      this.service = service;
+      this.config = conf;
+    }
+
+    @Override
+    public boolean shouldTraverseMessagesInOrder() {
+      return false;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 346e3b3..1a76306 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.comm.messages.out_of_core;
 
 import com.google.common.collect.Maps;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
@@ -27,7 +28,6 @@ import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -273,54 +273,13 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   public static <I extends WritableComparable, V extends Writable,
       E extends Writable, M extends Writable>
   MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
-      CentralizedServiceWorker<I, V, E> service,
+    CentralizedServiceWorker<I, V, E> service,
       int maxMessagesInMemory,
       MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
           fileStoreFactory) {
-    return new Factory<I, V, E, M>(service, maxMessagesInMemory,
+    return new DiskBackedMessageStoreFactory<I, V, E, M>(service,
+        maxMessagesInMemory,
         fileStoreFactory);
   }
-
-  /**
-   * Factory for {@link DiskBackedMessageStore}
-   *
-   * @param <I> Vertex id
-   * @param <V> Vertex data
-   * @param <E> Edge data
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable>
-      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
-    /** Service worker */
-    private final CentralizedServiceWorker<I, V, E> service;
-    /** Number of messages to keep in memory */
-    private final int maxMessagesInMemory;
-    /** Factory for creating file stores when flushing */
-    private final
-    MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
-    fileStoreFactory;
-
-    /**
-     * @param service             Service worker
-     * @param maxMessagesInMemory Number of messages to keep in memory
-     * @param fileStoreFactory    Factory for creating file stores when
-     *                            flushing
-     */
-    public Factory(CentralizedServiceWorker<I, V, E> service,
-        int maxMessagesInMemory,
-        MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
-            fileStoreFactory) {
-      this.service = service;
-      this.maxMessagesInMemory = maxMessagesInMemory;
-      this.fileStoreFactory = fileStoreFactory;
-    }
-
-    @Override
-    public MessageStore<I, M> newStore(
-        MessageValueFactory<M> messageValueFactory) {
-      return new DiskBackedMessageStore<I, V, E, M>(messageValueFactory,
-          service, maxMessagesInMemory, fileStoreFactory);
-    }
-  }
 }
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
new file mode 100644
index 0000000..f2b31c0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Message store factory which persists the messages on disk.
+ *
+ * @param <I> vertex id
+ * @param <V> vertex data
+ * @param <E> edge data
+ * @param <M> message data
+ */
+public class DiskBackedMessageStoreFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements MessageStoreFactory<I, M, MessageStore<I, M>> {
+  /** Service worker */
+  private CentralizedServiceWorker<I, V, E> service;
+  /** Number of messages to keep in memory */
+  private int maxMessagesInMemory;
+  /** Factory for creating file stores when flushing */
+  private MessageStoreFactory<I, M,
+    PartitionDiskBackedMessageStore<I, M>> fileStoreFactory;
+
+  /**
+   * Default constructor, which allows instantiation via reflection.
+   */
+  public DiskBackedMessageStoreFactory() {
+  }
+
+  /**
+   * @param service Service worker
+   * @param maxMessagesInMemory Number of messages to keep in memory
+   * @param fileStoreFactory Factory for creating file stores when flushing
+   */
+  public DiskBackedMessageStoreFactory(
+      CentralizedServiceWorker<I, V, E> service,
+      int maxMessagesInMemory,
+      MessageStoreFactory<I, M,
+        PartitionDiskBackedMessageStore<I, M>> fileStoreFactory) {
+    this.service = service;
+    this.maxMessagesInMemory = maxMessagesInMemory;
+    this.fileStoreFactory = fileStoreFactory;
+  }
+
+  @Override
+  public MessageStore<I, M>
+  newStore(MessageValueFactory<M> messageValueFactory) {
+    return new DiskBackedMessageStore<I, V, E, M>(messageValueFactory,
+        service, maxMessagesInMemory, fileStoreFactory);
+  }
+
+  @Override
+  public void initialize(CentralizedServiceWorker service,
+      ImmutableClassesGiraphConfiguration conf) {
+    this.maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
+
+    MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>>
+      fileMessageStoreFactory =
+        SequentialFileMessageStore.newFactory(conf);
+    this.fileStoreFactory =
+        PartitionDiskBackedMessageStore.newFactory(conf,
+            fileMessageStoreFactory);
+
+    this.service = service;
+  }
+
+  @Override
+  public boolean shouldTraverseMessagesInOrder() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
index 7d46d30..bece774 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -18,9 +18,6 @@
 
 package org.apache.giraph.comm.messages.out_of_core;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -34,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -42,6 +40,10 @@ import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * Message storage with in-memory map of messages and with support for
  * flushing all the messages to the disk. Holds messages for a single partition.
@@ -263,7 +265,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
     // read destination vertices
     int numVertices = in.readInt();
     for (int v = 0; v < numVertices; v++) {
-      I vertexId = (I) config.createVertexId();
+      I vertexId = config.createVertexId();
       vertexId.readFields(in);
       destinationVertices.add(vertexId);
     }
@@ -343,5 +345,22 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
       return new PartitionDiskBackedMessageStore<I, M>(messageValueFactory,
           config, fileStoreFactory);
     }
+
+    @Override
+    public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+        ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+      /* Implementing this method is only required if the class is to be
+       * exposed publicly and instantiated via the configuration
+       * parameter MESSAGE_STORE_FACTORY_CLASS. Since this is a private
+       * class, the implementation is intentionally left empty: the
+       * caller already knows the specific constructor parameters
+       * required for instantiation.
+      */
+    }
+
+    @Override
+    public boolean shouldTraverseMessagesInOrder() {
+      return true;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
index 51c05da..5988459 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -18,19 +18,7 @@
 
 package org.apache.giraph.comm.messages.out_of_core;
 
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -49,7 +37,20 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.io.DataInputOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
 /**
  * Used for writing and reading collection of messages to the disk.
@@ -413,5 +414,22 @@ public class SequentialFileMessageStore<I extends WritableComparable,
       return new SequentialFileMessageStore<I, M>(messageValueFactory, config,
           bufferSize, fileName);
     }
+
+    @Override
+    public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+        ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+      /* Implementing this method is only required if the class is to be
+       * exposed publicly and instantiated via the configuration
+       * parameter MESSAGE_STORE_FACTORY_CLASS. Since this is a private
+       * class, the implementation is intentionally left empty: the
+       * caller already knows the specific constructor parameters
+       * required for instantiation.
+       */
+    }
+
+    @Override
+    public boolean shouldTraverseMessagesInOrder() {
+      return true;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 4f6c17b..adb96cb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -21,18 +21,15 @@ package org.apache.giraph.comm.netty;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
-import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
-import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -46,8 +43,7 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Map.Entry;
 
-import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY;
-import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
 
 /**
  * Netty worker server that implement {@link WorkerServer} and contains
@@ -107,24 +103,14 @@ public class NettyWorkerServer<I extends WritableComparable,
    */
   private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
   createMessageStoreFactory() {
-    boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf);
-    if (!useOutOfCoreMessaging) {
-      return new InMemoryMessageStoreFactory<I, Writable>(service, conf);
-    } else {
-      int maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
-            "maxMessagesInMemory = " + maxMessagesInMemory);
-      }
-      MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>>
-          fileStoreFactory = SequentialFileMessageStore.newFactory(conf);
-      MessageStoreFactory<I, Writable,
-          PartitionDiskBackedMessageStore<I, Writable>>
-          partitionStoreFactory =
-          PartitionDiskBackedMessageStore.newFactory(conf, fileStoreFactory);
-      return DiskBackedMessageStore.newFactory(service,
-          maxMessagesInMemory, partitionStoreFactory);
-    }
+    Class<? extends MessageStoreFactory> messageStoreFactoryClass =
+        MESSAGE_STORE_FACTORY_CLASS.get(conf);
+
+    MessageStoreFactory messageStoreFactoryInstance =
+        ReflectionUtils.newInstance(messageStoreFactoryClass);
+    messageStoreFactoryInstance.initialize(service, conf);
+
+    return messageStoreFactoryInstance;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index e257b4a..e791d62 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -20,6 +20,8 @@ package org.apache.giraph.conf;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
@@ -92,6 +94,13 @@ public interface GiraphConstants {
           TypesHolder.class,
           "TypesHolder, used if Computation not set - optional");
 
+  /** Message Store Factory */
+  ClassConfOption<MessageStoreFactory> MESSAGE_STORE_FACTORY_CLASS =
+      ClassConfOption.create("giraph.messageStoreFactoryClass",
+          InMemoryMessageStoreFactory.class,
+          MessageStoreFactory.class,
+          "Message Store Factory Class that is to be used");
+
   /** Language user's graph types are implemented in */
   PerGraphTypeEnumConfOption<Language> GRAPH_TYPE_LANGUAGES =
       PerGraphTypeEnumConfOption.create("giraph.types.language",
@@ -833,10 +842,6 @@ public interface GiraphConstants {
           "Comma-separated list of directories in the local file system for " +
           "out-of-core messages.");
 
-  /** Whether or not to use out-of-core messages */
-  BooleanConfOption USE_OUT_OF_CORE_MESSAGES =
-      new BooleanConfOption("giraph.useOutOfCoreMessages", false,
-          "Whether or not to use out-of-core messages");
   /**
    * If using out-of-core messaging, it tells how much messages do we keep
    * in memory.

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index 1609846..de2ffd4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -18,15 +18,8 @@
 
 package org.apache.giraph.partition;
 
-import com.google.common.collect.Maps;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
 
-import javax.annotation.concurrent.ThreadSafe;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -34,7 +27,18 @@ import java.util.Iterator;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
-import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.collect.Maps;
 
 /**
  * A simple map-based container that stores vertices.  Vertex ids will map to
@@ -60,7 +64,7 @@ public class SimplePartition<I extends WritableComparable,
   @Override
   public void initialize(int partitionId, Progressable progressable) {
     super.initialize(partitionId, progressable);
-    if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
+    if (shouldTraverseMessageInOrder()) {
       vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>();
     } else {
       vertexMap = Maps.newConcurrentMap();
@@ -141,7 +145,7 @@ public class SimplePartition<I extends WritableComparable,
   @Override
   public void readFields(DataInput input) throws IOException {
     super.readFields(input);
-    if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
+    if (shouldTraverseMessageInOrder()) {
       vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>();
     } else {
       vertexMap = Maps.newConcurrentMap();
@@ -173,4 +177,20 @@ public class SimplePartition<I extends WritableComparable,
   public Iterator<Vertex<I, V, E>> iterator() {
     return vertexMap.values().iterator();
   }
+
+  /**
+   * This method specifies whether the message store factory that has been
+   * configured requires traversing messages in order.
+   *
+   * @return true if the message store factory has specified traversing
+   * messages in order, else return false.
+   */
+  private boolean shouldTraverseMessageInOrder() {
+    Class<? extends MessageStoreFactory> messageStoreFactoryClass =
+        MESSAGE_STORE_FACTORY_CLASS.get(getConf());
+
+    MessageStoreFactory messageStoreFactoryInstance =
+        ReflectionUtils.newInstance(messageStoreFactoryClass);
+    return messageStoreFactoryInstance.shouldTraverseMessagesInOrder();
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
index b404646..86d75a3 100644
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
@@ -19,6 +19,8 @@ package org.apache.giraph.graph;
 
 import com.google.common.collect.Lists;
 
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStoreFactory;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.ArrayListEdges;

http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java b/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java
index 5feabaf..49a338c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java
+++ b/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java
@@ -17,7 +17,11 @@
  */
 package org.apache.giraph.jython;
 
+import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStoreFactory;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.graph.Language;
@@ -37,8 +41,19 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 
 public class TestJythonComputation {
+
+  @Test
+  public void testCountEdgesDiskBackedMessageStoreFactory() throws Exception {
+    testCountEdges(DiskBackedMessageStoreFactory.class);
+  }
+
   @Test
-  public void testCountEdges() throws Exception {
+  public void testCountEdgesInMemoryMessageStoreFactory() throws Exception {
+    testCountEdges(InMemoryMessageStoreFactory.class);
+  }
+
+  public void testCountEdges(Class<? extends MessageStoreFactory>
+  messageStoreFactoryClass) throws Exception {
     String[] edges = new String[] {
         "1 2",
         "2 3",
@@ -57,6 +72,8 @@ public class TestJythonComputation {
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.set(conf,
+        messageStoreFactoryClass);
     Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
 
     Map<Integer, Integer> values = parseResults(results);