You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:07 UTC

[11/12] GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 3cd1175..97c8a35 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -23,6 +23,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.RepresentativeByteArrayIterable;
 import org.apache.giraph.utils.RepresentativeByteArrayIterator;
 import org.apache.giraph.utils.VertexIdIterator;
@@ -47,13 +48,15 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
   /**
    * Constructor
    *
+   * @param messageClass Message class held in the store
    * @param service Service worker
    * @param config Hadoop configuration
    */
   public ByteArrayMessagesPerVertexStore(
-      CentralizedServiceWorker<I, ?, ?, M> service,
-      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
-    super(service, config);
+      Class<M> messageClass,
+      CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    super(messageClass, service, config);
   }
 
   /**
@@ -142,7 +145,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 
     @Override
     protected M createWritable() {
-      return config.createMessageValue();
+      return ReflectionUtils.newInstance(messageClass);
     }
   }
 
@@ -175,7 +178,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 
     @Override
     protected M createWritable() {
-      return config.createMessageValue();
+      return ReflectionUtils.newInstance(messageClass);
     }
   }
 
@@ -222,8 +225,8 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    */
   public static <I extends WritableComparable, M extends Writable>
   MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
-      CentralizedServiceWorker<I, ?, ?, M> service,
-      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+      CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     return new Factory<I, M>(service, config);
   }
 
@@ -272,23 +275,23 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
   private static class Factory<I extends WritableComparable, M extends Writable>
       implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
     /** Service worker */
-    private final CentralizedServiceWorker<I, ?, ?, M> service;
+    private final CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 
     /**
      * @param service Worker service
      * @param config  Hadoop configuration
      */
