You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2015/04/10 21:20:27 UTC

[1/3] git commit: updated refs/heads/trunk to 5d0b81a

Repository: giraph
Updated Branches:
  refs/heads/trunk 6ee97e77e -> 5d0b81ac4


http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
index 261e7c2..0841220 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
@@ -17,12 +17,16 @@
  */
 package org.apache.giraph.utils;
 
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
 import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.DefaultEdgeValueFactory;
-import org.apache.giraph.factories.DefaultIncomingMessageValueFactory;
-import org.apache.giraph.factories.DefaultOutgoingMessageValueFactory;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
 import org.apache.giraph.factories.DefaultVertexIdFactory;
 import org.apache.giraph.factories.DefaultVertexValueFactory;
 import org.apache.giraph.factories.EdgeValueFactory;
@@ -40,11 +44,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.junit.Test;
 
-import java.io.IOException;
-
-import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
-import static org.junit.Assert.assertEquals;
-
 public class TestReflectionUtils {
   @Test
   public void testPackagePath() {
@@ -100,12 +99,7 @@ public class TestReflectionUtils {
     assertEquals(Writable.class, classes[0]);
 
     classes = getTypeArguments(MessageValueFactory.class,
-        DefaultIncomingMessageValueFactory.class);
-    assertEquals(1, classes.length);
-    assertEquals(Writable.class, classes[0]);
-
-    classes = getTypeArguments(MessageValueFactory.class,
-        DefaultOutgoingMessageValueFactory.class);
+        DefaultMessageValueFactory.class);
     assertEquals(1, classes.length);
     assertEquals(Writable.class, classes[0]);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 26459c0..b9fc508 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -152,7 +152,7 @@ public class
     NullWritable edgeValue =
         immutableClassesGiraphConfiguration.createEdgeValue();
     Writable messageValue =
-        immutableClassesGiraphConfiguration.getOutgoingMessageValueFactory()
+        immutableClassesGiraphConfiguration.createOutgoingMessageValueFactory()
             .newInstance();
     assertSame(vertexValue.getClass(), NullWritable.class);
     assertSame(vertexValue, edgeValue);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
index a481db3..41726a9 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.vertex;
 
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_FACTORY_CLASS;
+
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -36,8 +38,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.junit.Test;
 
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_FACTORY_CLASS;
-
 
 public class TestComputationTypes {
 
@@ -63,7 +63,7 @@ public class TestComputationTypes {
     /**
      * Matches the {@link GeneratedComputationMatch}
      */
-    private static class GeneratedVertexMatchMessageCombiner
+    public static class GeneratedVertexMatchMessageCombiner
         extends
         MessageCombiner<LongWritable, FloatWritable> {
       @Override
@@ -74,14 +74,14 @@ public class TestComputationTypes {
 
       @Override
       public FloatWritable createInitialMessage() {
-        return null;
+        return new FloatWritable();
       }
     }
 
     /**
      * Mismatches the {@link GeneratedComputationMatch}
      */
-    private static class GeneratedVertexMismatchMessageCombiner
+    public static class GeneratedVertexMismatchMessageCombiner
         extends
         MessageCombiner<LongWritable, DoubleWritable> {
       @Override
@@ -92,24 +92,15 @@ public class TestComputationTypes {
 
       @Override
       public DoubleWritable createInitialMessage() {
-        return null;
+        return new DoubleWritable();
       }
     }
 
     /**
      * Mismatches the {@link GeneratedComputationMatch}
      */
-    private static class GeneratedVertexMismatchValueFactory implements
+    public static class GeneratedVertexMismatchValueFactory implements
         VertexValueFactory<DoubleWritable> {
-
-      @Override
-      public void initialize(ImmutableClassesGiraphConfiguration conf) { }
-
-      @Override
-      public Class<DoubleWritable> getValueClass() {
-        return DoubleWritable.class;
-      }
-
       @Override
       public DoubleWritable newInstance() {
         return new DoubleWritable();
@@ -181,7 +172,7 @@ public class TestComputationTypes {
       validator.validateConfiguration();
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test(expected = IllegalStateException.class)
     public void testMismatchingVertex() throws SecurityException,
       NoSuchMethodException, NoSuchFieldException {
       Configuration conf = getDefaultTestConf() ;
@@ -196,7 +187,7 @@ public class TestComputationTypes {
       validator.validateConfiguration();
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test(expected = IllegalStateException.class)
     public void testMismatchingCombiner() throws SecurityException,
       NoSuchMethodException, NoSuchFieldException {
       Configuration conf = getDefaultTestConf() ;
@@ -213,7 +204,7 @@ public class TestComputationTypes {
       validator.validateConfiguration();
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test(expected = IllegalStateException.class)
     public void testMismatchingVertexValueFactory() throws SecurityException,
         NoSuchMethodException, NoSuchFieldException {
       Configuration conf = getDefaultTestConf() ;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
index 517901a..ba3e8cc 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
@@ -17,6 +17,37 @@
  */
 package org.apache.giraph.hive.jython;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.giraph.conf.GiraphConstants.EDGE_INPUT_FORMAT_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.GRAPH_TYPE_LANGUAGES;
+import static org.apache.giraph.conf.GiraphConstants.MAX_WORKERS;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_COMBINER_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.MIN_WORKERS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_INPUT_FORMAT_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_VALUE_READER_JYTHON_NAME;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_VALUE_WRITER_JYTHON_NAME;
+import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_SOURCE_ID_COLUMN;
+import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_TARGET_ID_COLUMN;
+import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_VALUE_COLUMN;
+import static org.apache.giraph.hive.jython.JythonVertexToHive.VERTEX_VALUE_COLUMN;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -35,12 +66,11 @@ import org.apache.giraph.hive.values.HiveValueReader;
 import org.apache.giraph.hive.values.HiveValueWriter;
 import org.apache.giraph.io.formats.multi.MultiEdgeInputFormat;
 import org.apache.giraph.io.formats.multi.MultiVertexInputFormat;
+import org.apache.giraph.jython.JythonJob;
+import org.apache.giraph.jython.JythonUtils;
 import org.apache.giraph.jython.factories.JythonEdgeValueFactory;
 import org.apache.giraph.jython.factories.JythonFactoryBase;
-import org.apache.giraph.jython.factories.JythonIncomingMessageValueFactory;
-import org.apache.giraph.jython.JythonJob;
 import org.apache.giraph.jython.factories.JythonOutgoingMessageValueFactory;
-import org.apache.giraph.jython.JythonUtils;
 import org.apache.giraph.jython.factories.JythonVertexIdFactory;
 import org.apache.giraph.jython.factories.JythonVertexValueFactory;
 import org.apache.giraph.jython.wrappers.JythonWritableWrapper;
@@ -66,37 +96,6 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.google.common.io.Closeables;
 
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.giraph.conf.GiraphConstants.EDGE_INPUT_FORMAT_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.GRAPH_TYPE_LANGUAGES;
-import static org.apache.giraph.conf.GiraphConstants.MAX_WORKERS;
-import static org.apache.giraph.conf.GiraphConstants.MIN_WORKERS;
-import static org.apache.giraph.conf.GiraphConstants.MESSAGE_COMBINER_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_INPUT_FORMAT_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_VALUE_READER_JYTHON_NAME;
-import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_VALUE_WRITER_JYTHON_NAME;
-import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_SOURCE_ID_COLUMN;
-import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_TARGET_ID_COLUMN;
-import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_VALUE_COLUMN;
-import static org.apache.giraph.hive.jython.JythonVertexToHive.VERTEX_VALUE_COLUMN;
-
 /**
  * Plugin to {@link HiveJythonRunner} to use Hive.
  */
@@ -377,10 +376,6 @@ public class HiveJythonUtils {
     types.setEdgeValueClass(initValueType(conf, GraphType.EDGE_VALUE,
         jythonJob.getEdge_value().getType(), new JythonEdgeValueFactory(),
         interpreter));
-    types.setIncomingMessageValueClass(
-        initValueType(conf, GraphType.INCOMING_MESSAGE_VALUE,
-            jythonJob.getIncoming_message_value().getType(),
-            new JythonIncomingMessageValueFactory(), interpreter));
     types.setOutgoingMessageValueClass(
         initValueType(conf, GraphType.OUTGOING_MESSAGE_VALUE,
             jythonJob.getOutgoing_message_value().getType(),
@@ -444,8 +439,6 @@ public class HiveJythonUtils {
    * @param jythonJob JythonJob
    */
   private static void checkMessageTypes(JythonJob jythonJob) {
-    checkMessageType(jythonJob.getIncoming_message_value(),
-        GraphType.INCOMING_MESSAGE_VALUE, jythonJob);
     checkMessageType(jythonJob.getOutgoing_message_value(),
         GraphType.OUTGOING_MESSAGE_VALUE, jythonJob);
   }


[2/3] git commit: updated refs/heads/trunk to 5d0b81a

Posted by ed...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
index 375d054..14f889d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBoolean.java
@@ -30,8 +30,6 @@ public class PerGraphTypeBoolean {
   private boolean vertexValue;
   /** data for edge value */
   private boolean edgeValue;
-  /** data for incoming message */
-  private boolean incomingMessage;
   /** data for outgoing message */
   private boolean outgoingMessage;
 
@@ -60,7 +58,6 @@ public class PerGraphTypeBoolean {
     setVertexId(options.getVertexId(), conf);
     setVertexValue(options.getVertexValue(), conf);
     setEdgeValue(options.getEdgeValue(), conf);
-    setIncomingMessage(options.getIncomingMessage(), conf);
     setOutgoingMessage(options.getOutgoingMessage(), conf);
   }
 
@@ -95,16 +92,6 @@ public class PerGraphTypeBoolean {
   }
 
   /**
-   * Set the incoming message value data from the option
-   *
-   * @param option EnumConfOption option to use
-   * @param conf Configuration
-   */
-  public void setIncomingMessage(BooleanConfOption option, Configuration conf) {
-    incomingMessage = option.get(conf);
-  }
-
-  /**
    * Set the outgoing message value data from the option
    *
    * @param option EnumConfOption option to use
@@ -128,8 +115,6 @@ public class PerGraphTypeBoolean {
       return vertexValue;
     case EDGE_VALUE:
       return edgeValue;
-    case INCOMING_MESSAGE_VALUE:
-      return incomingMessage;
     case OUTGOING_MESSAGE_VALUE:
       return outgoingMessage;
     default:
@@ -142,10 +127,6 @@ public class PerGraphTypeBoolean {
     return edgeValue;
   }
 
-  public boolean getIncomingMessage() {
-    return incomingMessage;
-  }
-
   public boolean getOutgoingMessage() {
     return outgoingMessage;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
index adfa979..d622546 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeBooleanConfOption.java
@@ -30,8 +30,6 @@ public class PerGraphTypeBooleanConfOption {
   private final BooleanConfOption vertexValue;
   /** option for edge value */
   private final BooleanConfOption edgeValue;
-  /** option for incoming message */
-  private final BooleanConfOption incomingMessage;
   /** option for outgoing message */
   private final BooleanConfOption outgoingMessage;
 
@@ -50,8 +48,6 @@ public class PerGraphTypeBooleanConfOption {
         defaultValue, description);
     edgeValue = new BooleanConfOption(keyPrefix + ".edge.value",
         defaultValue, description);
-    incomingMessage = new BooleanConfOption(keyPrefix + ".incoming.message",
-        defaultValue, description);
     outgoingMessage = new BooleanConfOption(keyPrefix + ".outgoing.message",
         defaultValue, description);
   }
@@ -70,8 +66,6 @@ public class PerGraphTypeBooleanConfOption {
       return vertexValue;
     case EDGE_VALUE:
       return edgeValue;
-    case INCOMING_MESSAGE_VALUE:
-      return incomingMessage;
     case OUTGOING_MESSAGE_VALUE:
       return outgoingMessage;
     default:
@@ -95,10 +89,6 @@ public class PerGraphTypeBooleanConfOption {
     return edgeValue;
   }
 
-  public BooleanConfOption getIncomingMessage() {
-    return incomingMessage;
-  }
-
   public BooleanConfOption getOutgoingMessage() {
     return outgoingMessage;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
index 7003709..199b2f9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnum.java
@@ -32,8 +32,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
   private T vertexValue;
   /** data for edge value */
   private T edgeValue;
-  /** data for incoming message */
-  private T incomingMessage;
   /** data for outgoing message */
   private T outgoingMessage;
 
@@ -63,7 +61,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
     setVertexId(options.getVertexId(), conf);
     setVertexValue(options.getVertexValue(), conf);
     setEdgeValue(options.getEdgeValue(), conf);
-    setIncomingMessage(options.getIncomingMessage(), conf);
     setOutgoingMessage(options.getOutgoingMessage(), conf);
   }
 
@@ -98,16 +95,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
   }
 
   /**
-   * Set the incoming message value data from the option
-   *
-   * @param option EnumConfOption option to use
-   * @param conf Configuration
-   */
-  public void setIncomingMessage(EnumConfOption<T> option, Configuration conf) {
-    incomingMessage = option.get(conf);
-  }
-
-  /**
    * Set the outgoing message value data from the option
    *
    * @param option EnumConfOption option to use
@@ -131,8 +118,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
       return vertexValue;
     case EDGE_VALUE:
       return edgeValue;
-    case INCOMING_MESSAGE_VALUE:
-      return incomingMessage;
     case OUTGOING_MESSAGE_VALUE:
       return outgoingMessage;
     default:
@@ -145,10 +130,6 @@ public class PerGraphTypeEnum<T extends Enum<T>> {
     return edgeValue;
   }
 
-  public T getIncomingMessage() {
-    return incomingMessage;
-  }
-
   public T getOutgoingMessage() {
     return outgoingMessage;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
index 8ae4576..c5041e2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/PerGraphTypeEnumConfOption.java
@@ -32,8 +32,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
   private final EnumConfOption<T> vertexValue;
   /** option for edge value */
   private final EnumConfOption<T> edgeValue;
-  /** option for incoming message */
-  private final EnumConfOption<T> incomingMessage;
   /** option for outgoing message */
   private final EnumConfOption<T> outgoingMessage;
 
@@ -53,8 +51,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
         defaultValue, description);
     edgeValue = EnumConfOption.create(keyPrefix + ".edge.value",
         klass, defaultValue, description);
-    incomingMessage = EnumConfOption.create(keyPrefix + ".incoming.message",
-        klass, defaultValue, description);
     outgoingMessage = EnumConfOption.create(keyPrefix + ".outgoing.message",
         klass, defaultValue, description);
   }
@@ -89,8 +85,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
       return vertexValue;
     case EDGE_VALUE:
       return edgeValue;
-    case INCOMING_MESSAGE_VALUE:
-      return incomingMessage;
     case OUTGOING_MESSAGE_VALUE:
       return outgoingMessage;
     default:
@@ -114,10 +108,6 @@ public class PerGraphTypeEnumConfOption<T extends Enum<T>> {
     return edgeValue;
   }
 
-  public EnumConfOption<T> getIncomingMessage() {
-    return incomingMessage;
-  }
-
   public EnumConfOption<T> getOutgoingMessage() {
     return outgoingMessage;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
deleted file mode 100644
index 5551439..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/AbstractMessageValueFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.base.Objects;
-
-/**
- * Factory class to create default message values.
- *
- * @param <M> Message Value
- */
-public abstract class AbstractMessageValueFactory<M extends Writable>
-    implements MessageValueFactory<M> {
-  /** Message value class */
-  private Class<M> messageValueClass;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-
-  /**
-   * Get the message value class from the configuration
-   *
-   * @param conf Configuration
-   * @return message value Class
-   */
-  protected abstract Class<M> extractMessageValueClass(
-      ImmutableClassesGiraphConfiguration conf);
-
-  @Override
-  public Class<M> getValueClass() {
-    return messageValueClass;
-  }
-
-  @Override
-  public void initialize(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-    messageValueClass = extractMessageValueClass(conf);
-  }
-
-  @Override public M newInstance() {
-    return WritableUtils.createWritable(messageValueClass, conf);
-  }
-
-  @Override public String toString() {
-    return Objects.toStringHelper(this)
-        .add("messageValueClass", messageValueClass)
-        .toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
index 998c06f..a838e0f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultEdgeValueFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.giraph.factories;
 
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
@@ -29,21 +30,18 @@ import org.apache.hadoop.io.Writable;
  * @param <E> Edge Value
  */
 public class DefaultEdgeValueFactory<E extends Writable>
-    implements EdgeValueFactory<E> {
+    implements EdgeValueFactory<E>, GiraphConfigurationSettable {
   /** Cached edge value class. */
   private Class<E> edgeValueClass;
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
 
-  @Override public void initialize(ImmutableClassesGiraphConfiguration conf) {
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
     edgeValueClass = conf.getEdgeValueClass();
   }
 
-  @Override public Class<E> getValueClass() {
-    return edgeValueClass;
-  }
-
   @Override public E newInstance() {
     return WritableUtils.createWritable(edgeValueClass, conf);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java
deleted file mode 100644
index bdf2966..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultIncomingMessageValueFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Factory class to create default incoming message values.
- *
- * @param <M> Incoming Message Value
- */
-public class DefaultIncomingMessageValueFactory<M extends Writable> extends
-    AbstractMessageValueFactory<M> {
-  @Override protected Class<M> extractMessageValueClass(
-      ImmutableClassesGiraphConfiguration conf) {
-    return conf.getIncomingMessageValueClass();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java
new file mode 100644
index 0000000..b10d30c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultMessageValueFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.factories;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Objects;
+
+/**
+ * Factory class to create default message values.
+ *
+ * @param <M> Message Value
+ */
+public class DefaultMessageValueFactory<M extends Writable>
+    implements MessageValueFactory<M> {
+  /** Message value class */
+  private final Class<M> messageValueClass;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor
+   * @param messageValueClass message value class
+   * @param conf configuration
+   */
+  public DefaultMessageValueFactory(Class<M> messageValueClass,
+      ImmutableClassesGiraphConfiguration conf) {
+    this.messageValueClass = messageValueClass;
+    this.conf = conf;
+  }
+
+  @Override public M newInstance() {
+    return WritableUtils.createWritable(messageValueClass, conf);
+  }
+
+  @Override public String toString() {
+    return Objects.toStringHelper(this)
+        .add("messageValueClass", messageValueClass)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java
deleted file mode 100644
index a42d5e2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultOutgoingMessageValueFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.factories;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Factory class to create default outgoing message values.
- *
- * @param <M> Outgoing Message Value
- */
-public class DefaultOutgoingMessageValueFactory<M extends Writable> extends
-    AbstractMessageValueFactory<M> {
-  @Override protected Class<M> extractMessageValueClass(
-      ImmutableClassesGiraphConfiguration conf) {
-    return conf.getOutgoingMessageValueClass();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
index 305548c..d36ae57 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexIdFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.giraph.factories;
 
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.WritableComparable;
@@ -28,24 +29,19 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex ID
  */
 public class DefaultVertexIdFactory<I extends WritableComparable>
-    implements VertexIdFactory<I> {
+    implements VertexIdFactory<I>, GiraphConfigurationSettable {
   /** Cached vertex value class. */
   private Class<I> vertexIdClass;
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
 
   @Override
-  public void initialize(ImmutableClassesGiraphConfiguration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
     vertexIdClass = conf.getVertexIdClass();
   }
 
   @Override
-  public Class<I> getValueClass() {
-    return vertexIdClass;
-  }
-
-  @Override
   public I newInstance() {
     return WritableUtils.createWritable(vertexIdClass, conf);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
index 634f0d5..2cc124c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/DefaultVertexValueFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.factories;
 
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
@@ -29,24 +30,19 @@ import org.apache.hadoop.io.Writable;
  * @param <V> Vertex value
  */
 public class DefaultVertexValueFactory<V extends Writable>
-    implements VertexValueFactory<V> {
+    implements VertexValueFactory<V>, GiraphConfigurationSettable {
   /** Cached vertex value class. */
   private Class<V> vertexValueClass;
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
 
   @Override
-  public void initialize(ImmutableClassesGiraphConfiguration conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
     vertexValueClass = conf.getVertexValueClass();
   }
 
   @Override
-  public Class<V> getValueClass() {
-    return vertexValueClass;
-  }
-
-  @Override
   public V newInstance() {
     return WritableUtils.createWritable(vertexValueClass, conf);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
index 806664b..e8c1c6d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/TestMessageValueFactory.java
@@ -17,7 +17,6 @@
  */
 package org.apache.giraph.factories;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 
@@ -40,13 +39,6 @@ public class TestMessageValueFactory<M extends Writable>
     this.klass = klass;
   }
 
-  @Override public Class<M> getValueClass() {
-    return klass;
-  }
-
-  @Override public void initialize(
-      ImmutableClassesGiraphConfiguration conf) { }
-
   @Override public M newInstance() {
     return ReflectionUtils.newInstance(klass);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
index 733d5f2..00f35f3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactories.java
@@ -17,17 +17,14 @@
  */
 package org.apache.giraph.factories;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_FACTORY_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_FACTORY_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_FACTORY_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_FACTORY_CLASS;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
 /**
  * Holder for factories to create user types.
  *
@@ -47,12 +44,6 @@ public class ValueFactories<I extends WritableComparable,
   private final VertexValueFactory<V> vertexValueFactory;
   /** Edge value factory. */
   private final EdgeValueFactory<E> edgeValueFactory;
-  // Note that for messages we store the class not the factory itself, because
-  // the factory instance may change per-superstep if the graph types change.
-  /** Incoming message value factory class */
-  private final Class<? extends MessageValueFactory> inMsgFactoryClass;
-  /** Outgoing message value factory class */
-  private final Class<? extends MessageValueFactory> outMsgFactoryClass;
 
   /**
    * Constructor reading from Configuration
@@ -63,19 +54,6 @@ public class ValueFactories<I extends WritableComparable,
     vertexIdFactory = VERTEX_ID_FACTORY_CLASS.newInstance(conf);
     vertexValueFactory = VERTEX_VALUE_FACTORY_CLASS.newInstance(conf);
     edgeValueFactory = EDGE_VALUE_FACTORY_CLASS.newInstance(conf);
-    inMsgFactoryClass = INCOMING_MESSAGE_VALUE_FACTORY_CLASS.get(conf);
-    outMsgFactoryClass = OUTGOING_MESSAGE_VALUE_FACTORY_CLASS.get(conf);
-  }
-
-  /**
-   * Initialize all of the factories.
-   *
-   * @param conf ImmutableClassesGiraphConfiguration
-   */
-  public void initializeIVE(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
-    vertexIdFactory.initialize(conf);
-    vertexValueFactory.initialize(conf);
-    edgeValueFactory.initialize(conf);
   }
 
   public EdgeValueFactory<E> getEdgeValueFactory() {
@@ -89,12 +67,4 @@ public class ValueFactories<I extends WritableComparable,
   public VertexValueFactory<V> getVertexValueFactory() {
     return vertexValueFactory;
   }
-
-  public Class<? extends MessageValueFactory> getInMsgFactoryClass() {
-    return inMsgFactoryClass;
-  }
-
-  public Class<? extends MessageValueFactory> getOutMsgFactoryClass() {
-    return outMsgFactoryClass;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
index 5725061..de56929 100644
--- a/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/factories/ValueFactory.java
@@ -17,7 +17,8 @@
  */
 package org.apache.giraph.factories;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.io.Serializable;
+
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -25,30 +26,11 @@ import org.apache.hadoop.io.Writable;
  *
  * @param <W> Writable type
  */
-public interface ValueFactory<W extends Writable> {
-  /**
-   * Initialize factory settings from the conf.
-   * This gets called on startup and also if there are changes to the message
-   * classes used. For example if the user's
-   * {@link org.apache.giraph.master.MasterCompute} changes the
-   * {@link org.apache.giraph.graph.Computation} and the next superstep has a
-   * different message value type.
-   *
-   * @param conf Configuration
-   */
-  void initialize(ImmutableClassesGiraphConfiguration conf);
-
+public interface ValueFactory<W extends Writable> extends Serializable {
   /**
    * Create a new value.
    *
    * @return new value.
    */
   W newInstance();
-
-  /**
-   * Get the java Class representing messages this factory creates
-   *
-   * @return Class<M>
-   */
-  Class<W> getValueClass();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 996159f..226087c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -17,6 +17,12 @@
  */
 package org.apache.giraph.graph;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.messages.MessageStore;
@@ -47,12 +53,6 @@ import com.google.common.collect.Lists;
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Histogram;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-
 /**
  * Compute as many vertex partitions as possible.  Every thread will has its
  * own instance of WorkerClientRequestProcessor to send requests.  Note that
@@ -139,7 +139,9 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
     // Thread initialization (for locality)
     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(
-            context, configuration, serviceWorker);
+            context, configuration, serviceWorker,
+            configuration.getOutgoingMessageEncodeAndStoreType().
+              useOneMessageToManyIdsEncoding());
     WorkerThreadGlobalCommUsage aggregatorUsage =
         serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
     WorkerContext workerContext = serviceWorker.getWorkerContext();

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
index 4a0ac8f..b02159e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphType.java
@@ -90,27 +90,6 @@ public enum GraphType {
       return conf.getEdgeValueFactory();
     }
   },
-  /** Incoming message value */
-  INCOMING_MESSAGE_VALUE {
-    @Override
-    public ClassConfOption<? extends Writable> writableConfOption() {
-      return GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
-    }
-    @Override
-    public ClassConfOption<? extends ValueFactory> factoryClassOption() {
-      return GiraphConstants.INCOMING_MESSAGE_VALUE_FACTORY_CLASS;
-    }
-    @Override
-    public <T extends Writable> Class<T> get(
-        ImmutableClassesGiraphConfiguration conf) {
-      return conf.getIncomingMessageValueClass();
-    }
-    @Override
-    public <T extends Writable> ValueFactory<T> factory(
-        ImmutableClassesGiraphConfiguration conf) {
-      return conf.getIncomingMessageValueFactory();
-    }
-  },
   /** Outgoing message value */
   OUTGOING_MESSAGE_VALUE {
     @Override
@@ -129,7 +108,7 @@ public enum GraphType {
     @Override
     public <T extends Writable> ValueFactory<T> factory(
         ImmutableClassesGiraphConfiguration conf) {
-      return conf.getOutgoingMessageValueFactory();
+      return conf.createOutgoingMessageValueFactory();
     }
   };
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 73a7aab..38f14d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.job;
 
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -25,8 +29,8 @@ import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.DefaultVertexValueFactory;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.DefaultVertexResolver;
-import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.VertexInputFormat;
@@ -36,10 +40,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
-import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
-
 /**
  * GiraphConfigurationValidator attempts to verify the consistency of
  * user-chosen InputFormat, OutputFormat, and Vertex generic type
@@ -81,7 +81,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   /**
    * The Configuration object for use in the validation test.
    */
-  private ImmutableClassesGiraphConfiguration conf;
+  private final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor to execute the validation test, throws
@@ -126,7 +126,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
    * @return outgoing message value type
    */
   private Class<? extends Writable> outgoingMessageValueType() {
-    return conf.getGiraphTypes().getOutgoingMessageValueClass();
+    return conf.getOutgoingMessageValueClass();
   }
 
   /**
@@ -266,11 +266,11 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
    * generic params match the job.
    */
   private void verifyMessageCombinerGenericTypes() {
-    Class<? extends MessageCombiner<I, M2>> messageCombinerClass =
-      conf.getMessageCombinerClass();
-    if (messageCombinerClass != null) {
+    MessageCombiner<I, M2> messageCombiner =
+      conf.createOutgoingMessageCombiner();
+    if (messageCombiner != null) {
       Class<?>[] classList =
-          getTypeArguments(MessageCombiner.class, messageCombinerClass);
+          getTypeArguments(MessageCombiner.class, messageCombiner.getClass());
       checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
           MessageCombiner.class, "vertex index");
       checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
@@ -356,7 +356,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     if (classList[index] == null) {
       LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
     } else if (!classList[index].equals(classFromComputation)) {
-      throw new IllegalArgumentException(
+      throw new IllegalStateException(
           "checkClassTypes: " + typeName + " types not equal, " +
               "computation - " + classFromComputation +
               ", " + klass.getSimpleName() + " - " +
@@ -378,7 +378,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     if (classList[index] == null) {
       LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
     } else if (!classList[index].isAssignableFrom(classFromComputation)) {
-      throw new IllegalArgumentException(
+      throw new IllegalStateException(
           "checkClassTypes: " + typeName + " types not assignable, " +
               "computation - " + classFromComputation +
               ", " + klass.getSimpleName() + " - " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
index 3266d9b..1eff92b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonOptions.java
@@ -86,10 +86,6 @@ public class JythonOptions {
   public static final JythonGraphTypeOptions JYTHON_EDGE_VALUE =
       new JythonGraphTypeOptions(GraphType.EDGE_VALUE);
 
-  /** incoming message value options */
-  public static final JythonGraphTypeOptions JYTHON_IN_MSG_VALUE =
-      new JythonGraphTypeOptions(GraphType.INCOMING_MESSAGE_VALUE);
-
   /** outgonig message value options */
   public static final JythonGraphTypeOptions JYTHON_OUT_MSG_VALUE =
       new JythonGraphTypeOptions(GraphType.OUTGOING_MESSAGE_VALUE);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
index 0d3d34a..c0b286f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonFactoryBase.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.jython.factories;
 
 import org.apache.giraph.conf.ClassConfOption;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.StrConfOption;
 import org.apache.giraph.factories.ValueFactory;
@@ -37,7 +38,7 @@ import org.python.core.PyObject;
  * @param <W> writable type
  */
 public abstract class JythonFactoryBase<W extends Writable>
-    implements ValueFactory<W> {
+    implements ValueFactory<W>, GiraphConfigurationSettable {
   /** Logger */
   private static final Logger LOG = Logger.getLogger(JythonFactoryBase.class);
 
@@ -84,7 +85,7 @@ public abstract class JythonFactoryBase<W extends Writable>
   }
 
   @Override
-  public void initialize(
+  public void setConf(
       ImmutableClassesGiraphConfiguration conf) {
     jythonClassName = jythonClassNameOption().get(conf);
     useWrapper = conf.getValueNeedsWrappers().get(getGraphType());
@@ -105,14 +106,6 @@ public abstract class JythonFactoryBase<W extends Writable>
     }
   }
 
-  @Override public Class<W> getValueClass() {
-    if (useWrapper) {
-      return (Class<W>) JythonWritableWrapper.class;
-    } else {
-      return (Class<W>) writableValueClass();
-    }
-  }
-
   /**
    * Use this factory in the {@link org.apache.hadoop.conf.Configuration}
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java
deleted file mode 100644
index e77e9c3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonIncomingMessageValueFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.jython.factories;
-
-import org.apache.giraph.jython.JythonOptions;
-import org.apache.hadoop.io.Writable;
-
-/**
- * {@link MessageValueFactory} that creates incoming message values which are
- * Jython classes.
- *
- * @param <M> Incoming Message Value
- */
-public class JythonIncomingMessageValueFactory<M extends Writable>
-    extends JythonMessageValueFactory<M> {
-  @Override
-  public JythonOptions.JythonGraphTypeOptions getOptions() {
-    return JythonOptions.JYTHON_IN_MSG_VALUE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
index d2f8d9f..f798f51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/factories/JythonMessageValueFactory.java
@@ -32,9 +32,4 @@ public abstract class JythonMessageValueFactory<M extends Writable>
   public M newInstance() {
     return (M) newJythonClassInstance();
   }
-
-  @Override
-  public Class<M> getValueClass() {
-    return (Class<M>) writableValueClass();
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index e942157..0b56a4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -50,8 +50,8 @@ import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.MasterServer;
@@ -197,7 +197,7 @@ public class BspServiceMaster<I extends WritableComparable,
   /** Current checkpoint status */
   private CheckpointStatus checkpointStatus;
   /** Checks if checkpointing supported */
-  private CheckpointSupportedChecker checkpointSupportedChecker;
+  private final CheckpointSupportedChecker checkpointSupportedChecker;
 
   /**
    * Constructor for setting up the master.
@@ -803,7 +803,8 @@ public class BspServiceMaster<I extends WritableComparable,
     GlobalStats globalStats = new GlobalStats();
     globalStats.readFields(finalizedStream);
     updateCounters(globalStats);
-    SuperstepClasses superstepClasses = new SuperstepClasses();
+    SuperstepClasses superstepClasses =
+        SuperstepClasses.createToRead(getConfiguration());
     superstepClasses.readFields(finalizedStream);
     getConfiguration().updateSuperstepClasses(superstepClasses);
     int prefixFileCount = finalizedStream.readInt();
@@ -1576,7 +1577,7 @@ public class BspServiceMaster<I extends WritableComparable,
         GiraphStats.getInstance().getEdges().getValue(),
         getContext());
     SuperstepClasses superstepClasses =
-      new SuperstepClasses(getConfiguration());
+        SuperstepClasses.createAndExtractTypes(getConfiguration());
     masterCompute.setGraphState(graphState);
     masterCompute.setSuperstepClasses(superstepClasses);
     return superstepClasses;
@@ -1732,8 +1733,7 @@ public class BspServiceMaster<I extends WritableComparable,
     // match) and if the computation is halted, no need to check any of
     // the types.
     if (!globalStats.getHaltComputation()) {
-      superstepClasses.verifyTypesMatch(
-          getConfiguration(), getSuperstep() != 0);
+      superstepClasses.verifyTypesMatch(getSuperstep() > 0);
     }
     getConfiguration().updateSuperstepClasses(superstepClasses);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 85496c2..50e3b36 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -22,10 +22,12 @@ import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.reducers.ReduceOperation;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
 /**
@@ -173,8 +175,10 @@ public abstract class MasterCompute
 
   /**
    * Set incoming message class to be used
+   *
    * @param incomingMessageClass incoming message class
    */
+  @Deprecated
   public final void setIncomingMessage(
       Class<? extends Writable> incomingMessageClass) {
     superstepClasses.setIncomingMessageClass(incomingMessageClass);
@@ -182,6 +186,7 @@ public abstract class MasterCompute
 
   /**
    * Set outgoing message class to be used
+   *
    * @param outgoingMessageClass outgoing message class
    */
   public final void setOutgoingMessage(
@@ -189,6 +194,17 @@ public abstract class MasterCompute
     superstepClasses.setOutgoingMessageClass(outgoingMessageClass);
   }
 
+  /**
+   * Set outgoing message classes to be used
+   *
+   * @param outgoingMessageClasses outgoing message classes
+   */
+  public void setOutgoingMessageClasses(
+      MessageClasses<? extends WritableComparable, ? extends Writable>
+        outgoingMessageClasses) {
+    superstepClasses.setOutgoingMessageClasses(outgoingMessageClasses);
+  }
+
   @Override
   public final <S, R extends Writable> void registerReducer(
       String name, ReduceOperation<S, R> reduceOp) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index 8145109..8653c96 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -23,17 +23,21 @@ import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.lang.reflect.Modifier;
 
 import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.conf.DefaultMessageClasses;
+import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
+import org.python.google.common.base.Preconditions;
 
 /**
  * Holds Computation and MessageCombiner class.
@@ -41,115 +45,170 @@ import org.apache.log4j.Logger;
 public class SuperstepClasses implements Writable {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(SuperstepClasses.class);
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
 
   /** Computation class to be used in the following superstep */
   private Class<? extends Computation> computationClass;
-  /** MessageCombiner class to be used in the following superstep */
-  private Class<? extends MessageCombiner> messageCombinerClass;
-  /** Incoming message class to be used in the following superstep */
-  private Class<? extends Writable> incomingMessageClass;
-  /** Outgoing message class to be used in the following superstep */
-  private Class<? extends Writable> outgoingMessageClass;
+  /** Incoming message classes, immutable, only here for cheecking */
+  private MessageClasses<? extends WritableComparable, ? extends Writable>
+  incomingMessageClasses;
+  /** Outgoing message classes */
+  private MessageClasses<? extends WritableComparable, ? extends Writable>
+  outgoingMessageClasses;
 
   /**
-   * Default constructor
+   * Constructor
+   * @param conf Configuration
+   * @param computationClass computation class
+   * @param incomingMessageClasses incoming message classes
+   * @param outgoingMessageClasses outgoing message classes
    */
-  public SuperstepClasses() {
+  public SuperstepClasses(
+      ImmutableClassesGiraphConfiguration conf,
+      Class<? extends Computation> computationClass,
+      MessageClasses<? extends WritableComparable, ? extends Writable>
+        incomingMessageClasses,
+      MessageClasses<? extends WritableComparable, ? extends Writable>
+        outgoingMessageClasses) {
+    this.conf = conf;
+    this.computationClass = computationClass;
+    this.incomingMessageClasses = incomingMessageClasses;
+    this.outgoingMessageClasses = outgoingMessageClasses;
   }
 
   /**
-   * Constructor
-   *
+   * Create empty superstep classes, readFields needs to be called afterwards
    * @param conf Configuration
+   * @return Superstep classes
    */
-  @SuppressWarnings("unchecked")
-  public SuperstepClasses(ImmutableClassesGiraphConfiguration conf) {
-    this(conf.getComputationClass(), conf.getMessageCombinerClass());
+  public static SuperstepClasses createToRead(
+      ImmutableClassesGiraphConfiguration conf) {
+    return new SuperstepClasses(conf, null, null, null);
   }
 
   /**
-   * Constructor
-   *
-   * @param computationClass Computation class
-   * @param messageCombinerClass MessageCombiner class
+   * Create superstep classes by initiazling from current state
+   * in configuration
+   * @param conf Configuration
+   * @return Superstep classes
    */
-  public SuperstepClasses(Class<? extends Computation> computationClass,
-      Class<? extends MessageCombiner> messageCombinerClass) {
-    this.computationClass = computationClass;
-    this.messageCombinerClass =
-        messageCombinerClass;
+  public static SuperstepClasses createAndExtractTypes(
+      ImmutableClassesGiraphConfiguration conf) {
+    return new SuperstepClasses(
+        conf,
+        conf.getComputationClass(),
+        conf.getOutgoingMessageClasses(),
+        conf.getOutgoingMessageClasses().createCopyForNewSuperstep());
   }
 
   public Class<? extends Computation> getComputationClass() {
     return computationClass;
   }
 
-  public Class<? extends MessageCombiner> getMessageCombinerClass() {
-    return messageCombinerClass;
+  public MessageClasses<? extends WritableComparable, ? extends Writable>
+  getOutgoingMessageClasses() {
+    return outgoingMessageClasses;
   }
 
   /**
-   * Get incoming message class, either set directly, or through Computation
-   * @return incoming message class
+   * Set's outgoing MessageClasses for next superstep.
+   * Should not be used together with
+   * setMessageCombinerClass/setOutgoingMessageClass methods.
+   *
+   * @param outgoingMessageClasses outgoing message classes
    */
-  public Class<? extends Writable> getIncomingMessageClass() {
-    if (incomingMessageClass != null) {
-      return incomingMessageClass;
-    }
-    if (computationClass == null) {
-      return null;
-    }
-    Class[] computationTypes = ReflectionUtils.getTypeArguments(
-        TypesHolder.class, computationClass);
-    return computationTypes[3];
+  public void setOutgoingMessageClasses(
+      MessageClasses<? extends WritableComparable, ? extends Writable>
+        outgoingMessageClasses) {
+    this.outgoingMessageClasses = outgoingMessageClasses;
   }
 
   /**
-   * Get outgoing message class, either set directly, or through Computation
-   * @return outgoing message class
+   * Set computation class
+   * @param computationClass computation class
    */
-  public Class<? extends Writable> getOutgoingMessageClass() {
-    if (outgoingMessageClass != null) {
-      return outgoingMessageClass;
-    }
-    if (computationClass == null) {
-      return null;
-    }
-    Class[] computationTypes = ReflectionUtils.getTypeArguments(
-        TypesHolder.class, computationClass);
-    return computationTypes[4];
-  }
-
   public void setComputationClass(
       Class<? extends Computation> computationClass) {
     this.computationClass = computationClass;
+
+    if (computationClass != null) {
+      Class[] computationTypes = ReflectionUtils.getTypeArguments(
+          TypesHolder.class, computationClass);
+      if (computationTypes[4] != null &&
+          outgoingMessageClasses instanceof DefaultMessageClasses) {
+        ((DefaultMessageClasses) outgoingMessageClasses)
+          .setIfNotModifiedMessageClass(computationTypes[4]);
+      }
+    }
   }
 
+  /**
+   * Set message combiner class.
+   * Should not be used together setOutgoingMessageClasses
+   * (throws exception if called with it),
+   * as it is unnecessary to do so.
+   *
+   * @param messageCombinerClass message combiner class
+   */
   public void setMessageCombinerClass(
       Class<? extends MessageCombiner> messageCombinerClass) {
-    this.messageCombinerClass = messageCombinerClass;
+    Preconditions.checkState(
+        outgoingMessageClasses instanceof DefaultMessageClasses);
+    ((DefaultMessageClasses) outgoingMessageClasses).
+        setMessageCombinerClass(messageCombinerClass);
   }
 
+  /**
+   * Set incoming message class
+   * @param incomingMessageClass incoming message class
+   */
+  @Deprecated
   public void setIncomingMessageClass(
       Class<? extends Writable> incomingMessageClass) {
-    this.incomingMessageClass = incomingMessageClass;
+    if (!incomingMessageClasses.getMessageClass().
+        equals(incomingMessageClass)) {
+      throw new IllegalArgumentException(
+          "Cannot change incoming message class from " +
+          incomingMessageClasses.getMessageClass() +
+          " previously, to " + incomingMessageClass);
+    }
   }
 
+  /**
+   * Set outgoing message class.
+   * Should not be used together setOutgoingMessageClasses
+   * (throws exception if called with it),
+   * as it is unnecessary to do so.
+   *
+   * @param outgoingMessageClass outgoing message class
+   */
   public void setOutgoingMessageClass(
       Class<? extends Writable> outgoingMessageClass) {
-    this.outgoingMessageClass = outgoingMessageClass;
+    Preconditions.checkState(
+        outgoingMessageClasses instanceof DefaultMessageClasses);
+    ((DefaultMessageClasses) outgoingMessageClasses).
+        setMessageClass(outgoingMessageClass);
+  }
+
+  /**
+   * Get message combiner class
+   * @return message combiner class
+   */
+  public Class<? extends MessageCombiner> getMessageCombinerClass() {
+    MessageCombiner combiner =
+        outgoingMessageClasses.createMessageCombiner(conf);
+    return combiner != null ? combiner.getClass() : null;
   }
 
   /**
    * Verify that types of current Computation and MessageCombiner are valid.
    * If types don't match an {@link IllegalStateException} will be thrown.
    *
-   * @param conf Configuration to verify this with
    * @param checkMatchingMesssageTypes Check that the incoming/outgoing
    *                                   message types match
    */
-  public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf,
-                               boolean checkMatchingMesssageTypes) {
+  public void verifyTypesMatch(boolean checkMatchingMesssageTypes) {
     // In some cases, for example when using Jython, the Computation class may
     // not be set. This is because it is created by a ComputationFactory
     // dynamically and not known ahead of time. In this case there is nothing to
@@ -160,91 +219,55 @@ public class SuperstepClasses implements Writable {
 
     Class<?>[] computationTypes = ReflectionUtils.getTypeArguments(
         TypesHolder.class, computationClass);
-    verifyTypes(conf.getVertexIdClass(), computationTypes[0],
+    ReflectionUtils.verifyTypes(conf.getVertexIdClass(), computationTypes[0],
         "Vertex id", computationClass);
-    verifyTypes(conf.getVertexValueClass(), computationTypes[1],
+    ReflectionUtils.verifyTypes(conf.getVertexValueClass(), computationTypes[1],
         "Vertex value", computationClass);
-    verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
+    ReflectionUtils.verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
         "Edge value", computationClass);
 
-    Class<?> incomingMessageType = getIncomingMessageClass();
-    Class<?> outgoingMessageType = getOutgoingMessageClass();
-
     if (checkMatchingMesssageTypes) {
-      verifyTypes(incomingMessageType, conf.getOutgoingMessageValueClass(),
-          "New incoming and previous outgoing message", computationClass);
-    }
-    if (outgoingMessageType.isInterface()) {
-      throw new IllegalStateException("verifyTypesMatch: " +
-          "Message type must be concrete class " + outgoingMessageType);
-    }
-    if (Modifier.isAbstract(outgoingMessageType.getModifiers())) {
-      throw new IllegalStateException("verifyTypesMatch: " +
-          "Message type can't be abstract class" + outgoingMessageType);
-    }
-    if (messageCombinerClass != null) {
-      Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
-          MessageCombiner.class, messageCombinerClass);
-      verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
-          "Vertex id", messageCombinerClass);
-      verifyTypes(outgoingMessageType, combinerTypes[1],
-          "Outgoing message", messageCombinerClass);
+      ReflectionUtils.verifyTypes(incomingMessageClasses.getMessageClass(),
+          computationTypes[3], "Incoming message type", computationClass);
     }
+
+    ReflectionUtils.verifyTypes(outgoingMessageClasses.getMessageClass(),
+        computationTypes[4], "Outgoing message type", computationClass);
+
+    outgoingMessageClasses.verifyConsistent(conf);
   }
 
   /**
-   * Verify that found type matches the expected type. If types don't match an
-   * {@link IllegalStateException} will be thrown.
-   *
-   * @param expected Expected type
-   * @param actual Actual type
-   * @param typeDesc String description of the type (for exception description)
-   * @param mainClass Class in which the actual type was found (for exception
-   *                  description)
+   * Update GiraphClasses with updated types
+   * @param classes Giraph classes
    */
-  private void verifyTypes(Class<?> expected, Class<?> actual,
-      String typeDesc, Class<?> mainClass) {
-    if (!expected.equals(actual)) {
-      if (actual.isAssignableFrom(expected)) {
-        LOG.warn("verifyTypes: proceeding with assignable types : " +
-          typeDesc + " types, in " + mainClass.getName() + " " + expected +
-          " expected, but " + actual + " found");
-      } else {
-        throw new IllegalStateException("verifyTypes: " + typeDesc +
-            " types " + "don't match, in " + mainClass.getName() + " " +
-            expected + " expected, but " + actual + " found");
-      }
-    }
+  public void updateGiraphClasses(GiraphClasses classes) {
+    classes.setComputationClass(computationClass);
+    classes.setIncomingMessageClasses(incomingMessageClasses);
+    classes.setOutgoingMessageClasses(outgoingMessageClasses);
   }
 
   @Override
   public void write(DataOutput output) throws IOException {
     WritableUtils.writeClass(computationClass, output);
-    WritableUtils.writeClass(messageCombinerClass, output);
-    WritableUtils.writeClass(incomingMessageClass, output);
-    WritableUtils.writeClass(outgoingMessageClass, output);
+    WritableUtils.writeWritableObject(incomingMessageClasses, output);
+    WritableUtils.writeWritableObject(outgoingMessageClasses, output);
   }
 
   @Override
   public void readFields(DataInput input) throws IOException {
     computationClass = WritableUtils.readClass(input);
-    messageCombinerClass = WritableUtils.readClass(input);
-    incomingMessageClass = WritableUtils.readClass(input);
-    outgoingMessageClass = WritableUtils.readClass(input);
+    incomingMessageClasses = WritableUtils.readWritableObject(input, conf);
+    outgoingMessageClasses = WritableUtils.readWritableObject(input, conf);
   }
 
   @Override
   public String toString() {
     String computationName = computationClass == null ? "_not_set_" :
         computationClass.getName();
-    String combinerName = (messageCombinerClass == null) ? "null" :
-        messageCombinerClass.getName();
-    String incomingName = (incomingMessageClass == null) ? "null" :
-      incomingMessageClass.getName();
-    String outgoingName = (outgoingMessageClass == null) ? "null" :
-      outgoingMessageClass.getName();
-
-    return "(computation=" + computationName + ",combiner=" + combinerName +
-        ",incoming=" + incomingName + ",outgoing=" + outgoingName + ")";
+    return "(computation=" + computationName +
+        ",incoming=" + incomingMessageClasses +
+        ",outgoing=" + outgoingMessageClasses + ")";
   }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java b/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java
new file mode 100644
index 0000000..0b9daf7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/NoMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Type marking that in a particular superstep there will not be
+ * sent messages.
+ * We cannot use NullWritable for this, as you could send NullWritable,
+ * to send a signal (whether a vertex receives a message or not)
+ */
+public class NoMessage implements Writable {
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new IllegalStateException("NoMessage should never be read");
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new IllegalStateException("NoMessage should never be written");
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index daad860..1544984 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -18,14 +18,14 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
 /**
  * Stores vertex id and message pairs in a single byte array.
  *
@@ -37,7 +37,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
   M extends Writable> extends ByteArrayVertexIdData<I, M>
   implements VertexIdMessages<I, M> {
   /** Message value class */
-  private MessageValueFactory<M> messageValueFactory;
+  private final MessageValueFactory<M> messageValueFactory;
   /** Add the message size to the stream? (Depends on the message store) */
   private boolean useMessageSizeEncoding = false;
 
@@ -57,7 +57,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
    * de-serialized right away, so this won't help.
    */
   private void setUseMessageSizeEncoding() {
-    if (!getConf().useMessageCombiner()) {
+    if (!getConf().useOutgoingMessageCombiner()) {
       useMessageSizeEncoding = getConf().useMessageSizeEncoding();
     } else {
       useMessageSizeEncoding = false;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index f5f0fb9..028f9e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.utils;
 
+import java.lang.reflect.Modifier;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.jodah.typetools.TypeResolver;
 
@@ -111,4 +113,48 @@ public class ReflectionUtils {
     ConfigurationUtils.configureIfPossible(result, configuration);
     return result;
   }
+
+  /**
+   * Verify that found type matches the expected type. If types don't match an
+   * {@link IllegalStateException} will be thrown.
+   *
+   * @param concreteChild Concrete child type
+   * @param parent Parent type
+   * @param typeDesc String description of the type (for exception description)
+   * @param mainClass Class in which the actual type was found (for exception
+   *                  description)
+   */
+  public static void verifyTypes(Class<?> concreteChild, Class<?> parent,
+      String typeDesc, Class<?> mainClass) {
+    // unknown means object
+    if (parent == TypeResolver.Unknown.class) {
+      parent = Object.class;
+    }
+
+    verifyConcrete(concreteChild, typeDesc);
+
+    if (!parent.isAssignableFrom(concreteChild)) {
+      throw new IllegalStateException("verifyTypes: " + typeDesc + " types " +
+          "don't match, in " + mainClass.getName() + " " + concreteChild +
+          " expected, but " + parent + " found");
+    }
+  }
+
+  /**
+   * Verify that given type is a concrete type that can be instantiated.
+   *
+   * @param concrete type to check
+   * @param typeDesc String description of the type (for exception description)
+   */
+  public static void verifyConcrete(
+      Class<?> concrete, String typeDesc) {
+    if (concrete.isInterface()) {
+      throw new IllegalStateException("verifyTypes: " +
+          "Type " + typeDesc + " must be concrete class " + concrete);
+    }
+    if (Modifier.isAbstract(concrete.getModifiers())) {
+      throw new IllegalStateException("verifyTypes: " +
+          "Type " + typeDesc + "can't be abstract class" + concrete);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index f614a33..2f1c2ef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -931,7 +931,8 @@ public class BspServiceWorker<I extends WritableComparable,
     waitForOtherWorkers(superstepFinishedNode);
 
     GlobalStats globalStats = new GlobalStats();
-    SuperstepClasses superstepClasses = new SuperstepClasses();
+    SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
+        getConfiguration());
     WritableUtils.readFieldsFromZnode(
         getZkExt(), superstepFinishedNode, false, null, globalStats,
         superstepClasses);
@@ -1659,7 +1660,8 @@ public class BspServiceWorker<I extends WritableComparable,
 
       // Load global stats and superstep classes
       GlobalStats globalStats = new GlobalStats();
-      SuperstepClasses superstepClasses = new SuperstepClasses();
+      SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
+          getConfiguration());
       String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
           CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
       DataInputStream finalizedStream =
@@ -1717,7 +1719,8 @@ else[HADOOP_NON_SECURE]*/
     Collections.shuffle(randomEntryList);
     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
-            getConfiguration(), this);
+            getConfiguration(), this,
+            false /* useOneMessageToManyIdsEncoding */);
     for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
       randomEntryList) {
       for (Integer partitionId : workerPartitionList.getValue()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 10b1a25..7b2fc0f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -18,6 +18,11 @@
 
 package org.apache.giraph.worker;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -43,11 +48,6 @@ import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
 import com.yammer.metrics.util.PercentGauge;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-
 /**
  * Abstract base class for loading vertex/edge input splits.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -103,7 +103,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
     this.context = context;
     this.workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(
-            context, configuration, bspServiceWorker);
+            context, configuration, bspServiceWorker,
+            false /* useOneMessageToManyIdsEncoding, not useful for input */);
     this.useLocality = configuration.useInputSplitLocality();
     this.splitsHandler = splitsHandler;
     this.configuration = configuration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
index eb2497b..bf20580 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
@@ -18,21 +18,38 @@
 
 package org.apache.giraph.comm;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
-import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
 import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
 import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
+import org.apache.giraph.conf.DefaultMessageClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
 import org.apache.giraph.factories.TestMessageValueFactory;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.CollectionUtils;
@@ -46,23 +63,10 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
 
 /** Test for different types of message stores */
 public class TestMessageStores {
@@ -220,7 +224,12 @@ public class TestMessageStores {
     }
     out.close();
 
-    messageStore = messageStoreFactory.newStore(new TestMessageValueFactory<IntWritable>(IntWritable.class));
+    messageStore = messageStoreFactory.newStore(
+        new DefaultMessageClasses(
+            IntWritable.class,
+            DefaultMessageValueFactory.class,
+            null,
+            MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
 
     DataInputStream in = new DataInputStream(new BufferedInputStream(
         (new FileInputStream(file))));
@@ -240,7 +249,12 @@ public class TestMessageStores {
       TestData testData) throws IOException {
     SortedMap<IntWritable, Collection<IntWritable>> messages =
         new TreeMap<IntWritable, Collection<IntWritable>>();
-    S messageStore = messageStoreFactory.newStore(new TestMessageValueFactory<IntWritable>(IntWritable.class));
+    S messageStore = messageStoreFactory.newStore(
+        new DefaultMessageClasses(
+            IntWritable.class,
+            DefaultMessageValueFactory.class,
+            null,
+            MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION));
     putNTimes(messageStore, messages, testData);
     assertTrue(equalMessages(messageStore, messages, testData));
     messageStore.clearAll();

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
index 4e38bff..57529f4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
+++ b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
@@ -18,6 +18,11 @@
 
 package org.apache.giraph.conf;
 
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
@@ -31,12 +36,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import static org.junit.Assert.assertEquals;
-
 /**
  * Benchmark tests to insure that object creation via
  * {@link ImmutableClassesGiraphConfiguration} is fast
@@ -49,7 +48,7 @@ public class TestObjectCreation {
   private long startNanos = -1;
   private long totalNanos = -1;
   private long total = 0;
-  private long expected = COUNT * (COUNT - 1) / 2L;
+  private final long expected = COUNT * (COUNT - 1) / 2L;
   private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
       LongWritable> configuration;
 
@@ -59,7 +58,6 @@ public class TestObjectCreation {
     GiraphConstants.VERTEX_ID_CLASS.set(conf, IntWritable.class);
     GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
     GiraphConstants.EDGE_VALUE_CLASS.set(conf, DoubleWritable.class);
-    GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
     GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
     conf.setComputationClass(LongNoOpComputation.class);
     configuration =

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
index 721a74c..272666c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
@@ -18,10 +18,15 @@
 
 package org.apache.giraph.io;
 
-import com.google.common.collect.Maps;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
 import org.apache.giraph.BspCase;
 import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.factories.VertexValueFactory;
@@ -39,10 +44,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.Map;
-
-import static org.junit.Assert.*;
+import com.google.common.collect.Maps;
 
 /**
  * A test case to ensure that loading a graph from vertices and edges works as
@@ -349,14 +351,6 @@ public class TestVertexEdgeInput extends BspCase {
   public static class TestVertexValueFactory
       implements VertexValueFactory<IntWritable> {
     @Override
-    public void initialize(ImmutableClassesGiraphConfiguration conf) { }
-
-    @Override
-    public Class<IntWritable> getValueClass() {
-      return IntWritable.class;
-    }
-
-    @Override
     public IntWritable newInstance() {
       return new IntWritable(3);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
index 4a8caaa..194bb5e 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.master;
 
+import java.io.IOException;
+
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -32,74 +34,77 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.junit.Test;
 
-import java.io.IOException;
-
 /** Test type verification when switching computation and combiner types */
 public class TestComputationCombinerTypes {
+  private void testConsecutiveComp(
+      Class<? extends Computation> firstComputationClass,
+      Class<? extends Computation> secondComputationClass) {
+    testConsecutiveComp(firstComputationClass, secondComputationClass, null);
+  }
+
+  private void testConsecutiveComp(
+      Class<? extends Computation> firstComputationClass,
+      Class<? extends Computation> secondComputationClass,
+      Class<? extends MessageCombiner> messageCombinerClass) {
+    ImmutableClassesGiraphConfiguration conf =
+        createConfiguration(firstComputationClass);
+    SuperstepClasses classes = SuperstepClasses.createAndExtractTypes(conf);
+    classes.setComputationClass(secondComputationClass);
+    classes.setMessageCombinerClass(messageCombinerClass);
+    classes.verifyTypesMatch(true);
+  }
+
   @Test
   public void testAllMatchWithoutCombiner() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntNoOpComputation.class, null);
-    classes.verifyTypesMatch(createConfiguration(IntNoOpComputation.class), true);
+    testConsecutiveComp(IntNoOpComputation.class, IntNoOpComputation.class);
   }
 
   @Test
   public void testAllMatchWithCombiner() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            IntDoubleMessageCombiner.class);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class,
+        IntIntIntLongDoubleComputation.class,
+        IntDoubleMessageCombiner.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentIdTypes() {
-    SuperstepClasses classes =
-        new SuperstepClasses(LongIntIntLongIntComputation.class, null);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class, LongIntIntLongIntComputation.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentVertexValueTypes() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntLongIntLongIntComputation.class, null);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class, IntLongIntLongIntComputation.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentEdgeDataTypes() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntLongLongIntComputation.class, null);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class, IntIntLongLongIntComputation.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentMessageTypes() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntIntIntLongComputation.class, null);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntLongDoubleComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntLongDoubleComputation.class, IntIntIntIntLongComputation.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentCombinerIdType() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            DoubleDoubleMessageCombiner.class);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class,
+        IntIntIntLongDoubleComputation.class,
+        DoubleDoubleMessageCombiner.class);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDifferentCombinerMessageType() {
-    SuperstepClasses classes =
-        new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            IntLongMessageCombiner.class);
-    classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class), true);
+    testConsecutiveComp(
+        IntIntIntIntLongComputation.class,
+        IntIntIntLongDoubleComputation.class,
+        IntLongMessageCombiner.class);
   }
 
   private static ImmutableClassesGiraphConfiguration createConfiguration(

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index 833061e..8a034c2 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -140,9 +140,6 @@ public class TestSwitchClasses {
           break;
         case 4:
           setComputation(Computation1.class);
-          // message types removed
-          setIncomingMessage(null);
-          setOutgoingMessage(null);
           break;
         default:
           haltComputation();


[3/3] git commit: updated refs/heads/trunk to 5d0b81a

Posted by ed...@apache.org.
[GIRAPH-1002] Improve message changing through iters

Summary:
Add MessageClasses object to hold all information about single
message class (factory, combiner, store type)

Made it so that it can be changed as a whole, allowing
any complete control over messages.

(i.e. message factory couldn't be used when messages where changing)

Incoming message type configs are completely useless. It needs to be the same
as outgoing message type in the previous step. So deleting that completely.

*breaking change* Removed initialize and getValueClass from value factories.
Initialize can be achieved, as with everywhere else, by implementing
GiraphConfigurationSettable. getValueClass was not used anywhere.

This way - value factory has only one function - and lambda can be passed to it.

Test Plan: mvn clean install -Phadoop_facebook

Reviewers: sergey.edunov, dionysis.logothetis, avery.ching, maja.kabiljo

Differential Revision: https://reviews.facebook.net/D36849


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5d0b81ac
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5d0b81ac
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5d0b81ac

Branch: refs/heads/trunk
Commit: 5d0b81ac42ab0de80bb68b87c7d1c1dd710f1108
Parents: 6ee97e7
Author: Igor Kabiljo <ik...@fb.com>
Authored: Fri Apr 10 11:42:28 2015 -0700
Committer: Sergey Edunov <ed...@fb.com>
Committed: Fri Apr 10 12:18:55 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 findbugs-exclude.xml                            |   5 +
 .../apache/giraph/comm/SendMessageCache.java    |  12 +-
 .../giraph/comm/SendOneMessageToManyCache.java  |  10 +-
 .../java/org/apache/giraph/comm/ServerData.java |   4 +-
 .../ByteArrayMessagesPerVertexStore.java        |  10 +-
 .../messages/InMemoryMessageStoreFactory.java   |  47 ++--
 .../comm/messages/MessageStoreFactory.java      |  10 +-
 .../comm/messages/OneMessagePerVertexStore.java |  12 +-
 .../out_of_core/DiskBackedMessageStore.java     |  27 +-
 .../DiskBackedMessageStoreFactory.java          |   6 +-
 .../PartitionDiskBackedMessageStore.java        |  18 +-
 .../out_of_core/SequentialFileMessageStore.java |   6 +-
 .../primitives/IdOneMessagePerVertexStore.java  |   4 +-
 .../NettyWorkerClientRequestProcessor.java      |  27 +-
 .../SendPartitionCurrentMessagesRequest.java    |   5 +-
 .../requests/SendWorkerMessagesRequest.java     |   8 +-
 .../SendWorkerOneMessageToManyRequest.java      |   6 +-
 .../giraph/conf/DefaultMessageClasses.java      | 203 +++++++++++++++
 .../org/apache/giraph/conf/GiraphClasses.java   | 135 ++++------
 .../apache/giraph/conf/GiraphConfiguration.java |  37 +--
 .../org/apache/giraph/conf/GiraphConstants.java |  21 +-
 .../org/apache/giraph/conf/GiraphTypes.java     |  46 +---
 .../ImmutableClassesGiraphConfiguration.java    | 148 ++++++-----
 .../org/apache/giraph/conf/MessageClasses.java  |  85 ++++++
 .../apache/giraph/conf/PerGraphTypeBoolean.java |  19 --
 .../conf/PerGraphTypeBooleanConfOption.java     |  10 -
 .../apache/giraph/conf/PerGraphTypeEnum.java    |  19 --
 .../giraph/conf/PerGraphTypeEnumConfOption.java |  10 -
 .../factories/AbstractMessageValueFactory.java  |  67 -----
 .../factories/DefaultEdgeValueFactory.java      |  10 +-
 .../DefaultIncomingMessageValueFactory.java     |  34 ---
 .../factories/DefaultMessageValueFactory.java   |  58 +++++
 .../DefaultOutgoingMessageValueFactory.java     |  34 ---
 .../factories/DefaultVertexIdFactory.java       |  10 +-
 .../factories/DefaultVertexValueFactory.java    |  10 +-
 .../factories/TestMessageValueFactory.java      |   8 -
 .../apache/giraph/factories/ValueFactories.java |  38 +--
 .../apache/giraph/factories/ValueFactory.java   |  24 +-
 .../apache/giraph/graph/ComputeCallable.java    |  16 +-
 .../java/org/apache/giraph/graph/GraphType.java |  23 +-
 .../job/GiraphConfigurationValidator.java       |  26 +-
 .../org/apache/giraph/jython/JythonOptions.java |   4 -
 .../jython/factories/JythonFactoryBase.java     |  13 +-
 .../JythonIncomingMessageValueFactory.java      |  35 ---
 .../factories/JythonMessageValueFactory.java    |   5 -
 .../apache/giraph/master/BspServiceMaster.java  |  12 +-
 .../org/apache/giraph/master/MasterCompute.java |  16 ++
 .../apache/giraph/master/SuperstepClasses.java  | 257 ++++++++++---------
 .../java/org/apache/giraph/types/NoMessage.java |  42 +++
 .../giraph/utils/ByteArrayVertexIdMessages.java |  12 +-
 .../apache/giraph/utils/ReflectionUtils.java    |  46 ++++
 .../apache/giraph/worker/BspServiceWorker.java  |   9 +-
 .../giraph/worker/InputSplitsCallable.java      |  13 +-
 .../apache/giraph/comm/TestMessageStores.java   |  62 +++--
 .../apache/giraph/conf/TestObjectCreation.java  |  14 +-
 .../apache/giraph/io/TestVertexEdgeInput.java   |  22 +-
 .../master/TestComputationCombinerTypes.java    |  77 +++---
 .../apache/giraph/master/TestSwitchClasses.java |   3 -
 .../giraph/utils/TestReflectionUtils.java       |  20 +-
 .../java/org/apache/giraph/TestBspBasic.java    |   2 +-
 .../giraph/vertex/TestComputationTypes.java     |  29 +--
 .../giraph/hive/jython/HiveJythonUtils.java     |  73 +++---
 63 files changed, 1075 insertions(+), 1001 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 61cb3ce..30704e8 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-1002: Improve message changing through iters (ikabiljo via edunov)
+
   GIRAPH-998: Close writers in parallel (majaakbiljo)
 
   GIRAPH-999: Add support for Mapping multi-input formats (dlogothetis via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index 9ac4412..0c2ab96 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -96,4 +96,9 @@
     <Class name="org.apache.giraph.partition.DiskBackedPartitionStore$GetPartition"/>
     <Bug pattern="UL_UNRELEASED_LOCK"/>
   </Match>
+  <Match>
+    <!-- Java Serialization is not used, so this is never an actual issue.
+      On the other hand, Kryo needs lambdas to be Serializable to work. -->
+    <Bug pattern="SE_BAD_FIELD,SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index b1fec01..e101b01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -18,6 +18,9 @@
 
 package org.apache.giraph.comm;
 
+import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
+import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
+
 import java.util.Iterator;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -28,17 +31,14 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE;
-import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
-
 /**
  * Aggregates the messages to be sent to workers so they can be sent
  * in bulk.  Not thread-safe.
@@ -81,7 +81,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
   @Override
   public VertexIdMessages<I, M> createVertexIdData() {
     return new ByteArrayVertexIdMessages<I, M>(
-        getConf().getOutgoingMessageValueFactory());
+        getConf().<M>createOutgoingMessageValueFactory());
   }
 
   /**
@@ -177,7 +177,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    */
   private class TargetVertexIdIterator implements Iterator<I> {
     /** An edge iterator */
-    private Iterator<Edge<I, Writable>> edgesIterator;
+    private final Iterator<Edge<I, Writable>> edgesIterator;
 
     /**
      * Constructor.

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
index c67a20b..c00e560 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
@@ -22,24 +22,24 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
+import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
-import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import javax.annotation.concurrent.NotThreadSafe;
-
 /**
  * Aggregates the messages to be sent to workers so they can be sent
  * in bulk.
@@ -145,7 +145,7 @@ public class SendOneMessageToManyCache<I extends WritableComparable,
       msgVidsCache[workerInfo.getTaskId()];
     if (workerData == null) {
       workerData = new ByteArrayOneMessageToManyIds<I, M>(
-        getConf().getOutgoingMessageValueFactory());
+          getConf().<M>createOutgoingMessageValueFactory());
       workerData.setConf(getConf());
       workerData.initialize(getSendWorkerInitialBufferSize(
         workerInfo.getTaskId()));

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 1fd85e4..129df59 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -191,9 +191,9 @@ public class ServerData<I extends WritableComparable,
     }
     currentMessageStore =
         incomingMessageStore != null ? incomingMessageStore :
-            messageStoreFactory.newStore(conf.getIncomingMessageValueFactory());
+            messageStoreFactory.newStore(conf.getIncomingMessageClasses());
     incomingMessageStore =
-        messageStoreFactory.newStore(conf.getOutgoingMessageValueFactory());
+        messageStoreFactory.newStore(conf.getOutgoingMessageClasses());
     // finalize current message-store before resolving mutations
     currentMessageStore.finalizeStore();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 57d255f..dfb5683 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -25,13 +25,14 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.RepresentativeByteStructIterator;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
 import org.apache.giraph.utils.VertexIdIterator;
 import org.apache.giraph.utils.VertexIdMessageBytesIterator;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.RepresentativeByteStructIterator;
-import org.apache.giraph.utils.VerboseByteStructMessageWrite;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -209,8 +210,9 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 
     @Override
     public MessageStore<I, M> newStore(
-        MessageValueFactory<M> messageValueFactory) {
-      return new ByteArrayMessagesPerVertexStore<I, M>(messageValueFactory,
+        MessageClasses<I, M> messageClasses) {
+      return new ByteArrayMessagesPerVertexStore<I, M>(
+          messageClasses.createMessageValueFactory(config),
           service, config);
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index ae86c56..27980a9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -30,6 +30,7 @@ import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessage
 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
 import org.apache.giraph.types.ops.TypeOpsUtils;
@@ -72,37 +73,38 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   /**
    * MessageStore to be used when combiner is enabled
    *
+   * @param messageClass message class
    * @param messageValueFactory message value factory
+   * @param messageCombiner message combiner
    * @return message store
    */
   protected MessageStore<I, M> newStoreWithCombiner(
-    MessageValueFactory<M> messageValueFactory) {
-    Class<M> messageClass = messageValueFactory.getValueClass();
+      Class<M> messageClass,
+      MessageValueFactory<M> messageValueFactory,
+      MessageCombiner<? super I, M> messageCombiner) {
     MessageStore messageStore;
     Class<I> vertexIdClass = conf.getVertexIdClass();
     if (vertexIdClass.equals(IntWritable.class) &&
         messageClass.equals(FloatWritable.class)) {
       messageStore = new IntFloatMessageStore(
           (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
-          (MessageCombiner<IntWritable, FloatWritable>)
-              conf.<FloatWritable>createMessageCombiner());
+          (MessageCombiner<IntWritable, FloatWritable>) messageCombiner);
     } else if (vertexIdClass.equals(LongWritable.class) &&
         messageClass.equals(DoubleWritable.class)) {
       messageStore = new LongDoubleMessageStore(
           (CentralizedServiceWorker<LongWritable, Writable, Writable>) service,
-          (MessageCombiner<LongWritable, DoubleWritable>)
-              conf.<DoubleWritable>createMessageCombiner());
+          (MessageCombiner<LongWritable, DoubleWritable>) messageCombiner);
     } else {
       PrimitiveIdTypeOps<I> idTypeOps =
           TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
       if (idTypeOps != null) {
         messageStore = new IdOneMessagePerVertexStore<>(
-            messageValueFactory, service, conf.<M>createMessageCombiner(),
+            messageValueFactory, service, messageCombiner,
             conf);
       } else {
         messageStore =
             new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
-                conf.<M>createMessageCombiner(), conf);
+                messageCombiner, conf);
       }
     }
     return messageStore;
@@ -111,14 +113,16 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
   /**
    * MessageStore to be used when combiner is not enabled
    *
+   * @param messageClass message class
    * @param messageValueFactory message value factory
+   * @param encodeAndStore message encode and store type
    * @return message store
    */
   protected MessageStore<I, M> newStoreWithoutCombiner(
-    MessageValueFactory<M> messageValueFactory) {
+      Class<M> messageClass,
+      MessageValueFactory<M> messageValueFactory,
+      MessageEncodeAndStoreType encodeAndStore) {
     MessageStore messageStore = null;
-    MessageEncodeAndStoreType encodeAndStore = GiraphConstants
-        .MESSAGE_ENCODE_AND_STORE_TYPE.get(conf);
     Class<I> vertexIdClass = conf.getVertexIdClass();
     if (vertexIdClass.equals(IntWritable.class)) { // INT
       messageStore = new IntByteArrayMessageStore(messageValueFactory,
@@ -160,21 +164,28 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
 
   @Override
   public MessageStore<I, M> newStore(
-      MessageValueFactory<M> messageValueFactory) {
-    Class<M> messageClass = messageValueFactory.getValueClass();
+      MessageClasses<I, M> messageClasses) {
+    Class<M> messageClass = messageClasses.getMessageClass();
+    MessageValueFactory<M> messageValueFactory =
+        messageClasses.createMessageValueFactory(conf);
+    MessageCombiner<? super I, M> messageCombiner =
+        messageClasses.createMessageCombiner(conf);
     MessageStore messageStore;
-    if (conf.useMessageCombiner()) {
-      messageStore = newStoreWithCombiner(messageValueFactory);
+    if (messageCombiner != null) {
+      messageStore = newStoreWithCombiner(
+          messageClass, messageValueFactory, messageCombiner);
     } else {
-      messageStore = newStoreWithoutCombiner(messageValueFactory);
+      messageStore = newStoreWithoutCombiner(
+          messageClass, messageValueFactory,
+          messageClasses.getMessageEncodeAndStoreType());
     }
 
     if (LOG.isInfoEnabled()) {
       LOG.info("newStore: Created " + messageStore.getClass() +
           " for vertex id " + conf.getVertexIdClass() +
           " and message value " + messageClass + " and" +
-          (conf.useMessageCombiner() ? " message combiner " +
-              conf.getMessageCombinerClass() : " no combiner"));
+          (messageCombiner != null ? " message combiner " +
+              messageCombiner.getClass() : " no combiner"));
     }
 
     int asyncMessageStoreThreads =

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index 6149a9c..41076e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -20,7 +20,7 @@ package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -36,14 +36,10 @@ public interface MessageStoreFactory<I extends WritableComparable,
   /**
    * Creates new message store.
    *
-   * Note: MessageCombiner class in Configuration can be changed,
-   * this method should return MessageStore which uses current combiner
-   *
-   *
-   * @param messageValueFactory Message class held in the store
+   * @param messageClasses Message classes information to be held in the store
    * @return New message store
    */
-  MS newStore(MessageValueFactory<M> messageValueFactory);
+  MS newStore(MessageClasses<I, M> messageClasses);
 
   /**
    * Implementation class should use this method of initialization

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 349e58b..ad0a5dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
@@ -44,7 +45,7 @@ import org.apache.hadoop.io.WritableComparable;
 public class OneMessagePerVertexStore<I extends WritableComparable,
     M extends Writable> extends SimpleMessageStore<I, M, M> {
   /** MessageCombiner for messages */
-  private final MessageCombiner<I, M> messageCombiner;
+  private final MessageCombiner<? super I, M> messageCombiner;
 
   /**
    * @param messageValueFactory Message class held in the store
@@ -55,7 +56,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
   public OneMessagePerVertexStore(
       MessageValueFactory<M> messageValueFactory,
       CentralizedServiceWorker<I, ?, ?> service,
-      MessageCombiner<I, M> messageCombiner,
+      MessageCombiner<? super I, M> messageCombiner,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     super(messageValueFactory, service, config);
     this.messageCombiner =
@@ -162,9 +163,10 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
 
     @Override
     public MessageStore<I, M> newStore(
-        MessageValueFactory<M> messageValueFactory) {
-      return new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
-          config.<M>createMessageCombiner(), config);
+        MessageClasses<I, M> messageClasses) {
+      return new OneMessagePerVertexStore<I, M>(
+          messageClasses.createMessageValueFactory(config), service,
+          messageClasses.createMessageCombiner(config), config);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 3000cd4..3351051 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -18,22 +18,23 @@
 
 package org.apache.giraph.comm.messages.out_of_core;
 
-import com.google.common.collect.Maps;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.collect.Maps;
 
 /**
  * Message store which separates data by partitions,
@@ -48,7 +49,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> implements
     MessageStore<I, M> {
   /** Message value factory */
-  private final MessageValueFactory<M> messageValueFactory;
+  private final MessageClasses<I, M> messageClasses;
   /** Service worker */
   private final CentralizedServiceWorker<I, V, E> service;
   /** Number of messages to keep in memory */
@@ -63,19 +64,19 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   /**
    * Constructor
    *
-   * @param messageValueFactory         Factory for creating message values
+   * @param messageClasses              Message classes information
    * @param service                     Service worker
    * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
    * @param partitionStoreFactory       Factory for creating stores for a
    *                                    partition
    */
   public DiskBackedMessageStore(
-      MessageValueFactory<M> messageValueFactory,
+      MessageClasses<I, M> messageClasses,
       CentralizedServiceWorker<I, V, E> service,
       int maxNumberOfMessagesInMemory,
       MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I,
           M>> partitionStoreFactory) {
-    this.messageValueFactory = messageValueFactory;
+    this.messageClasses = messageClasses;
     this.service = service;
     this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
     this.partitionStoreFactory = partitionStoreFactory;
@@ -238,7 +239,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     if (messageStore != null) {
       return messageStore;
     }
-    messageStore = partitionStoreFactory.newStore(messageValueFactory);
+    messageStore = partitionStoreFactory.newStore(messageClasses);
     PartitionDiskBackedMessageStore<I, M> store =
         partitionMessageStores.putIfAbsent(partitionId, messageStore);
     return (store == null) ? messageStore : store;
@@ -260,7 +261,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
       int partitionId) throws IOException {
     if (in.readBoolean()) {
       PartitionDiskBackedMessageStore<I, M> messageStore =
-          partitionStoreFactory.newStore(messageValueFactory);
+          partitionStoreFactory.newStore(messageClasses);
       messageStore.readFields(in);
       partitionMessageStores.put(partitionId, messageStore);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
index f2b31c0..728a2ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
@@ -24,7 +24,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -70,8 +70,8 @@ public class DiskBackedMessageStoreFactory<I extends WritableComparable,
 
   @Override
   public MessageStore<I, M>
-  newStore(MessageValueFactory<M> messageValueFactory) {
-    return new DiskBackedMessageStore<I, V, E, M>(messageValueFactory,
+  newStore(MessageClasses<I, M> messageClasses) {
+    return new DiskBackedMessageStore<I, V, E, M>(messageClasses,
         service, maxMessagesInMemory, fileStoreFactory);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
index bece774..698281f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -35,6 +35,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
@@ -53,6 +54,8 @@ import com.google.common.collect.Maps;
  */
 public class PartitionDiskBackedMessageStore<I extends WritableComparable,
     M extends Writable> implements Writable {
+  /** Message classes */
+  private final MessageClasses<I, M> messageClasses;
   /** Message value factory */
   private final MessageValueFactory<M> messageValueFactory;
   /**
@@ -78,17 +81,18 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
   /**
    * Constructor.
    *
-   * @param messageValueFactory Used to create message values
+   * @param messageClasses      Message classes information
    * @param config              Hadoop configuration
    * @param fileStoreFactory    Factory for creating file stores when flushing
    */
   public PartitionDiskBackedMessageStore(
-      MessageValueFactory<M> messageValueFactory,
+      MessageClasses<I, M> messageClasses,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config,
       MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
           fileStoreFactory) {
     inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
-    this.messageValueFactory = messageValueFactory;
+    this.messageClasses = messageClasses;
+    this.messageValueFactory = messageClasses.createMessageValueFactory(config);
     this.config = config;
     numberOfMessagesInMemory = new AtomicInteger(0);
     destinationVertices =
@@ -227,7 +231,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
       rwLock.writeLock().unlock();
     }
     SequentialFileMessageStore<I, M> fileStore =
-        fileStoreFactory.newStore(messageValueFactory);
+        fileStoreFactory.newStore(messageClasses);
     fileStore.addMessages(messagesToFlush);
 
     synchronized (fileStores) {
@@ -287,7 +291,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
     int numFileStores = in.readInt();
     for (int s = 0; s < numFileStores; s++) {
       SequentialFileMessageStore<I, M> fileStore =
-          fileStoreFactory.newStore(messageValueFactory);
+          fileStoreFactory.newStore(messageClasses);
       fileStore.readFields(in);
       fileStores.add(fileStore);
     }
@@ -341,8 +345,8 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
 
     @Override
     public PartitionDiskBackedMessageStore<I, M> newStore(
-        MessageValueFactory<M> messageValueFactory) {
-      return new PartitionDiskBackedMessageStore<I, M>(messageValueFactory,
+        MessageClasses<I, M> messageClasses) {
+      return new PartitionDiskBackedMessageStore<I, M>(messageClasses,
           config, fileStoreFactory);
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
index 5988459..8f589bc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -42,6 +42,7 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.io.DataInputOutput;
@@ -407,11 +408,12 @@ public class SequentialFileMessageStore<I extends WritableComparable,
 
     @Override
     public SequentialFileMessageStore<I, M> newStore(
-        MessageValueFactory<M> messageValueFactory) {
+        MessageClasses<I, M> messageClasses) {
       int idx = Math.abs(storeCounter.getAndIncrement());
       String fileName =
           directories[idx % directories.length] + "messages-" + idx;
-      return new SequentialFileMessageStore<I, M>(messageValueFactory, config,
+      return new SequentialFileMessageStore<I, M>(
+          messageClasses.createMessageValueFactory(config), config,
           bufferSize, fileName);
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index c72bedf..1d2407c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -62,7 +62,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   /** Message value factory */
   private final MessageValueFactory<M> messageValueFactory;
   /** Message messageCombiner */
-  private final MessageCombiner<I, M> messageCombiner;
+  private final MessageCombiner<? super I, M> messageCombiner;
   /** Service worker */
   private final CentralizedServiceWorker<I, ?, ?> service;
   /** Giraph configuration */
@@ -95,7 +95,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
   public IdOneMessagePerVertexStore(
       MessageValueFactory<M> messageValueFactory,
       CentralizedServiceWorker<I, ?, ?> service,
-      MessageCombiner<I, M> messageCombiner,
+      MessageCombiner<? super I, M> messageCombiner,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
     this.service = service;
     this.config = config;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index f762f46..a64a33e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -17,15 +17,16 @@
  */
 package org.apache.giraph.comm.netty;
 
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.util.PercentGauge;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.SendOneMessageToManyCache;
 import org.apache.giraph.comm.SendEdgeCache;
 import org.apache.giraph.comm.SendMessageCache;
 import org.apache.giraph.comm.SendMutationsCache;
+import org.apache.giraph.comm.SendOneMessageToManyCache;
 import org.apache.giraph.comm.SendPartitionCache;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
@@ -58,9 +59,9 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.PercentGauge;
 
 /**
  * Aggregate requests and sends them to the thread-safe NettyClient.  This
@@ -89,8 +90,6 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
       new SendMutationsCache<I, V, E>();
   /** NettyClient that could be shared among one or more instances */
   private final WorkerClient<I, V, E> workerClient;
-  /** Messages sent during the last superstep */
-  private long totalMsgsSentInSuperstep = 0;
   /** Maximum size of messages per remote worker to cache before sending */
   private final int maxMessagesSizePerWorker;
   /** Maximum size of vertices per remote worker to cache before sending. */
@@ -118,11 +117,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
    * @param context Context
    * @param conf Configuration
    * @param serviceWorker Service worker
+   * @param useOneMessageToManyIdsEncoding should use one message to many
    */
   public NettyWorkerClientRequestProcessor(
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      CentralizedServiceWorker<I, V, E> serviceWorker) {
+      CentralizedServiceWorker<I, V, E> serviceWorker,
+      boolean useOneMessageToManyIdsEncoding) {
     this.workerClient = serviceWorker.getWorkerClient();
     this.configuration = conf;
 
@@ -134,7 +135,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
     maxVerticesSizePerWorker =
         GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
-    if (this.configuration.useOneMessageToManyIdsEncoding()) {
+    if (useOneMessageToManyIdsEncoding) {
       sendMessageCache =
         new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker,
           this, maxMessagesSizePerWorker);
@@ -205,7 +206,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         serverData.getCurrentMessageStore();
     ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
         new ByteArrayVertexIdMessages<I, Writable>(
-            configuration.getOutgoingMessageValueFactory());
+            configuration.createOutgoingMessageValueFactory());
     vertexIdMessages.setConf(configuration);
     vertexIdMessages.initialize();
     for (I vertexId :
@@ -228,7 +229,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         doRequest(workerInfo, messagesRequest);
         vertexIdMessages =
             new ByteArrayVertexIdMessages<I, Writable>(
-                configuration.getOutgoingMessageValueFactory());
+                configuration.createOutgoingMessageValueFactory());
         vertexIdMessages.setConf(configuration);
         vertexIdMessages.initialize();
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
index c74b4f5..b59d0cf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.requests;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.hadoop.io.Writable;
@@ -69,8 +70,8 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
     partitionId = input.readInt();
     // At this moment the Computation class have already been replaced with
     // the new one, and we deal with messages from previous superstep
-    vertexIdMessageMap = new ByteArrayVertexIdMessages<I, M>(
-        getConf().<M>getIncomingMessageValueFactory());
+    vertexIdMessageMap = new ByteArrayVertexIdMessages<>(
+        getConf().<M>createIncomingMessageValueFactory());
     vertexIdMessageMap.setConf(getConf());
     vertexIdMessageMap.initialize();
     vertexIdMessageMap.readFields(input);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index d525164..6953998 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -18,15 +18,15 @@
 
 package org.apache.giraph.comm.requests;
 
+import java.io.IOException;
+
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.IOException;
-
 /**
  * Send a collection of vertex messages for a partition.
  *
@@ -56,7 +56,7 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
   @Override
   public VertexIdMessages<I, M> createVertexIdData() {
     return new ByteArrayVertexIdMessages<I, M>(
-        getConf().getOutgoingMessageValueFactory());
+        getConf().createOutgoingMessageValueFactory());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
index 798ddfa..f8d0473 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
@@ -74,8 +74,8 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
 
   @Override
   public void readFieldsRequest(DataInput input) throws IOException {
-    oneMessageToManyIds = new ByteArrayOneMessageToManyIds<I, M>(
-      getConf().<M>getOutgoingMessageValueFactory());
+    oneMessageToManyIds = new ByteArrayOneMessageToManyIds<>(
+        getConf().<M>createOutgoingMessageValueFactory());
     oneMessageToManyIds.setConf(getConf());
     oneMessageToManyIds.readFields(input);
   }
@@ -132,7 +132,7 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
               .get(partitionId);
           if (idMsgs == null) {
             idMsgs = new ByteArrayVertexIdMessages<>(
-                getConf().<M>getOutgoingMessageValueFactory());
+                getConf().<M>createOutgoingMessageValueFactory());
             idMsgs.setConf(getConf());
             idMsgs.initialize(initialSize);
             partitionIdMsgs.put(partitionId, idMsgs);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/DefaultMessageClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/DefaultMessageClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/DefaultMessageClasses.java
new file mode 100644
index 0000000..b6e5169
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/DefaultMessageClasses.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.python.google.common.base.Preconditions;
+
+/**
+ * Default implementation of MessageClasses
+ *
+ * @param <I> Vertex id type
+ * @param <M> Message type
+ */
+public class DefaultMessageClasses
+    <I extends WritableComparable, M extends Writable>
+    implements MessageClasses<I, M> {
+  /** message class */
+  private Class<M> messageClass;
+  /** message value factory class */
+  private Class<? extends MessageValueFactory<M>>
+  messageValueFactoryClass;
+  /** message combiner class */
+  private Class<? extends MessageCombiner<? super I, M>> messageCombinerClass;
+  /** whether message class was manually modified in this superstep */
+  private boolean messageClassModified;
+  /** message encode and store type */
+  private MessageEncodeAndStoreType messageEncodeAndStoreType;
+
+  /** Constructor */
+  public DefaultMessageClasses() {
+  }
+
+  /**
+   * Constructor
+   * @param messageClass message class
+   * @param messageValueFactoryClass message value factory class
+   * @param messageCombinerClass message combiner class
+   * @param messageEncodeAndStoreType message encode and store type
+   */
+  public DefaultMessageClasses(
+      Class<M> messageClass,
+      Class<? extends MessageValueFactory<M>> messageValueFactoryClass,
+      Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
+        MessageEncodeAndStoreType messageEncodeAndStoreType) {
+    this.messageClass = messageClass;
+    this.messageValueFactoryClass = messageValueFactoryClass;
+    this.messageCombinerClass = messageCombinerClass;
+    this.messageClassModified = false;
+    this.messageEncodeAndStoreType = messageEncodeAndStoreType;
+  }
+
+  @Override
+  public Class<M> getMessageClass() {
+    return messageClass;
+  }
+
+  @Override
+  public MessageValueFactory<M> createMessageValueFactory(
+      ImmutableClassesGiraphConfiguration conf) {
+    if (messageValueFactoryClass.equals(DefaultMessageValueFactory.class)) {
+      return new DefaultMessageValueFactory<>(messageClass, conf);
+    }
+
+    MessageValueFactory factory = ReflectionUtils.newInstance(
+        messageValueFactoryClass, conf);
+    if (!factory.newInstance().getClass().equals(messageClass)) {
+      throw new IllegalStateException("Message factory " +
+        messageValueFactoryClass + " creates " +
+        factory.newInstance().getClass().getName() + ", but message type is " +
+        messageClass.getName());
+    }
+    return factory;
+  }
+
+  @Override
+  public MessageCombiner<? super I, M> createMessageCombiner(
+      ImmutableClassesGiraphConfiguration conf) {
+    if (messageCombinerClass == null) {
+      return null;
+    }
+
+    MessageCombiner combiner =
+        ReflectionUtils.newInstance(messageCombinerClass, conf);
+    if (combiner != null) {
+      Preconditions.checkState(
+          combiner.createInitialMessage().getClass().equals(messageClass));
+    }
+    return combiner;
+  }
+
+  @Override
+  public boolean useMessageCombiner() {
+    return messageCombinerClass != null;
+  }
+
+  @Override
+  public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
+    return messageEncodeAndStoreType;
+  }
+
+  @Override
+  public MessageClasses<I, M> createCopyForNewSuperstep() {
+    return new DefaultMessageClasses<>(messageClass, messageValueFactoryClass,
+        messageCombinerClass, messageEncodeAndStoreType);
+  }
+
+  @Override
+  public void verifyConsistent(
+      ImmutableClassesGiraphConfiguration conf) {
+    Class<?>[] factoryTypes = ReflectionUtils.getTypeArguments(
+        MessageValueFactory.class, messageValueFactoryClass);
+    ReflectionUtils.verifyTypes(messageClass, factoryTypes[0],
+        "Message factory", messageValueFactoryClass);
+
+    if (messageCombinerClass != null) {
+      Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
+          MessageCombiner.class, messageCombinerClass);
+      ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
+          "Vertex id", messageCombinerClass);
+      ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
+          "Outgoing message", messageCombinerClass);
+    }
+  }
+
+  /**
+   * Set message class
+   * @param messageClass message classs
+   */
+  public void setMessageClass(Class<M> messageClass) {
+    this.messageClassModified = true;
+    this.messageClass = messageClass;
+  }
+
+  /**
+   * Set message class if not set already in this superstep
+   * @param messageClass message class
+   */
+  public void setIfNotModifiedMessageClass(Class<M> messageClass) {
+    if (!messageClassModified) {
+      this.messageClass = messageClass;
+    }
+  }
+
+  public void setMessageCombinerClass(
+      Class<? extends MessageCombiner<? super I, M>> messageCombinerClass) {
+    this.messageCombinerClass = messageCombinerClass;
+  }
+
+  public void setMessageValueFactoryClass(
+      Class<? extends MessageValueFactory<M>> messageValueFactoryClass) {
+    this.messageValueFactoryClass = messageValueFactoryClass;
+  }
+
+  public void setMessageEncodeAndStoreType(
+      MessageEncodeAndStoreType messageEncodeAndStoreType) {
+    this.messageEncodeAndStoreType = messageEncodeAndStoreType;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeClass(messageClass, out);
+    WritableUtils.writeClass(messageValueFactoryClass, out);
+    WritableUtils.writeClass(messageCombinerClass, out);
+    out.writeBoolean(messageClassModified);
+    out.writeByte(messageEncodeAndStoreType.ordinal());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    messageClass = WritableUtils.readClass(in);
+    messageValueFactoryClass = WritableUtils.readClass(in);
+    messageCombinerClass = WritableUtils.readClass(in);
+    messageClassModified = in.readBoolean();
+    messageEncodeAndStoreType =
+        messageEncodeAndStoreType.values()[in.readByte()];
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 2f3c43a..88d5277 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -19,17 +19,18 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
 import org.apache.giraph.factories.DefaultComputationFactory;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.DefaultVertexResolver;
-import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.DefaultVertexValueCombiner;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueCombiner;
-import org.apache.giraph.graph.DefaultVertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.MappingInputFormat;
@@ -45,6 +46,7 @@ import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.SimplePartition;
+import org.apache.giraph.types.NoMessage;
 import org.apache.giraph.worker.DefaultWorkerContext;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
@@ -99,9 +101,11 @@ public class GiraphClasses<I extends WritableComparable,
 
   /** Aggregator writer class - cached for fast access */
   protected Class<? extends AggregatorWriter> aggregatorWriterClass;
-  /** Message combiner class - cached for fast access */
-  protected Class<? extends MessageCombiner<I, ? extends Writable>>
-  messageCombinerClass;
+
+  /** Incoming message classes */
+  protected MessageClasses<I, ? extends Writable> incomingMessageClasses;
+  /** Outgoing message classes */
+  protected MessageClasses<I, ? extends Writable> outgoingMessageClasses;
 
   /** Vertex resolver class - cached for fast access */
   protected Class<? extends VertexResolver<I, V, E>> vertexResolverClass;
@@ -124,32 +128,19 @@ public class GiraphClasses<I extends WritableComparable,
    * Empty constructor. Initialize with default classes or null.
    */
   public GiraphClasses() {
-    // Note: the cast to Object is required in order for javac to accept the
-    // downcast.
-    computationFactoryClass = (Class<? extends ComputationFactory<I, V, E,
-          ? extends Writable, ? extends Writable>>) (Object)
-        DefaultComputationFactory.class;
+    computationFactoryClass = (Class) DefaultComputationFactory.class;
     giraphTypes = new GiraphTypes<I, V, E>();
-    outEdgesClass = (Class<? extends OutEdges<I, E>>) (Object)
-        ByteArrayEdges.class;
-    inputOutEdgesClass = (Class<? extends OutEdges<I, E>>) (Object)
-        ByteArrayEdges.class;
-    graphPartitionerFactoryClass =
-        (Class<? extends GraphPartitionerFactory<I, V, E>>) (Object)
-            HashPartitionerFactory.class;
+    outEdgesClass = (Class) ByteArrayEdges.class;
+    inputOutEdgesClass = (Class) ByteArrayEdges.class;
+    graphPartitionerFactoryClass = (Class) HashPartitionerFactory.class;
     aggregatorWriterClass = TextAggregatorWriter.class;
-    vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
-        (Object) DefaultVertexResolver.class;
-    vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
-        (Object) DefaultVertexValueCombiner.class;
+    vertexResolverClass = (Class) DefaultVertexResolver.class;
+    vertexValueCombinerClass = (Class) DefaultVertexValueCombiner.class;
     workerContextClass = DefaultWorkerContext.class;
     masterComputeClass = DefaultMasterCompute.class;
-    partitionClass = (Class<? extends Partition<I, V, E>>) (Object)
-        SimplePartition.class;
-    edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
-        (Object) DefaultEdgeInputFilter.class;
-    vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E>>)
-        (Object) DefaultVertexInputFilter.class;
+    partitionClass = (Class) SimplePartition.class;
+    edgeInputFilterClass = (Class) DefaultEdgeInputFilter.class;
+    vertexInputFilterClass = (Class) DefaultVertexInputFilter.class;
   }
 
   /**
@@ -190,9 +181,21 @@ public class GiraphClasses<I extends WritableComparable,
         MAPPING_INPUT_FORMAT_CLASS.get(conf);
 
     aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
-    messageCombinerClass =
-        (Class<? extends MessageCombiner<I, ? extends Writable>>)
-        MESSAGE_COMBINER_CLASS.get(conf);
+
+    // incoming messages shouldn't be used in first iteration at all
+    // but empty message stores are created, etc, so using NoMessage
+    // to enforce not a single message is read/written
+    incomingMessageClasses = new DefaultMessageClasses(
+        NoMessage.class,
+        DefaultMessageValueFactory.class,
+        null,
+        MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION);
+    outgoingMessageClasses = new DefaultMessageClasses(
+        giraphTypes.getInitialOutgoingMessageValueClass(),
+        OUTGOING_MESSAGE_VALUE_FACTORY_CLASS.get(conf),
+        MESSAGE_COMBINER_CLASS.get(conf),
+        MESSAGE_ENCODE_AND_STORE_TYPE.get(conf));
+
     vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
         VERTEX_RESOLVER_CLASS.get(conf);
     vertexValueCombinerClass = (Class<? extends VertexValueCombiner<V>>)
@@ -266,24 +269,15 @@ public class GiraphClasses<I extends WritableComparable,
     return giraphTypes.getEdgeValueClass();
   }
 
-  /**
-   * Get incoming Message Value class - messages which have been sent in the
-   * previous superstep and are processed in the current one
-   *
-   * @return Message Value class
-   */
-  public Class<? extends Writable> getIncomingMessageValueClass() {
-    return giraphTypes.getIncomingMessageValueClass();
+
+  public MessageClasses<? extends WritableComparable, ? extends Writable>
+  getIncomingMessageClasses() {
+    return incomingMessageClasses;
   }
 
-  /**
-   * Get outgoing Message Value class - messages which are going to be sent
-   * during current superstep
-   *
-   * @return Message Value class
-   */
-  public Class<? extends Writable> getOutgoingMessageValueClass() {
-    return giraphTypes.getOutgoingMessageValueClass();
+  public MessageClasses<? extends WritableComparable, ? extends Writable>
+  getOutgoingMessageClasses() {
+    return outgoingMessageClasses;
   }
 
   /**
@@ -432,25 +426,6 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Check if MessageCombiner is set
-   *
-   * @return true if MessageCombiner is set
-   */
-  public boolean hasMessageCombinerClass() {
-    return messageCombinerClass != null;
-  }
-
-  /**
-   * Get MessageCombiner used
-   *
-   * @return MessageCombiner
-   */
-  public Class<? extends MessageCombiner<I, ? extends Writable>>
-  getMessageCombinerClass() {
-    return messageCombinerClass;
-  }
-
-  /**
    * Check if VertexResolver is set
    *
    * @return true if VertexResolver is set
@@ -581,12 +556,12 @@ public class GiraphClasses<I extends WritableComparable,
    * Set incoming Message Value class held - messages which have been sent in
    * the previous superstep and are processed in the current one
    *
-   * @param incomingMessageValueClass Message Value class to set
+   * @param incomingMessageClasses Message classes value to set
    * @return this
    */
-  public GiraphClasses setIncomingMessageValueClass(
-      Class<? extends Writable> incomingMessageValueClass) {
-    giraphTypes.setIncomingMessageValueClass(incomingMessageValueClass);
+  public GiraphClasses setIncomingMessageClasses(
+      MessageClasses<I, ? extends Writable> incomingMessageClasses) {
+    this.incomingMessageClasses = incomingMessageClasses;
     return this;
   }
 
@@ -594,12 +569,12 @@ public class GiraphClasses<I extends WritableComparable,
    * Set outgoing Message Value class held - messages which are going to be sent
    * during current superstep
    *
-   * @param outgoingMessageValueClass Message Value class to set
+   * @param outgoingMessageClasses Message classes value to set
    * @return this
    */
-  public GiraphClasses setOutgoingMessageValueClass(
-      Class<? extends Writable> outgoingMessageValueClass) {
-    giraphTypes.setOutgoingMessageValueClass(outgoingMessageValueClass);
+  public GiraphClasses setOutgoingMessageClasses(
+      MessageClasses<I, ? extends Writable> outgoingMessageClasses) {
+    this.outgoingMessageClasses = outgoingMessageClasses;
     return this;
   }
 
@@ -704,18 +679,6 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Set MessageCombiner class used
-   *
-   * @param combinerClass MessageCombiner class to set
-   * @return this
-   */
-  public GiraphClasses setMessageCombiner(
-      Class<? extends MessageCombiner<I, ? extends Writable>> combinerClass) {
-    this.messageCombinerClass = combinerClass;
-    return this;
-  }
-
-  /**
    * Set VertexResolver used
    *
    * @param vertexResolverClass VertexResolver to set

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index a315399..a395244 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -18,17 +18,23 @@
 
 package org.apache.giraph.conf;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.net.UnknownHostException;
+
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReuseObjectsOutEdges;
 import org.apache.giraph.factories.ComputationFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexValueCombiner;
-import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
 import org.apache.giraph.io.MappingInputFormat;
@@ -50,12 +56,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.net.DNS;
 
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
-
-import java.net.UnknownHostException;
-
 /**
  * Adds user methods specific to Giraph.  This will be put into an
  * ImmutableClassesGiraphConfiguration that provides the configuration plus
@@ -527,15 +527,6 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Get the message combiner class (optional)
-   *
-   * @return messageCombinerClass Determines how vertex messages are combined
-   */
-  public Class<? extends MessageCombiner> getMessageCombinerClass() {
-    return MESSAGE_COMBINER_CLASS.get(this);
-  }
-
-  /**
    * Set the message combiner class (optional)
    *
    * @param messageCombinerClass Determines how vertex messages are combined
@@ -1208,16 +1199,6 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Return if oneMessageToManyIds encoding can be enabled
-   *
-   * @return True if this option is true.
-   */
-  public boolean useOneMessageToManyIdsEncoding() {
-    return MESSAGE_ENCODE_AND_STORE_TYPE.get(this)
-      .useOneMessageToManyIdsEncoding();
-  }
-
-  /**
    * Get option whether to create a source vertex present only in edge input
    *
    * @return CREATE_EDGE_SOURCE_VERTICES option

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 65b7892..2805c26 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.conf;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
@@ -32,8 +35,7 @@ import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.factories.ComputationFactory;
 import org.apache.giraph.factories.DefaultComputationFactory;
 import org.apache.giraph.factories.DefaultEdgeValueFactory;
-import org.apache.giraph.factories.DefaultIncomingMessageValueFactory;
-import org.apache.giraph.factories.DefaultOutgoingMessageValueFactory;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
 import org.apache.giraph.factories.DefaultVertexIdFactory;
 import org.apache.giraph.factories.DefaultVertexValueFactory;
 import org.apache.giraph.factories.EdgeValueFactory;
@@ -78,9 +80,6 @@ import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 /**
  * Constants used all over Giraph for configuration.
  */
@@ -169,17 +168,11 @@ public interface GiraphConstants {
       ClassConfOption.create("giraph.edgeValueFactoryClass",
           DefaultEdgeValueFactory.class, EdgeValueFactory.class,
           "Edge value factory class - optional");
-  /** Incoming message value factory class - optional */
-  ClassConfOption<MessageValueFactory>
-  INCOMING_MESSAGE_VALUE_FACTORY_CLASS =
-      ClassConfOption.create("giraph.incomingMessageValueFactoryClass",
-          DefaultIncomingMessageValueFactory.class, MessageValueFactory.class,
-          "Incoming message value factory class - optional");
   /** Outgoing message value factory class - optional */
   ClassConfOption<MessageValueFactory>
   OUTGOING_MESSAGE_VALUE_FACTORY_CLASS =
       ClassConfOption.create("giraph.outgoingMessageValueFactoryClass",
-          DefaultOutgoingMessageValueFactory.class, MessageValueFactory.class,
+          DefaultMessageValueFactory.class, MessageValueFactory.class,
           "Outgoing message value factory class - optional");
 
   /** Vertex edges class - optional */
@@ -381,10 +374,6 @@ public interface GiraphConstants {
   ClassConfOption<Writable> EDGE_VALUE_CLASS =
       ClassConfOption.create("giraph.edgeValueClass", null, Writable.class,
           "Edge value class");
-  /** Incoming message value class */
-  ClassConfOption<Writable> INCOMING_MESSAGE_VALUE_CLASS =
-      ClassConfOption.create("giraph.incomingMessageValueClass", null,
-          Writable.class, "Incoming message value class");
   /** Outgoing message value class */
   ClassConfOption<Writable> OUTGOING_MESSAGE_VALUE_CLASS =
       ClassConfOption.create("giraph.outgoingMessageValueClass", null,

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
index 6c854f3..f98912d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
@@ -17,6 +17,14 @@
  */
 package org.apache.giraph.conf;
 
+import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
+import static org.apache.giraph.utils.ConfigurationUtils.getTypesHolderClass;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+
 import org.apache.giraph.graph.DefaultVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
@@ -25,15 +33,6 @@ import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.base.Preconditions;
 
-import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_CLASS;
-import static org.apache.giraph.utils.ConfigurationUtils.getTypesHolderClass;
-import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
-
 /**
  * Holder for the generic types that describe user's graph.
  *
@@ -49,8 +48,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
   private Class<V> vertexValueClass;
   /** Edge value class */
   private Class<E> edgeValueClass;
-  /** Incoming message value class */
-  private Class<? extends Writable> incomingMessageValueClass;
   /** Outgoing message value class */
   private Class<? extends Writable> outgoingMessageValueClass;
   /** Vertex implementation class */
@@ -77,7 +74,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
       Class<? extends Writable> incomingMessageValueClass,
       Class<? extends Writable> outgoingMessageValueClass) {
     this.edgeValueClass = edgeValueClass;
-    this.incomingMessageValueClass = incomingMessageValueClass;
     this.outgoingMessageValueClass = outgoingMessageValueClass;
     this.vertexIdClass = vertexIdClass;
     this.vertexValueClass = vertexValueClass;
@@ -119,7 +115,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     vertexIdClass = (Class<I>) classList[0];
     vertexValueClass = (Class<V>) classList[1];
     edgeValueClass = (Class<E>) classList[2];
-    incomingMessageValueClass = (Class<? extends Writable>) classList[3];
     outgoingMessageValueClass = (Class<? extends Writable>) classList[4];
   }
 
@@ -132,7 +127,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     vertexIdClass = (Class<I>) VERTEX_ID_CLASS.get(conf);
     vertexValueClass = (Class<V>) VERTEX_VALUE_CLASS.get(conf);
     edgeValueClass = (Class<E>) EDGE_VALUE_CLASS.get(conf);
-    incomingMessageValueClass = INCOMING_MESSAGE_VALUE_CLASS.get(conf);
     outgoingMessageValueClass = OUTGOING_MESSAGE_VALUE_CLASS.get(conf);
     vertexClass = VERTEX_CLASS.get(conf);
   }
@@ -146,7 +140,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     return vertexIdClass != null &&
         vertexValueClass != null &&
         edgeValueClass != null &&
-        incomingMessageValueClass != null &&
         outgoingMessageValueClass != null;
   }
 
@@ -159,7 +152,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     VERTEX_ID_CLASS.set(conf, vertexIdClass);
     VERTEX_VALUE_CLASS.set(conf, vertexValueClass);
     EDGE_VALUE_CLASS.set(conf, edgeValueClass);
-    INCOMING_MESSAGE_VALUE_CLASS.set(conf, incomingMessageValueClass);
     OUTGOING_MESSAGE_VALUE_CLASS.set(conf, outgoingMessageValueClass);
   }
 
@@ -172,7 +164,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     VERTEX_ID_CLASS.setIfUnset(conf, vertexIdClass);
     VERTEX_VALUE_CLASS.setIfUnset(conf, vertexValueClass);
     EDGE_VALUE_CLASS.setIfUnset(conf, edgeValueClass);
-    INCOMING_MESSAGE_VALUE_CLASS.setIfUnset(conf, incomingMessageValueClass);
     OUTGOING_MESSAGE_VALUE_CLASS.setIfUnset(conf, outgoingMessageValueClass);
   }
 
@@ -180,11 +171,7 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     return edgeValueClass;
   }
 
-  public Class<? extends Writable> getIncomingMessageValueClass() {
-    return incomingMessageValueClass;
-  }
-
-  public Class<? extends Writable> getOutgoingMessageValueClass() {
+  Class<? extends Writable> getInitialOutgoingMessageValueClass() {
     return outgoingMessageValueClass;
   }
 
@@ -204,16 +191,6 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
     this.edgeValueClass = edgeValueClass;
   }
 
-  public void setIncomingMessageValueClass(
-      Class<? extends Writable> incomingMessageValueClass) {
-    this.incomingMessageValueClass = incomingMessageValueClass;
-  }
-
-  public void setOutgoingMessageValueClass(
-      Class<? extends Writable> outgoingMessageValueClass) {
-    this.outgoingMessageValueClass = outgoingMessageValueClass;
-  }
-
   public void setVertexIdClass(Class<I> vertexIdClass) {
     this.vertexIdClass = vertexIdClass;
   }
@@ -221,4 +198,9 @@ public class GiraphTypes<I extends WritableComparable, V extends Writable,
   public void setVertexValueClass(Class<V> vertexValueClass) {
     this.vertexValueClass = vertexValueClass;
   }
+
+  public void setOutgoingMessageValueClass(
+      Class<? extends Writable> outgoingMessageValueClass) {
+    this.outgoingMessageValueClass = outgoingMessageValueClass;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 381495e..967737c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -27,6 +27,7 @@ import io.netty.handler.codec.compression.SnappyFramedEncoder;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.edge.EdgeStoreFactory;
@@ -142,8 +143,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
         GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
     valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
         GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
-    valueFactories = new ValueFactories<I, V, E>(conf);
-    valueFactories.initializeIVE(this);
+    valueFactories = new ValueFactories<I, V, E>(this);
   }
 
   /**
@@ -530,40 +530,6 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
 
   /**
    * Get the user's subclassed
-   * {@link org.apache.giraph.combiner.MessageCombiner} class.
-   *
-   * @return User's combiner class
-   */
-  @Override
-  public Class<? extends MessageCombiner<I, ? extends Writable>>
-  getMessageCombinerClass() {
-    return classes.getMessageCombinerClass();
-  }
-
-  /**
-   * Create a user combiner class
-   *
-   * @param <M> Message data
-   * @return Instantiated user combiner class
-   */
-  @SuppressWarnings("rawtypes")
-  public <M extends Writable> MessageCombiner<I, M> createMessageCombiner() {
-    Class<? extends MessageCombiner<I, M>> klass =
-        classes.getMessageCombinerClass();
-    return ReflectionUtils.newInstance(klass, this);
-  }
-
-  /**
-   * Check if user set a combiner
-   *
-   * @return True iff user set a combiner class
-   */
-  public boolean useMessageCombiner() {
-    return classes.hasMessageCombinerClass();
-  }
-
-  /**
-   * Get the user's subclassed
    * {@link org.apache.giraph.graph.VertexValueCombiner} class.
    *
    * @return User's vertex value combiner class
@@ -905,47 +871,92 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    * @return User's vertex message value class
    */
   public <M extends Writable> Class<M> getIncomingMessageValueClass() {
-    return classes.getIncomingMessageValueClass();
+    return classes.getIncomingMessageClasses().getMessageClass();
   }
 
   /**
-   * Get the factory for creating incoming message values
+   * Get the user's subclassed outgoing message value class.
    *
-   * @param <M> Incoming Message type
-   * @return MessageValueFactory
+   * @param <M> Message type
+   * @return User's vertex message value class
    */
-  public <M extends Writable> MessageValueFactory<M>
-  getIncomingMessageValueFactory() {
-    Class<? extends MessageValueFactory> klass =
-        valueFactories.getInMsgFactoryClass();
-    MessageValueFactory<M> factory = ReflectionUtils.newInstance(klass, this);
-    factory.initialize(this);
-    return factory;
+  public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
+    return classes.getOutgoingMessageClasses().getMessageClass();
   }
 
   /**
-   * Get the user's subclassed outgoing message value class.
+   * Get incoming message classes
+   * @param <M> message type
+   * @return incoming message classes
+   */
+  public <M extends Writable>
+  MessageClasses<I, M> getIncomingMessageClasses() {
+    return classes.getIncomingMessageClasses();
+  }
+
+  /**
+   * Get outgoing message classes
+   * @param <M> message type
+   * @return outgoing message classes
+   */
+  public <M extends Writable>
+  MessageClasses<I, M> getOutgoingMessageClasses() {
+    return classes.getOutgoingMessageClasses();
+  }
+
+  /**
+   * Create new outgoing message value factory
+   * @param <M> message type
+   * @return outgoing message value factory
+   */
+  public <M extends Writable>
+  MessageValueFactory<M> createOutgoingMessageValueFactory() {
+    return classes.getOutgoingMessageClasses().createMessageValueFactory(this);
+  }
+
+  /**
+   * Create new incoming message value factory
+   * @param <M> message type
+   * @return incoming message value factory
+   */
+  public <M extends Writable>
+  MessageValueFactory<M> createIncomingMessageValueFactory() {
+    return classes.getIncomingMessageClasses().createMessageValueFactory(this);
+  }
+
+  @Override
+  public void setMessageCombinerClass(
+      Class<? extends MessageCombiner> messageCombinerClass) {
+    throw new IllegalArgumentException(
+        "Cannot set message combiner on ImmutableClassesGiraphConfigurable");
+  }
+
+  /**
+   * Create a user combiner class
    *
    * @param <M> Message data
-   * @return User's vertex message value class
+   * @return Instantiated user combiner class
    */
-  public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
-    return classes.getOutgoingMessageValueClass();
+  public <M extends Writable> MessageCombiner<? super I, M>
+  createOutgoingMessageCombiner() {
+    return classes.getOutgoingMessageClasses().createMessageCombiner(this);
   }
 
   /**
-   * Get the factory for creating outgoing message values
+   * Check if user set a combiner
    *
-   * @param <M> Outgoing Message type
-   * @return MessageValueFactory
+   * @return True iff user set a combiner class
    */
-  public <M extends Writable> MessageValueFactory<M>
-  getOutgoingMessageValueFactory() {
-    Class<? extends MessageValueFactory> klass =
-        valueFactories.getOutMsgFactoryClass();
-    MessageValueFactory<M> factory = ReflectionUtils.newInstance(klass, this);
-    factory.initialize(this);
-    return factory;
+  public boolean useOutgoingMessageCombiner() {
+    return classes.getOutgoingMessageClasses().useMessageCombiner();
+  }
+
+  /**
+   * Get outgoing message encode and store type
+   * @return outgoing message encode and store type
+   */
+  public MessageEncodeAndStoreType getOutgoingMessageEncodeAndStoreType() {
+    return classes.getOutgoingMessageClasses().getMessageEncodeAndStoreType();
   }
 
   @Override
@@ -1227,20 +1238,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    * @param superstepClasses SuperstepClasses
    */
   public void updateSuperstepClasses(SuperstepClasses superstepClasses) {
-    Class<? extends Computation> computationClass =
-        superstepClasses.getComputationClass();
-    classes.setComputationClass(computationClass);
-    Class<? extends Writable> incomingMsgValueClass =
-        superstepClasses.getIncomingMessageClass();
-    if (incomingMsgValueClass != null) {
-      classes.setIncomingMessageValueClass(incomingMsgValueClass);
-    }
-    Class<? extends Writable> outgoingMsgValueClass =
-        superstepClasses.getOutgoingMessageClass();
-    if (outgoingMsgValueClass != null) {
-      classes.setOutgoingMessageValueClass(outgoingMsgValueClass);
-    }
-    classes.setMessageCombiner(superstepClasses.getMessageCombinerClass());
+    superstepClasses.updateGiraphClasses(classes);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d0b81ac/giraph-core/src/main/java/org/apache/giraph/conf/MessageClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/MessageClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/MessageClasses.java
new file mode 100644
index 0000000..b2a09c7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/MessageClasses.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for containing all items that define message being sent,
+ * including it's value factory and combiner.
+ *
+ * @param <I>
+ * @param <M>
+ */
+public interface MessageClasses
+    <I extends WritableComparable, M extends Writable> extends Writable {
+  /**
+   * Get message class
+   * @return message class
+   */
+  Class<M> getMessageClass();
+
+  /**
+   * Create new instance of MessageValueFactory
+   * @param conf Configuration
+   * @return message value factory
+   */
+  MessageValueFactory<M> createMessageValueFactory(
+      ImmutableClassesGiraphConfiguration conf);
+
+  /**
+   * Create new instance of MessageCombiner
+   * @param conf Configuration
+   * @return message combiner
+   */
+  MessageCombiner<? super I, M> createMessageCombiner(
+      ImmutableClassesGiraphConfiguration
+        <I, ? extends Writable, ? extends Writable> conf);
+
+  /**
+   * Has message combiner been specified
+   * @return has message combiner been specified
+   */
+  boolean useMessageCombiner();
+
+  /**
+   * Get MessageEncodeAndStoreType
+   * @return message encode and store type
+   */
+  MessageEncodeAndStoreType getMessageEncodeAndStoreType();
+
+  /**
+   * Creates a fresh copy of this object,
+   * to be used and changed for new superstep.
+   * (that should be independent from the previous one)
+   *
+   * @return message classes
+   */
+  MessageClasses<I, M> createCopyForNewSuperstep();
+
+  /**
+   * Verify if types are internally consistent
+   *
+   * @param conf Configuration
+   */
+  void verifyConsistent(ImmutableClassesGiraphConfiguration conf);
+}