-    public Factory(CentralizedServiceWorker<I, ?, ?, M> service,
-        ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+    public Factory(CentralizedServiceWorker<I, ?, ?> service,
+        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
       this.service = service;
       this.config = config;
     }
 
     @Override
-    public MessageStoreByPartition<I, M> newStore() {
-      return new ByteArrayMessagesPerVertexStore(service, config);
+    public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+      return new ByteArrayMessagesPerVertexStore(messageClass, service, config);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
index 26abb94..2712edd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.RepresentativeByteArrayIterable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -48,6 +49,8 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public class DiskBackedMessageStore<I extends WritableComparable,
     M extends Writable> implements FlushableMessageStore<I, M> {
+  /** Message class */
+  private final Class<M> messageClass;
   /**
    * In-memory message map (must be sorted to insure that the ids are
    * ordered)
@@ -55,7 +58,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
   inMemoryMessages;
   /** Hadoop configuration */
-  private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
   /** Counter for number of messages in-memory */
   private final AtomicInteger numberOfMessagesInMemory;
   /** To keep vertex ids which we have messages for */
@@ -71,13 +74,16 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   /**
    * Constructor.
    *
+   * @param messageClass     Message class held in the store
    * @param config           Hadoop configuration
    * @param fileStoreFactory Factory for creating file stores when flushing
    */
   public DiskBackedMessageStore(
-      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
+      Class<M> messageClass,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
       MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
     inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+    this.messageClass = messageClass;
     this.config = config;
     numberOfMessagesInMemory = new AtomicInteger(0);
     destinationVertices =
@@ -154,7 +160,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
 
     @Override
     protected M createWritable() {
-      return config.createMessageValue();
+      return ReflectionUtils.newInstance(messageClass);
     }
   }
 
@@ -289,7 +295,8 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     } finally {
       rwLock.writeLock().unlock();
     }
-    BasicMessageStore<I, M> fileStore = fileStoreFactory.newStore();
+    BasicMessageStore<I, M> fileStore =
+        fileStoreFactory.newStore(messageClass);
     fileStore.addMessages(new TemporaryMessageStore(messagesToFlush));
 
     synchronized (fileStores) {
@@ -351,7 +358,8 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     // read file stores
     int numFileStores = in.readInt();
     for (int s = 0; s < numFileStores; s++) {
-      BasicMessageStore<I, M> fileStore = fileStoreFactory.newStore();
+      BasicMessageStore<I, M> fileStore =
+          fileStoreFactory.newStore(messageClass);
       fileStore.readFields(in);
       fileStores.add(fileStore);
     }
@@ -370,7 +378,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
    */
   public static <I extends WritableComparable, M extends Writable>
   MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(
-      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
       MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
     return new Factory<I, M>(config, fileStoreFactory);
   }
@@ -402,8 +410,9 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     }
 
     @Override
-    public FlushableMessageStore<I, M> newStore() {
-      return new DiskBackedMessageStore<I, M>(config, fileStoreFactory);
+    public FlushableMessageStore<I, M> newStore(Class<M> messageClass) {
+      return new DiskBackedMessageStore<I, M>(messageClass, config,
+          fileStoreFactory);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
index 02bfb1f..4a28949 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
@@ -47,8 +47,10 @@ import java.util.concurrent.ConcurrentMap;
 public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> implements
     MessageStoreByPartition<I, M> {
+  /** Message class */
+  private final Class<M> messageClass;
   /** Service worker */
-  private final CentralizedServiceWorker<I, V, E, M> service;
+  private final CentralizedServiceWorker<I, V, E> service;
   /** Number of messages to keep in memory */
   private final int maxNumberOfMessagesInMemory;
   /** Factory for creating file stores when flushing */
@@ -57,18 +59,20 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
   /** Map from partition id to its message store */
   private final
   ConcurrentMap<Integer, FlushableMessageStore<I, M>> partitionMessageStores;
-
   /**
+   * @param messageClass                Message class held in the store
    * @param service                     Service worker
    * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
    * @param fileStoreFactory            Factory for creating file stores
    *                                    when flushing
    */
   public DiskBackedMessageStoreByPartition(
-      CentralizedServiceWorker<I, V, E, M> service,
+      Class<M> messageClass,
+      CentralizedServiceWorker<I, V, E> service,
       int maxNumberOfMessagesInMemory,
       MessageStoreFactory<I, M, FlushableMessageStore<I,
           M>> fileStoreFactory) {
+    this.messageClass = messageClass;
     this.service = service;
     this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
     this.fileStoreFactory = fileStoreFactory;
@@ -267,7 +271,7 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
     if (messageStore != null) {
       return messageStore;
     }
-    messageStore = fileStoreFactory.newStore();
+    messageStore = fileStoreFactory.newStore(messageClass);
     FlushableMessageStore<I, M> store =
         partitionMessageStores.putIfAbsent(partitionId, messageStore);
     return (store == null) ? messageStore : store;
@@ -298,7 +302,8 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
   public void readFieldsForPartition(DataInput in,
       int partitionId) throws IOException {
     if (in.readBoolean()) {
-      FlushableMessageStore<I, M> messageStore = fileStoreFactory.newStore();
+      FlushableMessageStore<I, M> messageStore =
+          fileStoreFactory.newStore(messageClass);
       messageStore.readFields(in);
       partitionMessageStores.put(partitionId, messageStore);
     }
@@ -309,7 +314,8 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
     int numStores = in.readInt();
     for (int s = 0; s < numStores; s++) {
       int partitionId = in.readInt();
-      FlushableMessageStore<I, M> messageStore = fileStoreFactory.newStore();
+      FlushableMessageStore<I, M> messageStore =
+          fileStoreFactory.newStore(messageClass);
       messageStore.readFields(in);
       partitionMessageStores.put(partitionId, messageStore);
     }
@@ -332,7 +338,7 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
   public static <I extends WritableComparable, V extends Writable,
       E extends Writable, M extends Writable>
   MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
-      CentralizedServiceWorker<I, V, E, M> service,
+      CentralizedServiceWorker<I, V, E> service,
       int maxMessagesInMemory,
       MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
           fileStoreFactory) {
@@ -352,7 +358,7 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
       V extends Writable, E extends Writable, M extends Writable>
       implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
     /** Service worker */
-    private final CentralizedServiceWorker<I, V, E, M> service;
+    private final CentralizedServiceWorker<I, V, E> service;
     /** Number of messages to keep in memory */
     private final int maxMessagesInMemory;
     /** Factory for creating file stores when flushing */
@@ -365,7 +371,7 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
      * @param fileStoreFactory    Factory for creating file stores when
      *                            flushing
      */
-    public Factory(CentralizedServiceWorker<I, V, E, M> service,
+    public Factory(CentralizedServiceWorker<I, V, E> service,
         int maxMessagesInMemory,
         MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
             fileStoreFactory) {
@@ -375,9 +381,9 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
     }
 
     @Override
-    public MessageStoreByPartition<I, M> newStore() {
-      return new DiskBackedMessageStoreByPartition<I, V, E, M>(service,
-          maxMessagesInMemory, fileStoreFactory);
+    public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+      return new DiskBackedMessageStoreByPartition<I, V, E, M>(messageClass,
+          service, maxMessagesInMemory, fileStoreFactory);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
new file mode 100644
index 0000000..9086d78
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Message store factory which produces message stores that hold all
+ * messages in memory. If a combiner is in use when {@link #newStore} is
+ * called it creates a {@link OneMessagePerVertexStore}, otherwise a
+ * {@link ByteArrayMessagesPerVertexStore}.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class InMemoryMessageStoreFactory<I extends WritableComparable,
+    M extends Writable>
+    implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(InMemoryMessageStoreFactory.class);
+
+  /** Service worker handed to every store this factory creates */
+  private final CentralizedServiceWorker<I, ?, ?> service;
+  /** Configuration; queried for the combiner on every newStore call */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
+
+  /**
+   * @param service Worker service given to the created stores
+   * @param conf    Configuration used to decide which store to create
+   */
+  public InMemoryMessageStoreFactory(CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+    this.service = service;
+    this.conf = conf;
+  }
+
+  @Override
+  public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+    if (conf.useCombiner()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("newStore: " +
+            "Using OneMessagePerVertexStore with " + conf.getCombinerClass());
+      }
+      return new OneMessagePerVertexStore<I, M>(
+          messageClass, service, conf.<M>createCombiner(), conf);
+    } else {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("newStore: " +
+            "Using ByteArrayMessagesPerVertexStore since there is no combiner");
+      }
+      return new ByteArrayMessagesPerVertexStore<I, M>(
+          messageClass, service, conf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index 3c13c47..dec9a92 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -31,9 +31,13 @@ import org.apache.hadoop.io.WritableComparable;
 public interface MessageStoreFactory<I extends WritableComparable,
     M extends Writable, S extends BasicMessageStore<I, M>> {
   /**
-   * Creates new message store
+   * Creates new message store.
    *
+   * Note: the combiner class in the configuration can be changed, so
+   * this method should return a MessageStore which uses the current combiner.
+   *
+   * @param messageClass Message class held in the store
    * @return New message store
    */
-  S newStore();
+  S newStore(Class<M> messageClass);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 7db0266..8710dac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -28,6 +28,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.Combiner;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -45,14 +46,17 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
   private final Combiner<I, M> combiner;
 
   /**
+   * @param messageClass Message class held in the store
    * @param service  Service worker
    * @param combiner Combiner for messages
    * @param config   Hadoop configuration
    */
-  OneMessagePerVertexStore(CentralizedServiceWorker<I, ?, ?, M> service,
+  OneMessagePerVertexStore(
+      Class<M> messageClass,
+      CentralizedServiceWorker<I, ?, ?> service,
       Combiner<I, M> combiner,
-      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
-    super(service, config);
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    super(messageClass, service, config);
     this.combiner = combiner;
   }
 
@@ -140,7 +144,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
 
   @Override
   protected M readFieldsForMessages(DataInput in) throws IOException {
-    M message = config.createMessageValue();
+    M message = ReflectionUtils.newInstance(messageClass);
     message.readFields(in);
     return message;
   }
@@ -177,8 +181,8 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
    */
   public static <I extends WritableComparable, M extends Writable>
   MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
-      CentralizedServiceWorker<I, ?, ?, M> service,
-      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+      CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     return new Factory<I, M>(service, config);
   }
 
@@ -192,26 +196,24 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
       M extends Writable>
       implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
     /** Service worker */
-    private final CentralizedServiceWorker<I, ?, ?, M> service;
+    private final CentralizedServiceWorker<I, ?, ?> service;
     /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
-    /** Combiner for messages */
-    private final Combiner<I, M> combiner;
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 
     /**
      * @param service Worker service
      * @param config  Hadoop configuration
      */
-    public Factory(CentralizedServiceWorker<I, ?, ?, M> service,
-        ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+    public Factory(CentralizedServiceWorker<I, ?, ?> service,
+        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
       this.service = service;
       this.config = config;
-      combiner = config.createCombiner();
     }
 
     @Override
-    public MessageStoreByPartition<I, M> newStore() {
-      return new OneMessagePerVertexStore<I, M>(service, combiner, config);
+    public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+      return new OneMessagePerVertexStore<I, M>(messageClass, service,
+          config.<M>createCombiner(), config);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
index 3fe4430..f0a8f6d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.messages;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -63,10 +64,12 @@ public class SequentialFileMessageStore<I extends WritableComparable,
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SequentialFileMessageStore.class);
+  /** Message class */
+  private final Class<M> messageClass;
   /** File in which we store data */
   private final File file;
   /** Configuration which we need for reading data */
-  private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
   /** Buffer size to use when reading and writing files */
   private final int bufferSize;
   /** File input stream */
@@ -79,15 +82,18 @@ public class SequentialFileMessageStore<I extends WritableComparable,
   /**
    * Stores message on the disk.
    *
-   * @param config     Configuration used later for reading
-   * @param bufferSize Buffer size to use when reading and writing
-   * @param fileName   File in which we want to store messages
+   * @param messageClass Message class held in the store
+   * @param config       Configuration used later for reading
+   * @param bufferSize   Buffer size to use when reading and writing
+   * @param fileName     File in which we want to store messages
    * @throws IOException
    */
   public SequentialFileMessageStore(
-      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
+      Class<M> messageClass,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
       int bufferSize,
       String fileName) {
+    this.messageClass = messageClass;
     this.config = config;
     this.bufferSize = bufferSize;
     file = new File(fileName);
@@ -295,7 +301,7 @@ public class SequentialFileMessageStore<I extends WritableComparable,
     int messagesSize = in.readInt();
     List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
     for (int i = 0; i < messagesSize; i++) {
-      M message = config.createMessageValue();
+      M message = ReflectionUtils.newInstance(messageClass);
       try {
         message.readFields(in);
       } catch (IOException e) {
@@ -394,12 +400,12 @@ public class SequentialFileMessageStore<I extends WritableComparable,
     }
 
     @Override
-    public BasicMessageStore<I, M> newStore() {
+    public BasicMessageStore<I, M> newStore(Class<M> messageClass) {
       int idx = Math.abs(storeCounter.getAndIncrement());
       String fileName =
           directories[idx % directories.length] + "messages-" + idx;
-      return new SequentialFileMessageStore<I, M>(config, bufferSize,
-          fileName);
+      return new SequentialFileMessageStore<I, M>(messageClass, config,
+          bufferSize, fileName);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index fc6560d..1a91dfb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -45,22 +45,27 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public abstract class SimpleMessageStore<I extends WritableComparable,
     M extends Writable, T> implements MessageStoreByPartition<I, M>  {
+  /** Message class */
+  protected final Class<M> messageClass;
   /** Service worker */
-  protected final CentralizedServiceWorker<I, ?, ?, M> service;
+  protected final CentralizedServiceWorker<I, ?, ?> service;
   /** Map from partition id to map from vertex id to messages for that vertex */
   protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map;
   /** Giraph configuration */
-  protected final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+  protected final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 
   /**
    * Constructor
    *
+   * @param messageClass Message class held in the store
    * @param service Service worker
    * @param config Giraph configuration
    */
   public SimpleMessageStore(
-      CentralizedServiceWorker<I, ?, ?, M> service,
-      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+      Class<M> messageClass,
+      CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    this.messageClass = messageClass;
     this.service = service;
     this.config = config;
     map = new MapMaker().concurrencyLevel(

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index 319f41a..c982209 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -39,7 +39,7 @@ public class NettyMasterClient implements MasterClient {
   /** Netty client that does the actual I/O */
   private final NettyClient nettyClient;
   /** Worker information for current superstep */
-  private CentralizedServiceMaster<?, ?, ?, ?> service;
+  private CentralizedServiceMaster<?, ?, ?> service;
   /** Cached map of partition ids to serialized aggregator data */
   private final SendAggregatorCache sendAggregatorCache =
       new SendAggregatorCache();
@@ -57,7 +57,7 @@ public class NettyMasterClient implements MasterClient {
    */
   public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
                            ImmutableClassesGiraphConfiguration configuration,
-                           CentralizedServiceMaster<?, ?, ?, ?> service) {
+                           CentralizedServiceMaster<?, ?, ?> service) {
     this.nettyClient =
         new NettyClient(context, configuration, service.getMasterInfo());
     this.service = service;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
index 1f04bcf..cb36c3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
@@ -41,7 +41,7 @@ public class NettyMasterServer implements MasterServer {
    * @param progressable Progressable for reporting progress
    */
   public NettyMasterServer(ImmutableClassesGiraphConfiguration conf,
-      CentralizedServiceMaster<?, ?, ?, ?> service,
+      CentralizedServiceMaster<?, ?, ?> service,
       Progressable progressable) {
     nettyServer = new NettyServer(conf,
         new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
index d1cce64..798e09c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
@@ -41,9 +41,9 @@ public class NettyWorkerAggregatorRequestProcessor
   /** Progressable used to report progress */
   private final Progressable progressable;
   /** NettyClient that could be shared among one or more instances */
-  private final WorkerClient<?, ?, ?, ?> workerClient;
+  private final WorkerClient<?, ?, ?> workerClient;
   /** Service worker */
-  private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
+  private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
   /** Cached map of partition ids to serialized aggregator data */
   private final SendAggregatedValueCache sendAggregatedValueCache =
       new SendAggregatedValueCache();
@@ -59,8 +59,8 @@ public class NettyWorkerAggregatorRequestProcessor
    */
   public NettyWorkerAggregatorRequestProcessor(
       Progressable progressable,
-      ImmutableClassesGiraphConfiguration<?, ?, ?, ?> configuration,
-      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
+      ImmutableClassesGiraphConfiguration<?, ?, ?> configuration,
+      CentralizedServiceWorker<?, ?, ?> serviceWorker) {
     this.serviceWorker = serviceWorker;
     this.workerClient = serviceWorker.getWorkerClient();
     this.progressable = progressable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index 9c09524..28f3656 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -50,20 +50,19 @@ import java.util.Map;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class NettyWorkerClient<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    WorkerClient<I, V, E, M>, ResetSuperstepMetricsObserver {
+    V extends Writable, E extends Writable> implements
+    WorkerClient<I, V, E>, ResetSuperstepMetricsObserver {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
   /** Hadoop configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Netty client that does that actual I/O */
   private final NettyClient nettyClient;
   /** Centralized service, needed to get vertex ranges */
-  private final CentralizedServiceWorker<I, V, E, M> service;
+  private final CentralizedServiceWorker<I, V, E> service;
 
   // Metrics
   /** Per-superstep, per-request counters */
@@ -78,8 +77,8 @@ public class NettyWorkerClient<I extends WritableComparable,
    */
   public NettyWorkerClient(
       Mapper<?, ?, ?, ?>.Context context,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      CentralizedServiceWorker<I, V, E, M> service) {
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      CentralizedServiceWorker<I, V, E> service) {
     this.nettyClient =
         new NettyClient(context, configuration, service.getWorkerInfo());
     this.conf = configuration;
@@ -111,7 +110,7 @@ public class NettyWorkerClient<I extends WritableComparable,
         metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_WORKER_REQUESTS));
   }
 
-  public CentralizedServiceWorker<I, V, E, M> getService() {
+  public CentralizedServiceWorker<I, V, E> getService() {
     return service;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index db4ff5d..06965ca 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -71,26 +71,25 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_MUTATIONS_PER_REQUEST;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("unchecked")
 public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    WorkerClientRequestProcessor<I, V, E, M> {
+    V extends Writable, E extends Writable> implements
+    WorkerClientRequestProcessor<I, V, E> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(NettyWorkerClientRequestProcessor.class);
   /** Cached partitions of vertices to send */
-  private final SendPartitionCache<I, V, E, M> sendPartitionCache;
+  private final SendPartitionCache<I, V, E> sendPartitionCache;
   /** Cached map of partitions to vertex indices to messages */
-  private final SendMessageCache<I, M> sendMessageCache;
+  private final SendMessageCache<I, Writable> sendMessageCache;
   /** Cache of edges to be sent. */
   private final SendEdgeCache<I, E> sendEdgeCache;
   /** Cached map of partitions to vertex indices to mutations */
-  private final SendMutationsCache<I, V, E, M> sendMutationsCache =
-      new SendMutationsCache<I, V, E, M>();
+  private final SendMutationsCache<I, V, E> sendMutationsCache =
+      new SendMutationsCache<I, V, E>();
   /** NettyClient that could be shared among one or more instances */
-  private final WorkerClient<I, V, E, M> workerClient;
+  private final WorkerClient<I, V, E> workerClient;
   /** Messages sent during the last superstep */
   private long totalMsgsSentInSuperstep = 0;
   /** Maximum size of messages per remote worker to cache before sending */
@@ -100,11 +99,11 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   /** Maximum number of mutations per partition before sending */
   private final int maxMutationsPerPartition;
   /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** Service worker */
-  private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  private final CentralizedServiceWorker<I, V, E> serviceWorker;
   /** Server data from the server (used for local requests) */
-  private final ServerData<I, V, E, M> serverData;
+  private final ServerData<I, V, E> serverData;
 
   // Per-Superstep Metrics
   /** Number of requests that went on the wire */
@@ -121,13 +120,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
    */
   public NettyWorkerClientRequestProcessor(
       Mapper<?, ?, ?, ?>.Context context,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
-      CentralizedServiceWorker<I, V, E, M> serviceWorker) {
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      CentralizedServiceWorker<I, V, E> serviceWorker) {
     this.workerClient = serviceWorker.getWorkerClient();
     this.configuration = conf;
 
-    sendPartitionCache = new SendPartitionCache<I, V, E, M>(context, conf);
-    sendMessageCache = new SendMessageCache<I, M>(conf, serviceWorker);
+    sendPartitionCache = new SendPartitionCache<I, V, E>(context, conf);
+    sendMessageCache = new SendMessageCache<I, Writable>(conf, serviceWorker);
     sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
     maxMessagesSizePerWorker = MAX_MSG_REQUEST_SIZE.get(conf);
     maxEdgesSizePerWorker = MAX_EDGE_REQUEST_SIZE.get(conf);
@@ -160,7 +159,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   }
 
   @Override
-  public boolean sendMessageRequest(I destVertexId, M message) {
+  public boolean sendMessageRequest(I destVertexId, Writable message) {
     PartitionOwner owner =
         serviceWorker.getVertexPartitionOwner(destVertexId);
     WorkerInfo workerInfo = owner.getWorkerInfo();
@@ -178,11 +177,11 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     // Send a request if the cache of outgoing message to
     // the remote worker 'workerInfo' is full enough to be flushed
     if (workerMessageSize >= maxMessagesSizePerWorker) {
-      PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+      PairList<Integer, ByteArrayVertexIdMessages<I, Writable>>
           workerMessages =
           sendMessageCache.removeWorkerMessages(workerInfo);
       WritableRequest writableRequest =
-          new SendWorkerMessagesRequest<I, M>(workerMessages);
+          new SendWorkerMessagesRequest<I, Writable>(workerMessages);
       doRequest(workerInfo, writableRequest);
       return true;
     }
@@ -192,14 +191,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
 
   @Override
   public void sendPartitionRequest(WorkerInfo workerInfo,
-                                   Partition<I, V, E, M> partition) {
+                                   Partition<I, V, E> partition) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("sendVertexRequest: Sending to " + workerInfo +
           ", with partition " + partition);
     }
 
-    WritableRequest vertexRequest =
-        new SendVertexRequest<I, V, E, M>(partition);
+    WritableRequest vertexRequest = new SendVertexRequest<I, V, E>(partition);
     doRequest(workerInfo, vertexRequest);
 
     // Messages are stored separately
@@ -215,12 +213,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
    * @param partition Partition whose messages to send
    */
   private void sendPartitionMessages(WorkerInfo workerInfo,
-                                     Partition<I, V, E, M> partition) {
+                                     Partition<I, V, E> partition) {
     final int partitionId = partition.getId();
-    MessageStoreByPartition<I, M> messageStore =
+    MessageStoreByPartition<I, Writable> messageStore =
         serverData.getCurrentMessageStore();
-    ByteArrayVertexIdMessages<I, M> vertexIdMessages =
-        new ByteArrayVertexIdMessages<I, M>();
+    ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
+        new ByteArrayVertexIdMessages<I, Writable>(
+            configuration.getOutgoingMessageValueClass());
     vertexIdMessages.setConf(configuration);
     vertexIdMessages.initialize();
     for (I vertexId :
@@ -228,8 +227,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
       try {
         // Messages cannot be re-used from this iterable, but add()
         // serializes the message, making this safe
-        Iterable<M> messages = messageStore.getVertexMessages(vertexId);
-        for (M message : messages) {
+        Iterable<Writable> messages = messageStore.getVertexMessages(vertexId);
+        for (Writable message : messages) {
           vertexIdMessages.add(vertexId, message);
         }
       } catch (IOException e) {
@@ -238,18 +237,19 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
       }
       if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
         WritableRequest messagesRequest = new
-            SendPartitionCurrentMessagesRequest<I, V, E, M>(
+            SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
             partitionId, vertexIdMessages);
         doRequest(workerInfo, messagesRequest);
         vertexIdMessages =
-            new ByteArrayVertexIdMessages<I, M>();
+            new ByteArrayVertexIdMessages<I, Writable>(
+                configuration.getOutgoingMessageValueClass());
         vertexIdMessages.setConf(configuration);
         vertexIdMessages.initialize();
       }
     }
     if (!vertexIdMessages.isEmpty()) {
       WritableRequest messagesRequest = new
-          SendPartitionCurrentMessagesRequest<I, V, E, M>(
+          SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
           partitionId, vertexIdMessages);
       doRequest(workerInfo, messagesRequest);
     }
@@ -257,8 +257,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
 
   @Override
   public void sendVertexRequest(PartitionOwner partitionOwner,
-                                Vertex<I, V, E, M> vertex) {
-    Partition<I, V, E, M> partition =
+                                Vertex<I, V, E> vertex) {
+    Partition<I, V, E> partition =
         sendPartitionCache.addVertex(partitionOwner, vertex);
     if (partition == null) {
       return;
@@ -329,10 +329,10 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
       int partitionMutationCount) {
     // Send a request if enough mutations are there for a partition
     if (partitionMutationCount >= maxMutationsPerPartition) {
-      Map<I, VertexMutations<I, V, E, M>> partitionMutations =
+      Map<I, VertexMutations<I, V, E>> partitionMutations =
           sendMutationsCache.removePartitionMutations(partitionId);
       WritableRequest writableRequest =
-          new SendPartitionMutationsRequest<I, V, E, M>(
+          new SendPartitionMutationsRequest<I, V, E>(
               partitionId, partitionMutations);
       doRequest(partitionOwner.getWorkerInfo(), writableRequest);
     }
@@ -360,7 +360,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   }
 
   @Override
-  public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
+  public void addVertexRequest(Vertex<I, V, E> vertex) throws IOException {
     PartitionOwner partitionOwner =
         serviceWorker.getVertexPartitionOwner(vertex.getId());
     int partitionId = partitionOwner.getPartitionId();
@@ -398,7 +398,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   @Override
   public void flush() throws IOException {
     // Execute the remaining send partitions (if any)
-    for (Map.Entry<PartitionOwner, Partition<I, V, E, M>> entry :
+    for (Map.Entry<PartitionOwner, Partition<I, V, E>> entry :
         sendPartitionCache.getOwnerPartitionMap().entrySet()) {
       sendPartitionRequest(entry.getKey().getWorkerInfo(), entry.getValue());
     }
@@ -406,15 +406,15 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
 
     // Execute the remaining sends messages (if any)
     PairList<WorkerInfo, PairList<Integer,
-        ByteArrayVertexIdMessages<I, M>>>
+        ByteArrayVertexIdMessages<I, Writable>>>
         remainingMessageCache = sendMessageCache.removeAllMessages();
     PairList<WorkerInfo,
-        PairList<Integer, ByteArrayVertexIdMessages<I, M>>>.Iterator
+        PairList<Integer, ByteArrayVertexIdMessages<I, Writable>>>.Iterator
         iterator = remainingMessageCache.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
       WritableRequest writableRequest =
-          new SendWorkerMessagesRequest<I, M>(
+          new SendWorkerMessagesRequest<I, Writable>(
               iterator.getCurrentSecond());
       doRequest(iterator.getCurrentFirst(), writableRequest);
     }
@@ -435,12 +435,12 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     }
 
     // Execute the remaining sends mutations (if any)
-    Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
+    Map<Integer, Map<I, VertexMutations<I, V, E>>> remainingMutationsCache =
         sendMutationsCache.removeAllPartitionMutations();
-    for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
+    for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E>>> entry :
         remainingMutationsCache.entrySet()) {
       WritableRequest writableRequest =
-          new SendPartitionMutationsRequest<I, V, E, M>(
+          new SendPartitionMutationsRequest<I, V, E>(
               entry.getKey(), entry.getValue());
       PartitionOwner partitionOwner =
           serviceWorker.getVertexPartitionOwner(

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index ed0861e..b457038 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -22,17 +22,15 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerServer;
 import org.apache.giraph.comm.messages.BasicMessageStore;
-import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
 import org.apache.giraph.comm.messages.DiskBackedMessageStore;
 import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
 import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.OneMessagePerVertexStore;
 import org.apache.giraph.comm.messages.SequentialFileMessageStore;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexResolver;
@@ -60,23 +58,24 @@ import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class NettyWorkerServer<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements WorkerServer<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    implements WorkerServer<I, V, E> {
   /** Class logger */
   private static final Logger LOG =
     Logger.getLogger(NettyWorkerServer.class);
   /** Hadoop configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Service worker */
-  private final CentralizedServiceWorker<I, V, E, M> service;
+  private final CentralizedServiceWorker<I, V, E> service;
   /** Netty server that does that actual I/O */
   private final NettyServer nettyServer;
   /** Server data storage */
-  private final ServerData<I, V, E, M> serverData;
+  private final ServerData<I, V, E> serverData;
+  /** Mapper context */
+  private final Mapper<?, ?, ?, ?>.Context context;
 
   /**
    * Constructor to start the server.
@@ -85,18 +84,19 @@ public class NettyWorkerServer<I extends WritableComparable,
    * @param service Service to get partition mappings
    * @param context Mapper context
    */
-  public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
-      CentralizedServiceWorker<I, V, E, M> service,
+  public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      CentralizedServiceWorker<I, V, E> service,
       Mapper<?, ?, ?, ?>.Context context) {
     this.conf = conf;
     this.service = service;
+    this.context = context;
 
     serverData =
-        new ServerData<I, V, E, M>(service, conf, createMessageStoreFactory(),
+        new ServerData<I, V, E>(service, conf, createMessageStoreFactory(),
             context);
 
     nettyServer = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData),
+        new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
         service.getWorkerInfo(), context);
     nettyServer.start();
   }
@@ -107,33 +107,20 @@ public class NettyWorkerServer<I extends WritableComparable,
    *
    * @return Message store factory
    */
-  private MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+  private MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
   createMessageStoreFactory() {
     boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf);
     if (!useOutOfCoreMessaging) {
-      if (conf.useCombiner()) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("createMessageStoreFactory: " +
-              "Using OneMessagePerVertexStore since combiner enabled");
-        }
-        return OneMessagePerVertexStore.newFactory(service, conf);
-      } else {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("createMessageStoreFactory: " +
-              "Using ByteArrayMessagesPerVertexStore " +
-              "since there is no combiner");
-        }
-        return ByteArrayMessagesPerVertexStore.newFactory(service, conf);
-      }
+      return new InMemoryMessageStoreFactory<I, Writable>(service, conf);
     } else {
       int maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
       if (LOG.isInfoEnabled()) {
         LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
             "maxMessagesInMemory = " + maxMessagesInMemory);
       }
-      MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory =
-          SequentialFileMessageStore.newFactory(conf);
-      MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
+      MessageStoreFactory<I, Writable, BasicMessageStore<I, Writable>>
+          fileStoreFactory = SequentialFileMessageStore.newFactory(conf);
+      MessageStoreFactory<I, Writable, FlushableMessageStore<I, Writable>>
           partitionStoreFactory =
           DiskBackedMessageStore.newFactory(conf, fileStoreFactory);
       return DiskBackedMessageStoreByPartition.newFactory(service,
@@ -147,21 +134,19 @@ public class NettyWorkerServer<I extends WritableComparable,
   }
 
   @Override
-  public void prepareSuperstep(GraphState<I, V, E, M> graphState) {
+  public void prepareSuperstep() {
     serverData.prepareSuperstep();
-    resolveMutations(graphState);
+    resolveMutations();
   }
 
   /**
    * Resolve mutation requests.
-   *
-   * @param graphState Graph state
    */
-  private void resolveMutations(GraphState<I, V, E, M> graphState) {
+  private void resolveMutations() {
     Multimap<Integer, I> resolveVertexIndices = HashMultimap.create(
         service.getPartitionStore().getNumPartitions(), 100);
       // Add any mutated vertex indices to be resolved
-    for (Entry<I, VertexMutations<I, V, E, M>> e :
+    for (Entry<I, VertexMutations<I, V, E>> e :
         serverData.getVertexMutations().entrySet()) {
       I vertexId = e.getKey();
       Integer partitionId = service.getPartitionId(vertexId);
@@ -176,7 +161,7 @@ public class NettyWorkerServer<I extends WritableComparable,
       Iterable<I> destinations = serverData.getCurrentMessageStore().
           getPartitionDestinationVertices(partitionId);
       if (!Iterables.isEmpty(destinations)) {
-        Partition<I, V, E, M> partition =
+        Partition<I, V, E> partition =
             service.getPartitionStore().getPartition(partitionId);
         for (I vertexId : destinations) {
           if (partition.getVertex(vertexId) == null) {
@@ -191,18 +176,17 @@ public class NettyWorkerServer<I extends WritableComparable,
       }
     }
     // Resolve all graph mutations
-    VertexResolver<I, V, E, M> vertexResolver =
-        conf.createVertexResolver(graphState);
+    VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
     for (Entry<Integer, Collection<I>> e :
         resolveVertexIndices.asMap().entrySet()) {
-      Partition<I, V, E, M> partition =
+      Partition<I, V, E> partition =
           service.getPartitionStore().getPartition(e.getKey());
       for (I vertexIndex : e.getValue()) {
-        Vertex<I, V, E, M> originalVertex =
+        Vertex<I, V, E> originalVertex =
             partition.getVertex(vertexIndex);
 
-        VertexMutations<I, V, E, M> mutations = null;
-        VertexMutations<I, V, E, M> vertexMutations =
+        VertexMutations<I, V, E> mutations = null;
+        VertexMutations<I, V, E> vertexMutations =
             serverData.getVertexMutations().get(vertexIndex);
         if (vertexMutations != null) {
           synchronized (vertexMutations) {
@@ -210,11 +194,11 @@ public class NettyWorkerServer<I extends WritableComparable,
           }
           serverData.getVertexMutations().remove(vertexIndex);
         }
-        Vertex<I, V, E, M> vertex = vertexResolver.resolve(
+        Vertex<I, V, E> vertex = vertexResolver.resolve(
             vertexIndex, originalVertex, mutations,
             serverData.getCurrentMessageStore().
                 hasMessagesForVertex(vertexIndex));
-        graphState.getContext().progress();
+        context.progress();
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("resolveMutations: Resolved vertex index " +
@@ -240,7 +224,7 @@ public class NettyWorkerServer<I extends WritableComparable,
   }
 
   @Override
-  public ServerData<I, V, E, M> getServerData() {
+  public ServerData<I, V, E> getServerData() {
     return serverData;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
index b4e7dda..f64c373 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public class WorkerRequestServerHandler<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> extends
-    RequestServerHandler<WorkerRequest<I, V, E, M>> {
+    RequestServerHandler<WorkerRequest<I, V, E>> {
   /** Data that can be accessed for handling requests */
-  private final ServerData<I, V, E, M> serverData;
+  private final ServerData<I, V, E> serverData;
 
   /**
    * Constructor with external server data
@@ -47,7 +47,7 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
    * @param conf                     Configuration
    * @param myTaskInfo               Current task info
    */
-  public WorkerRequestServerHandler(ServerData<I, V, E, M> serverData,
+  public WorkerRequestServerHandler(ServerData<I, V, E> serverData,
       WorkerRequestReservedMap workerRequestReservedMap,
       ImmutableClassesGiraphConfiguration conf,
       TaskInfo myTaskInfo) {
@@ -56,23 +56,23 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
   }
 
   @Override
-  public void processRequest(WorkerRequest<I, V, E, M> request) {
+  public void processRequest(WorkerRequest<I, V, E> request) {
     request.doRequest(serverData);
   }
 
   /** Factory for {@link WorkerRequestServerHandler} */
   public static class Factory<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable> implements
+      V extends Writable, E extends Writable> implements
       RequestServerHandler.Factory {
     /** Data that can be accessed for handling requests */
-    private final ServerData<I, V, E, M> serverData;
+    private final ServerData<I, V, E> serverData;
 
     /**
      * Constructor
      *
      * @param serverData Data held by the server
      */
-    public Factory(ServerData<I, V, E, M> serverData) {
+    public Factory(ServerData<I, V, E> serverData) {
       this.serverData = serverData;
     }
 
@@ -81,8 +81,8 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
         WorkerRequestReservedMap workerRequestReservedMap,
         ImmutableClassesGiraphConfiguration conf,
         TaskInfo myTaskInfo) {
-      return new WorkerRequestServerHandler<I, V, E,
-          M>(serverData, workerRequestReservedMap, conf, myTaskInfo);
+      return new WorkerRequestServerHandler<I, V, E, Writable>(serverData,
+          workerRequestReservedMap, conf, myTaskInfo);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
index 037f4a0..88641c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
   V extends Writable, E extends Writable, M extends Writable> extends
-  WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
+  WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
   /** Destination partition for these vertices' messages*/
   private int partitionId;
   /** Map of destination vertex ID's to message lists */
@@ -67,7 +67,10 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
   @Override
   public void readFieldsRequest(DataInput input) throws IOException {
     partitionId = input.readInt();
-    vertexIdMessageMap = new ByteArrayVertexIdMessages<I, M>();
+    // At this moment the Computation class has already been replaced with
+    // the new one, and we deal with messages from the previous superstep
+    vertexIdMessageMap = new ByteArrayVertexIdMessages<I, M>(
+        getConf().getIncomingMessageValueClass());
     vertexIdMessageMap.setConf(getConf());
     vertexIdMessageMap.initialize();
     vertexIdMessageMap.readFields(input);
@@ -80,9 +83,9 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
   }
 
   @Override
-  public void doRequest(ServerData<I, V, E, M> serverData) {
+  public void doRequest(ServerData<I, V, E> serverData) {
     try {
-      serverData.getCurrentMessageStore().addPartitionMessages(partitionId,
+      serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId,
           vertexIdMessageMap);
     } catch (IOException e) {
       throw new RuntimeException("doRequest: Got IOException ", e);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
index a96842d..de0d098 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
@@ -42,19 +42,18 @@ import java.util.concurrent.ConcurrentHashMap;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class SendPartitionMutationsRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
+    V extends Writable, E extends Writable> extends
+    WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendPartitionMutationsRequest.class);
   /** Partition id */
   private int partitionId;
   /** Mutations sent for a partition */
-  private Map<I, VertexMutations<I, V, E, M>> vertexIdMutations;
+  private Map<I, VertexMutations<I, V, E>> vertexIdMutations;
 
   /**
    * Constructor used for reflection only
@@ -69,7 +68,7 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
    */
   public SendPartitionMutationsRequest(
       int partitionId,
-      Map<I, VertexMutations<I, V, E, M>> vertexIdMutations) {
+      Map<I, VertexMutations<I, V, E>> vertexIdMutations) {
     this.partitionId = partitionId;
     this.vertexIdMutations = vertexIdMutations;
   }
@@ -82,8 +81,8 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
     for (int i = 0; i < vertexIdMutationsSize; ++i) {
       I vertexId = getConf().createVertexId();
       vertexId.readFields(input);
-      VertexMutations<I, V, E, M> vertexMutations =
-          new VertexMutations<I, V, E, M>();
+      VertexMutations<I, V, E> vertexMutations =
+          new VertexMutations<I, V, E>();
       vertexMutations.setConf(getConf());
       vertexMutations.readFields(input);
       if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
@@ -97,7 +96,7 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
   public void writeRequest(DataOutput output) throws IOException {
     output.writeInt(partitionId);
     output.writeInt(vertexIdMutations.size());
-    for (Entry<I, VertexMutations<I, V, E, M>> entry :
+    for (Entry<I, VertexMutations<I, V, E>> entry :
         vertexIdMutations.entrySet()) {
       entry.getKey().write(output);
       entry.getValue().write(output);
@@ -110,15 +109,15 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
   }
 
   @Override
-  public void doRequest(ServerData<I, V, E, M> serverData) {
-    ConcurrentHashMap<I, VertexMutations<I, V, E, M>> vertexMutations =
+  public void doRequest(ServerData<I, V, E> serverData) {
+    ConcurrentHashMap<I, VertexMutations<I, V, E>> vertexMutations =
       serverData.getVertexMutations();
     Histogram verticesInMutationHist = GiraphMetrics.get().perSuperstep()
         .getUniformHistogram(MetricNames.VERTICES_IN_MUTATION_REQUEST);
     verticesInMutationHist.update(vertexMutations.size());
-    for (Entry<I, VertexMutations<I, V, E, M>> entry :
+    for (Entry<I, VertexMutations<I, V, E>> entry :
         vertexIdMutations.entrySet()) {
-      VertexMutations<I, V, E, M> mutations =
+      VertexMutations<I, V, E> mutations =
           vertexMutations.get(entry.getKey());
       if (mutations == null) {
         mutations = vertexMutations.putIfAbsent(

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
index 1de3cbb..e0cb916 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
@@ -34,17 +34,16 @@ import java.io.IOException;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class SendVertexRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
+    V extends Writable, E extends Writable> extends
+    WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendVertexRequest.class);
   /** Partition */
-  private Partition<I, V, E, M> partition;
+  private Partition<I, V, E> partition;
 
   /**
    * Constructor used for reflection only
@@ -56,7 +55,7 @@ public class SendVertexRequest<I extends WritableComparable,
    *
    * @param partition Partition to send the request to
    */
-  public SendVertexRequest(Partition<I, V, E, M> partition) {
+  public SendVertexRequest(Partition<I, V, E> partition) {
     this.partition = partition;
   }
 
@@ -77,7 +76,7 @@ public class SendVertexRequest<I extends WritableComparable,
   }
 
   @Override
-  public void doRequest(ServerData<I, V, E, M> serverData) {
+  public void doRequest(ServerData<I, V, E> serverData) {
     serverData.getPartitionStore().addPartition(partition);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index 04b633b..f6bf9bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -54,7 +54,8 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
 
   @Override
   public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
-    return new ByteArrayVertexIdMessages<I, M>();
+    return new ByteArrayVertexIdMessages<I, M>(
+        getConf().getOutgoingMessageValueClass());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
index 4d9382f..0ceb3eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
@@ -28,14 +28,13 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 public interface WorkerRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
   /**
    * Execute the request
    *
    * @param serverData Accessible data that can be mutated per the request
    */
-  void doRequest(ServerData<I, V, E, M> serverData);
+  void doRequest(ServerData<I, V, E> serverData);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
index fad20b0..181e681 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
@@ -32,12 +32,10 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 public abstract class WritableRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements Writable,
-    ImmutableClassesGiraphConfigurable<I, V, E, M> {
+    V extends Writable, E extends Writable> implements Writable,
+    ImmutableClassesGiraphConfigurable<I, V, E> {
   /**
    * Value to use when size of the request in serialized form is not known
    * or too expensive to calculate
@@ -45,7 +43,7 @@ public abstract class WritableRequest<I extends WritableComparable,
   public static final int UNKNOWN_SIZE = -1;
 
   /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Client id */
   private int clientId = -1;
   /** Request id */
@@ -103,13 +101,12 @@ public abstract class WritableRequest<I extends WritableComparable,
   abstract void writeRequest(DataOutput output) throws IOException;
 
   @Override
-  public final ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+  public final ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
     return conf;
   }
 
   @Override
-  public final void setConf(ImmutableClassesGiraphConfiguration<I, V,
-      E, M> conf) {
+  public final void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     this.conf = conf;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
index cceaaef..5d150d0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
@@ -24,7 +24,7 @@ import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
 
 /**
  * Tracks all of the Giraph options
@@ -75,7 +75,7 @@ public class AllOptions {
   public static void main(String[] args) {
     // This is necessary to trigger the static constants in GiraphConstants to
     // get loaded. Without it we get no output.
-    VERTEX_CLASS.toString();
+    COMPUTATION_CLASS.toString();
 
     LOG.info(allOptionsString());
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java b/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java
index 77564ee..c5096fa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java
@@ -27,22 +27,21 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 public class DefaultImmutableClassesGiraphConfigurable<
     I extends WritableComparable, V extends Writable,
-    E extends Writable, M extends Writable> implements
-    ImmutableClassesGiraphConfigurable<I, V, E, M> {
+    E extends Writable> implements
+    ImmutableClassesGiraphConfigurable<I, V, E> {
   /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
 
   @Override
-  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     this.conf = conf;
   }
 
   @Override
-  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+  public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
     return conf;
   }
 }