You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:06 UTC

[10/12] GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 10e4975..621bb14 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -20,11 +20,11 @@ package org.apache.giraph.conf;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.DefaultVertexValueFactory;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -36,11 +36,9 @@ import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
-import org.apache.giraph.partition.DefaultPartitionContext;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.partition.SimplePartition;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.worker.DefaultWorkerContext;
@@ -57,22 +55,25 @@ import java.util.List;
  * @param <I> Vertex ID class
  * @param <V> Vertex Value class
  * @param <E> Edge class
- * @param <M> Message class
  */
 @SuppressWarnings("unchecked")
 public class GiraphClasses<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
+    V extends Writable, E extends Writable>
     implements GiraphConstants {
-  /** Vertex class - cached for fast access */
-  protected Class<? extends Vertex<I, V, E, M>> vertexClass;
+  /** Computation class - cached for fast access */
+  protected Class<? extends
+      Computation<I, V, E, ? extends Writable, ? extends Writable>>
+  computationClass;
   /** Vertex id class - cached for fast access */
   protected Class<I> vertexIdClass;
   /** Vertex value class - cached for fast access */
   protected Class<V> vertexValueClass;
   /** Edge value class - cached for fast access */
   protected Class<E> edgeValueClass;
-  /** Message value class - cached for fast access */
-  protected Class<M> messageValueClass;
+  /** Incoming message value class - cached for fast access */
+  protected Class<? extends Writable> incomingMessageValueClass;
+  /** Outgoing message value class - cached for fast access */
+  protected Class<? extends Writable> outgoingMessageValueClass;
   /** Vertex edges class - cached for fast access */
   protected Class<? extends OutEdges<I, E>> outEdgesClass;
   /** Input vertex edges class - cached for fast access */
@@ -82,7 +83,7 @@ public class GiraphClasses<I extends WritableComparable,
   protected Class<? extends VertexValueFactory<V>> vertexValueFactoryClass;
 
   /** Graph partitioner factory class - cached for fast access */
-  protected Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  protected Class<? extends GraphPartitionerFactory<I, V, E>>
   graphPartitionerFactoryClass;
 
   /** Vertex input format class - cached for fast access */
@@ -98,24 +99,22 @@ public class GiraphClasses<I extends WritableComparable,
   /** Aggregator writer class - cached for fast access */
   protected Class<? extends AggregatorWriter> aggregatorWriterClass;
   /** Combiner class - cached for fast access */
-  protected Class<? extends Combiner<I, M>> combinerClass;
+  protected Class<? extends Combiner<I, ? extends Writable>> combinerClass;
 
   /** Vertex resolver class - cached for fast access */
-  protected Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass;
-  /** Partition context class - cached for fast access */
-  protected Class<? extends PartitionContext> partitionContextClass;
+  protected Class<? extends VertexResolver<I, V, E>> vertexResolverClass;
   /** Worker context class - cached for fast access */
   protected Class<? extends WorkerContext> workerContextClass;
   /** Master compute class - cached for fast access */
   protected Class<? extends MasterCompute> masterComputeClass;
 
   /** Partition class - cached for fast accesss */
-  protected Class<? extends Partition<I, V, E, M>> partitionClass;
+  protected Class<? extends Partition<I, V, E>> partitionClass;
 
   /** Edge Input Filter class */
   protected Class<? extends EdgeInputFilter<I, E>> edgeInputFilterClass;
   /** Vertex Input Filter class */
-  protected Class<? extends VertexInputFilter<I, V, E, M>>
+  protected Class<? extends VertexInputFilter<I, V, E>>
   vertexInputFilterClass;
 
   /**
@@ -131,19 +130,18 @@ public class GiraphClasses<I extends WritableComparable,
     vertexValueFactoryClass = (Class<? extends VertexValueFactory<V>>) (Object)
         DefaultVertexValueFactory.class;
     graphPartitionerFactoryClass =
-        (Class<? extends GraphPartitionerFactory<I, V, E, M>>) (Object)
+        (Class<? extends GraphPartitionerFactory<I, V, E>>) (Object)
             HashPartitionerFactory.class;
     aggregatorWriterClass = TextAggregatorWriter.class;
-    vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
+    vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
         (Object) DefaultVertexResolver.class;
-    partitionContextClass = DefaultPartitionContext.class;
     workerContextClass = DefaultWorkerContext.class;
     masterComputeClass = DefaultMasterCompute.class;
-    partitionClass = (Class<? extends Partition<I, V, E, M>>) (Object)
+    partitionClass = (Class<? extends Partition<I, V, E>>) (Object)
         SimplePartition.class;
     edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
         (Object) DefaultEdgeInputFilter.class;
-    vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>)
+    vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E>>)
         (Object) DefaultVertexInputFilter.class;
   }
 
@@ -163,13 +161,17 @@ public class GiraphClasses<I extends WritableComparable,
    */
   private void readFromConf(Configuration conf) {
     // set pre-validated generic parameter types into Configuration
-    vertexClass = (Class<? extends Vertex<I, V, E, M>>) VERTEX_CLASS.get(conf);
-    List<Class<?>> classList = ReflectionUtils.getTypeArguments(Vertex.class,
-        vertexClass);
+    computationClass =
+        (Class<? extends
+            Computation<I, V, E, ? extends Writable, ? extends Writable>>)
+            COMPUTATION_CLASS.get(conf);
+    List<Class<?>> classList =
+        ReflectionUtils.getTypeArguments(Computation.class, computationClass);
     vertexIdClass = (Class<I>) classList.get(0);
     vertexValueClass = (Class<V>) classList.get(1);
     edgeValueClass = (Class<E>) classList.get(2);
-    messageValueClass = (Class<M>) classList.get(3);
+    incomingMessageValueClass = (Class<? extends Writable>) classList.get(3);
+    outgoingMessageValueClass = (Class<? extends Writable>) classList.get(4);
 
     outEdgesClass = (Class<? extends OutEdges<I, E>>)
         VERTEX_EDGES_CLASS.get(conf);
@@ -179,7 +181,7 @@ public class GiraphClasses<I extends WritableComparable,
         VERTEX_VALUE_FACTORY_CLASS.get(conf);
 
     graphPartitionerFactoryClass =
-        (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+        (Class<? extends GraphPartitionerFactory<I, V, E>>)
             GRAPH_PARTITIONER_FACTORY_CLASS.get(conf);
 
     vertexInputFormatClass = (Class<? extends VertexInputFormat<I, V, E>>)
@@ -190,29 +192,30 @@ public class GiraphClasses<I extends WritableComparable,
         EDGE_INPUT_FORMAT_CLASS.get(conf);
 
     aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
-    combinerClass = (Class<? extends Combiner<I, M>>)
+    combinerClass = (Class<? extends Combiner<I, ? extends Writable>>)
         VERTEX_COMBINER_CLASS.get(conf);
-    vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
+    vertexResolverClass = (Class<? extends VertexResolver<I, V, E>>)
         VERTEX_RESOLVER_CLASS.get(conf);
-    partitionContextClass = PARTITION_CONTEXT_CLASS.get(conf);
     workerContextClass = WORKER_CONTEXT_CLASS.get(conf);
     masterComputeClass =  MASTER_COMPUTE_CLASS.get(conf);
-    partitionClass = (Class<? extends Partition<I, V, E, M>>)
+    partitionClass = (Class<? extends Partition<I, V, E>>)
         PARTITION_CLASS.get(conf);
 
     edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
         EDGE_INPUT_FILTER_CLASS.get(conf);
-    vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>)
+    vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E>>)
         VERTEX_INPUT_FILTER_CLASS.get(conf);
   }
 
   /**
-   * Get Vertex class
+   * Get Computation class
    *
-   * @return Vertex class.
+   * @return Computation class.
    */
-  public Class<? extends Vertex<I, V, E, M>> getVertexClass() {
-    return vertexClass;
+  public Class<? extends
+      Computation<I, V, E, ? extends Writable, ? extends Writable>>
+  getComputationClass() {
+    return computationClass;
   }
 
   /**
@@ -243,12 +246,23 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Get Message Value class
+   * Get incoming Message Value class - messages which have been sent in the
+   * previous superstep and are processed in the current one
    *
    * @return Message Value class
    */
-  public Class<M> getMessageValueClass() {
-    return messageValueClass;
+  public Class<? extends Writable> getIncomingMessageValueClass() {
+    return incomingMessageValueClass;
+  }
+
+  /**
+   * Get outgoing Message Value class - messages which are going to be sent
+   * during current superstep
+   *
+   * @return Message Value class
+   */
+  public Class<? extends Writable> getOutgoingMessageValueClass() {
+    return outgoingMessageValueClass;
   }
 
   /**
@@ -283,7 +297,7 @@ public class GiraphClasses<I extends WritableComparable,
    *
    * @return GraphPartitionerFactory
    */
-  public Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  public Class<? extends GraphPartitionerFactory<I, V, E>>
   getGraphPartitionerFactoryClass() {
     return graphPartitionerFactoryClass;
   }
@@ -293,7 +307,7 @@ public class GiraphClasses<I extends WritableComparable,
     return edgeInputFilterClass;
   }
 
-  public Class<? extends VertexInputFilter<I, V, E, M>>
+  public Class<? extends VertexInputFilter<I, V, E>>
   getVertexInputFilterClass() {
     return vertexInputFilterClass;
   }
@@ -386,7 +400,7 @@ public class GiraphClasses<I extends WritableComparable,
    *
    * @return Combiner
    */
-  public Class<? extends Combiner<I, M>> getCombinerClass() {
+  public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
     return combinerClass;
   }
 
@@ -404,29 +418,11 @@ public class GiraphClasses<I extends WritableComparable,
    *
    * @return VertexResolver
    */
-  public Class<? extends VertexResolver<I, V, E, M>> getVertexResolverClass() {
+  public Class<? extends VertexResolver<I, V, E>> getVertexResolverClass() {
     return vertexResolverClass;
   }
 
   /**
-   * Check if PartitionContext is set
-   *
-   * @return true if PartitionContext is set
-   */
-  public boolean hasPartitionContextClass() {
-    return partitionContextClass != null;
-  }
-
-  /**
-   * Get PartitionContext used
-   *
-   * @return PartitionContext
-   */
-  public Class<? extends PartitionContext> getPartitionContextClass() {
-    return partitionContextClass;
-  }
-
-  /**
    * Check if WorkerContext is set
    *
    * @return true if WorkerContext is set
@@ -476,19 +472,24 @@ public class GiraphClasses<I extends WritableComparable,
    *
    * @return Partition
    */
-  public Class<? extends Partition<I, V, E, M>> getPartitionClass() {
+  public Class<? extends Partition<I, V, E>> getPartitionClass() {
     return partitionClass;
   }
 
   /**
-   * Set Vertex class held
+   * Set Computation class held, and update message types
    *
-   * @param vertexClass Vertex class to set
+   * @param computationClass Computation class to set
    * @return this
    */
-  public GiraphClasses setVertexClass(
-      Class<? extends Vertex<I, V, E, M>> vertexClass) {
-    this.vertexClass = vertexClass;
+  public GiraphClasses setComputationClass(Class<? extends
+      Computation<I, V, E, ? extends Writable, ? extends Writable>>
+      computationClass) {
+    this.computationClass = computationClass;
+    List<Class<?>> classList =
+        ReflectionUtils.getTypeArguments(Computation.class, computationClass);
+    incomingMessageValueClass = (Class<? extends Writable>) classList.get(3);
+    outgoingMessageValueClass = (Class<? extends Writable>) classList.get(4);
     return this;
   }
 
@@ -526,13 +527,28 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Set Message Value class held
+   * Set incoming Message Value class held - messages which have been sent in
+   * the previous superstep and are processed in the current one
+   *
+   * @param incomingMessageValueClass Message Value class to set
+   * @return this
+   */
+  public GiraphClasses setIncomingMessageValueClass(
+      Class<? extends Writable> incomingMessageValueClass) {
+    this.incomingMessageValueClass = incomingMessageValueClass;
+    return this;
+  }
+
+  /**
+   * Set outgoing Message Value class held - messages which are going to be sent
+   * during current superstep
    *
-   * @param messageValueClass Message Value class to set
+   * @param outgoingMessageValueClass Message Value class to set
    * @return this
    */
-  public GiraphClasses setMessageValueClass(Class<M> messageValueClass) {
-    this.messageValueClass = messageValueClass;
+  public GiraphClasses setOutgoingMessageValueClass(
+      Class<? extends Writable> outgoingMessageValueClass) {
+    this.outgoingMessageValueClass = outgoingMessageValueClass;
     return this;
   }
 
@@ -583,7 +599,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return this
    */
   public GiraphClasses setGraphPartitionerFactoryClass(
-      Class<? extends GraphPartitionerFactory<I, V, E, M>> klass) {
+      Class<? extends GraphPartitionerFactory<I, V, E>> klass) {
     this.graphPartitionerFactoryClass = klass;
     return this;
   }
@@ -643,7 +659,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return this
    */
   public GiraphClasses setCombinerClass(
-      Class<? extends Combiner<I, M>> combinerClass) {
+      Class<? extends Combiner<I, ? extends Writable>> combinerClass) {
     this.combinerClass = combinerClass;
     return this;
   }
@@ -655,24 +671,12 @@ public class GiraphClasses<I extends WritableComparable,
    * @return this
    */
   public GiraphClasses setVertexResolverClass(
-      Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass) {
+      Class<? extends VertexResolver<I, V, E>> vertexResolverClass) {
     this.vertexResolverClass = vertexResolverClass;
     return this;
   }
 
   /**
-   * Set PartitionContext used
-   *
-   * @param partitionContextClass PartitionContext class to set
-   * @return this
-   */
-  public GiraphClasses setPartitionContextClass(
-      Class<? extends PartitionContext> partitionContextClass) {
-    this.partitionContextClass = partitionContextClass;
-    return this;
-  }
-
-  /**
    * Set WorkerContext used
    *
    * @param workerContextClass WorkerContext class to set
@@ -703,7 +707,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return this
    */
   public GiraphClasses setPartitionClass(
-      Class<? extends Partition<I, V, E, M>> partitionClass) {
+      Class<? extends Partition<I, V, E>> partitionClass) {
     this.partitionClass = partitionClass;
     return this;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 754fad9..58a3f01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,9 +20,9 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReuseObjectsOutEdges;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
@@ -35,7 +35,6 @@ import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.partition.ReusesObjectsPartition;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
@@ -79,22 +78,22 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Get the user's subclassed {@link org.apache.giraph.graph.Vertex}
+   * Get the user's subclassed {@link Computation}
    *
-   * @return User's vertex class
+   * @return User's computation class
    */
-  public Class<? extends Vertex> getVertexClass() {
-    return VERTEX_CLASS.get(this);
+  public Class<? extends Computation> getComputationClass() {
+    return COMPUTATION_CLASS.get(this);
   }
 
   /**
-   * Set the vertex class (required)
+   * Set the computation class (required)
    *
-   * @param vertexClass Runs vertex computation
+   * @param computationClass Runs vertex computation
    */
-  public final void setVertexClass(
-      Class<? extends Vertex> vertexClass) {
-    VERTEX_CLASS.set(this, vertexClass);
+  public void setComputationClass(
+      Class<? extends Computation> computationClass) {
+    COMPUTATION_CLASS.set(this, computationClass);
   }
 
   /**
@@ -407,7 +406,7 @@ public class GiraphConfiguration extends Configuration
    *
    * @param vertexCombinerClass Determines how vertex messages are combined
    */
-  public final void setCombinerClass(
+  public void setCombinerClass(
       Class<? extends Combiner> vertexCombinerClass) {
     VERTEX_COMBINER_CLASS.set(this, vertexCombinerClass);
   }
@@ -452,17 +451,6 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Set the partition context class (optional)
-   *
-   * @param partitionContextClass Determines what code is executed for each
-   *        partition before and after each superstep
-   */
-  public final void setPartitionContextClass(
-      Class<? extends PartitionContext> partitionContextClass) {
-    PARTITION_CONTEXT_CLASS.set(this, partitionContextClass);
-  }
-
-  /**
    * Set the worker context class (optional)
    *
    * @param workerContextClass Determines what code is executed on a each

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index bbf50e5..2d0f59c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -20,11 +20,11 @@ package org.apache.giraph.conf;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.DefaultVertexValueFactory;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
@@ -39,11 +39,9 @@ import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
-import org.apache.giraph.partition.DefaultPartitionContext;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.partition.SimplePartition;
 import org.apache.giraph.worker.DefaultWorkerContext;
 import org.apache.giraph.worker.WorkerContext;
@@ -62,9 +60,10 @@ public interface GiraphConstants {
   /** 1KB in bytes */
   int ONE_KB = 1024;
 
-  /** Vertex class - required */
-  ClassConfOption<Vertex> VERTEX_CLASS =
-      ClassConfOption.create("giraph.vertexClass", null, Vertex.class);
+  /** Computation class - required */
+  ClassConfOption<Computation> COMPUTATION_CLASS =
+      ClassConfOption.create("giraph.computationClass", null,
+          Computation.class);
   /** Vertex value factory class - optional */
   ClassConfOption<VertexValueFactory> VERTEX_VALUE_FACTORY_CLASS =
       ClassConfOption.create("giraph.vertexValueFactoryClass",
@@ -184,13 +183,14 @@ public interface GiraphConstants {
   /** Edge value class */
   ClassConfOption<Writable> EDGE_VALUE_CLASS =
       ClassConfOption.create("giraph.edgeValueClass", null, Writable.class);
-  /** Message value class */
-  ClassConfOption<Writable> MESSAGE_VALUE_CLASS =
-      ClassConfOption.create("giraph.messageValueClass", null, Writable.class);
-  /** Partition context class */
-  ClassConfOption<PartitionContext> PARTITION_CONTEXT_CLASS =
-      ClassConfOption.create("giraph.partitionContextClass",
-          DefaultPartitionContext.class, PartitionContext.class);
+  /** Incoming message value class */
+  ClassConfOption<Writable> INCOMING_MESSAGE_VALUE_CLASS =
+      ClassConfOption.create("giraph.incomingMessageValueClass", null,
+          Writable.class);
+  /** Outgoing message value class */
+  ClassConfOption<Writable> OUTGOING_MESSAGE_VALUE_CLASS =
+      ClassConfOption.create("giraph.outgoingMessageValueClass", null,
+          Writable.class);
   /** Worker context class */
   ClassConfOption<WorkerContext> WORKER_CONTEXT_CLASS =
       ClassConfOption.create("giraph.workerContextClass",

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
index e4351a2..dbffbc7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
@@ -27,23 +27,20 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 public interface ImmutableClassesGiraphConfigurable<
-    I extends WritableComparable, V extends Writable, E extends Writable,
-    M extends Writable> {
+    I extends WritableComparable, V extends Writable, E extends Writable> {
   /**
    * Set the configuration to be used by this object.
    *
    * @param configuration Set configuration
    */
-  void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M>
-                   configuration);
+  void setConf(ImmutableClassesGiraphConfiguration<I, V, E> configuration);
 
   /**
    * Return the configuration used by this object.
    *
    * @return Set configuration
    */
-  ImmutableClassesGiraphConfiguration<I, V, E, M> getConf();
+  ImmutableClassesGiraphConfiguration<I, V, E> getConf();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index a9add4f..aa52498 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,11 +20,11 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReusableEdge;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueFactory;
@@ -43,9 +43,9 @@ import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
 import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.master.SuperstepClasses;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.utils.ExtendedByteArrayDataInput;
 import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
 import org.apache.giraph.utils.ExtendedDataInput;
@@ -70,11 +70,10 @@ import org.apache.hadoop.util.Progressable;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("unchecked")
 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
+    V extends Writable, E extends Writable>
     extends GiraphConfiguration {
   /** Holder for all the classes */
   private final GiraphClasses classes;
@@ -95,7 +94,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    */
   public ImmutableClassesGiraphConfiguration(Configuration conf) {
     super(conf);
-    classes = new GiraphClasses<I, V, E, M>(conf);
+    classes = new GiraphClasses<I, V, E>(conf);
     useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
     try {
       vertexValueFactory = (VertexValueFactory<V>)
@@ -113,23 +112,6 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Create a new ImmutableClassesGiraphConfiguration. This is a convenience
-   * method to make it easier to deal with generics.
-   *
-   * @param conf Configuration to read
-   * @param <I> Vertex ID
-   * @param <V> Vertex Value
-   * @param <E> Edge Value
-   * @param <M> Message Value
-   * @return new ImmutableClassesGiraphConfiguration
-   */
-  public static <I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable>
-  ImmutableClassesGiraphConfiguration<I, V, E, M> create(Configuration conf) {
-    return new ImmutableClassesGiraphConfiguration<I, V, E, M>(conf);
-  }
-
-  /**
    * Configure an object with this instance if the object is configurable.
    * @param obj Object
    */
@@ -162,7 +144,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    *
    * @return VertexInputFilter class
    */
-  public Class<? extends VertexInputFilter<I, V, E, M>>
+  public Class<? extends VertexInputFilter<I, V, E>>
   getVertexInputFilterClass() {
     return classes.getVertexInputFilterClass();
   }
@@ -181,7 +163,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    *
    * @return User's graph partitioner
    */
-  public Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  public Class<? extends GraphPartitionerFactory<I, V, E>>
   getGraphPartitionerClass() {
     return classes.getGraphPartitionerFactoryClass();
   }
@@ -191,8 +173,8 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    *
    * @return Instantiated user graph partitioner class
    */
-  public GraphPartitionerFactory<I, V, E, M> createGraphPartitioner() {
-    Class<? extends GraphPartitionerFactory<I, V, E, M>> klass =
+  public GraphPartitionerFactory<I, V, E> createGraphPartitioner() {
+    Class<? extends GraphPartitionerFactory<I, V, E>> klass =
         classes.getGraphPartitionerFactoryClass();
     return ReflectionUtils.newInstance(klass, this);
   }
@@ -380,17 +362,18 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    *
    * @return User's combiner class
    */
-  public Class<? extends Combiner<I, M>> getCombinerClass() {
+  public Class<? extends Combiner<I, ? extends Writable>> getCombinerClass() {
     return classes.getCombinerClass();
   }
 
   /**
    * Create a user combiner class
    *
+   * @param <M> Message data
    * @return Instantiated user combiner class
    */
   @SuppressWarnings("rawtypes")
-  public Combiner<I, M> createCombiner() {
+  public <M extends Writable> Combiner<I, M> createCombiner() {
     Class<? extends Combiner<I, M>> klass = classes.getCombinerClass();
     return ReflectionUtils.newInstance(klass, this);
   }
@@ -409,41 +392,17 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    *
    * @return User's vertex resolver class
    */
-  public Class<? extends VertexResolver<I, V, E, M>> getVertexResolverClass() {
+  public Class<? extends VertexResolver<I, V, E>> getVertexResolverClass() {
     return classes.getVertexResolverClass();
   }
 
   /**
    * Create a user vertex revolver
    *
-   * @param graphState State of the graph from the worker
    * @return Instantiated user vertex resolver
    */
-  @SuppressWarnings("rawtypes")
-  public VertexResolver<I, V, E, M> createVertexResolver(
-                       GraphState<I, V, E, M> graphState) {
-    VertexResolver<I, V, E, M> resolver =
-        ReflectionUtils.newInstance(getVertexResolverClass(), this);
-    resolver.setGraphState(graphState);
-    return resolver;
-  }
-
-  /**
-   * Get the user's subclassed PartitionContext.
-   *
-   * @return User's partition context class
-   */
-  public Class<? extends PartitionContext> getPartitionContextClass() {
-    return classes.getPartitionContextClass();
-  }
-
-  /**
-   * Create a user partition context
-   *
-   * @return Instantiated user partition context
-   */
-  public PartitionContext createPartitionContext() {
-    return ReflectionUtils.newInstance(getPartitionContextClass(), this);
+  public VertexResolver<I, V, E> createVertexResolver() {
+    return ReflectionUtils.newInstance(getVertexResolverClass(), this);
   }
 
   /**
@@ -458,15 +417,10 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   /**
    * Create a user worker context
    *
-   * @param graphState State of the graph from the worker
    * @return Instantiated user worker context
    */
-  @SuppressWarnings("rawtypes")
-  public WorkerContext createWorkerContext(GraphState<I, V, E, M> graphState) {
-    WorkerContext workerContext =
-        ReflectionUtils.newInstance(getWorkerContextClass(), this);
-    workerContext.setGraphState(graphState);
-    return workerContext;
+  public WorkerContext createWorkerContext() {
+    return ReflectionUtils.newInstance(getWorkerContextClass(), this);
   }
 
   /**
@@ -488,17 +442,29 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   @Override
-  public Class<? extends Vertex<I, V, E, M>> getVertexClass() {
-    return classes.getVertexClass();
+  public Class<? extends
+      Computation<I, V, E, ? extends Writable, ? extends Writable>>
+  getComputationClass() {
+    return classes.getComputationClass();
   }
 
   /**
-   * Create a user vertex
+   * Create a user computation
    *
-   * @return Instantiated user vertex
+   * @return Instantiated user computation
    */
-  public Vertex<I, V, E, M> createVertex() {
-    return ReflectionUtils.newInstance(getVertexClass(), this);
+  public Computation<I, V, E, ? extends Writable, ? extends Writable>
+  createComputation() {
+    return ReflectionUtils.newInstance(getComputationClass(), this);
+  }
+
+  /**
+   * Create a vertex
+   *
+   * @return Instantiated vertex
+   */
+  public Vertex<I, V, E> createVertex() {
+    return ReflectionUtils.newInstance(Vertex.class, this);
   }
 
   /**
@@ -658,34 +624,58 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Get the user's subclassed vertex message value class.
+   * Get the user's subclassed incoming message value class.
    *
+   * @param <M> Message data
    * @return User's vertex message value class
    */
-  @SuppressWarnings("unchecked")
-  public Class<M> getMessageValueClass() {
-    return classes.getMessageValueClass();
+  public <M extends Writable> Class<M> getIncomingMessageValueClass() {
+    return classes.getIncomingMessageValueClass();
   }
 
   /**
-   * Create a user vertex message value
+   * Get the user's subclassed outgoing message value class.
    *
-   * @return Instantiated user vertex message value
+   * @param <M> Message data
+   * @return User's outgoing message value class
    */
-  public M createMessageValue() {
-    Class<M> klass = getMessageValueClass();
-    if (klass == NullWritable.class) {
+  public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
+    return classes.getOutgoingMessageValueClass();
+  }
+
+  /**
+   * Create incoming message value
+   *
+   * @param <M> Message data
+   * @return Incoming message value
+   */
+  public <M extends Writable> M createIncomingMessageValue() {
+    return this.<M>createMessageValue(this.<M>getIncomingMessageValueClass());
+  }
+
+  /**
+   * Create outgoing message value
+   *
+   * @param <M> Message data
+   * @return Outgoing message value
+   */
+  public <M extends Writable> M createOutgoingMessageValue() {
+    return this.<M>createMessageValue(this.<M>getOutgoingMessageValueClass());
+  }
+
+  /**
+   * Create a message value
+   *
+   * @param <M> Message data
+   * @param messageClass Message class
+   * @return Instantiated message value
+   */
+  private <M extends Writable> M createMessageValue(
+      Class<? extends Writable> messageClass) {
+    if (messageClass == NullWritable.class) {
       return (M) NullWritable.get();
     } else {
-      try {
-        return klass.newInstance();
-      } catch (InstantiationException e) {
-        throw new IllegalArgumentException(
-            "createMessageValue: Failed to instantiate", e);
-      } catch (IllegalAccessException e) {
-        throw new IllegalArgumentException(
-            "createMessageValue: Illegally accessed", e);
-      }
+      return (M) ReflectionUtils.newInstance(messageClass);
     }
   }
 
@@ -792,10 +782,10 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    * @param progressable Progressable for reporting progress
    * @return Instantiated partition
    */
-  public Partition<I, V, E, M> createPartition(
+  public Partition<I, V, E> createPartition(
       int id, Progressable progressable) {
-    Class<? extends Partition<I, V, E, M>> klass = classes.getPartitionClass();
-    Partition<I, V, E, M> partition = ReflectionUtils.newInstance(klass, this);
+    Class<? extends Partition<I, V, E>> klass = classes.getPartitionClass();
+    Partition<I, V, E> partition = ReflectionUtils.newInstance(klass, this);
     partition.initialize(id, progressable);
     return partition;
   }
@@ -868,4 +858,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
       return new ExtendedByteArrayDataInput(buf, off, length);
     }
   }
+
+  /**
+   * Update the Computation and Combiner classes in use
+   *
+   * @param superstepClasses SuperstepClasses
+   */
+  public void updateSuperstepClasses(SuperstepClasses superstepClasses) {
+    classes.setComputationClass(superstepClasses.getComputationClass());
+    classes.setCombinerClass(superstepClasses.getCombinerClass());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java
index 631b209..a983ee4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ConfigurableOutEdges.java
@@ -31,6 +31,6 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public abstract class ConfigurableOutEdges<I extends WritableComparable,
     E extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E,
-    Writable> implements OutEdges<I, E> {
+    extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E>
+    implements OutEdges<I, E> {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 9e2d246..420bf93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -41,16 +41,15 @@ import org.apache.log4j.Logger;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public class EdgeStore<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(EdgeStore.class);
   /** Service worker. */
-  private CentralizedServiceWorker<I, V, E, M> service;
+  private CentralizedServiceWorker<I, V, E> service;
   /** Giraph configuration. */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** Progressable to report progress. */
   private Progressable progressable;
   /** Map used to temporarily store incoming edges. */
@@ -75,8 +74,8 @@ public class EdgeStore<I extends WritableComparable,
    * @param progressable Progressable
    */
   public EdgeStore(
-      CentralizedServiceWorker<I, V, E, M> service,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      CentralizedServiceWorker<I, V, E> service,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       Progressable progressable) {
     this.service = service;
     this.configuration = configuration;
@@ -179,14 +178,14 @@ public class EdgeStore<I extends WritableComparable,
           public Void call() throws Exception {
             Integer partitionId;
             while ((partitionId = partitionIdQueue.poll()) != null) {
-              Partition<I, V, E, M> partition =
+              Partition<I, V, E> partition =
                   service.getPartitionStore().getPartition(partitionId);
               ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
                   transientEdges.remove(partitionId);
               for (I vertexId : partitionEdges.keySet()) {
                 OutEdges<I, E> outEdges = convertInputToComputeEdges(
                     partitionEdges.remove(vertexId));
-                Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
+                Vertex<I, V, E> vertex = partition.getVertex(vertexId);
                 // If the source vertex doesn't exist, create it. Otherwise,
                 // just set the edges.
                 if (vertex == null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
index 82486f4..ca32a9f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
@@ -34,14 +34,14 @@ import java.util.Iterator;
 public class MutableEdgesIterable<I extends WritableComparable,
     E extends Writable> implements Iterable<MutableEdge<I, E>> {
   /** Vertex that owns the out-edges. */
-  private Vertex<I, ?, E, ?> vertex;
+  private Vertex<I, ?, E> vertex;
 
   /**
    * Constructor.
    *
    * @param vertex Owning vertex
    */
-  public MutableEdgesIterable(Vertex<I, ?, E, ?> vertex) {
+  public MutableEdgesIterable(Vertex<I, ?, E> vertex) {
     this.vertex = vertex;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
index cd845d0..529234d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
@@ -73,7 +73,7 @@ public class MutableEdgesWrapper<I extends WritableComparable,
   public static <I extends WritableComparable, E extends Writable>
   MutableEdgesWrapper<I, E> wrap(
       OutEdges<I, E> edges,
-      ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf) {
+      ImmutableClassesGiraphConfiguration<I, ?, E> conf) {
     MutableEdgesWrapper<I, E> wrapper = new MutableEdgesWrapper<I, E>(
         edges, conf.createAndInitializeOutEdges(edges.size()));
     return wrapper;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
new file mode 100644
index 0000000..180c5d3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/BasicComputation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Computation in which both incoming and outgoing message types are the same.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message type
+ */
+public abstract class BasicComputation<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends Computation<I, V, E, M, M> {
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
new file mode 100644
index 0000000..84158df
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+/**
+ * Basic abstract class for writing a BSP application for computation.
+ *
+ * During the superstep there can be several instances of this class,
+ * each doing computation on one partition of the graph's vertices.
+ *
+ * Note that each thread will have its own {@link Computation},
+ * so accessing any data from this class is thread-safe.
+ * However, accessing global data (like data from {@link WorkerContext})
+ * is not thread-safe.
+ *
+ * Objects of this class only live for a single superstep.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M1> Incoming message type
+ * @param <M2> Outgoing message type
+ */
+public abstract class Computation<I extends WritableComparable,
+    V extends Writable, E extends Writable, M1 extends Writable,
+    M2 extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    implements WorkerAggregatorUsage {
+  /** Global graph state **/
+  private GraphState graphState;
+  /** Handles requests */
+  private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
+  /** Graph-wide BSP Mapper for this Computation */
+  private GraphTaskManager<I, V, E> graphTaskManager;
+  /** Worker aggregator usage */
+  private WorkerAggregatorUsage workerAggregatorUsage;
+  /** Worker context */
+  private WorkerContext workerContext;
+
+  /**
+   * Must be defined by user to do computation on a single Vertex.
+   *
+   * @param vertex   Vertex
+   * @param messages Messages that were sent to this vertex in the previous
+   *                 superstep.  Each message is only guaranteed to have
+   *                 a life expectancy as long as next() is not called.
+   */
+  public abstract void compute(Vertex<I, V, E> vertex,
+      Iterable<M1> messages) throws IOException;
+
+  /**
+   * Prepare for computation. This method is executed exactly once prior to
+   * {@link #compute(Vertex, Iterable)} being called for any of the vertices
+   * in the partition.
+   */
+  public void preSuperstep() {
+  }
+
+  /**
+   * Finish computation. This method is executed exactly once after computation
+   * for all vertices in the partition is complete.
+   */
+  public void postSuperstep() {
+  }
+
+  /**
+   * Initialize, called by infrastructure before the superstep starts.
+   * Shouldn't be called by user code.
+   *
+   * @param graphState Graph state
+   * @param workerClientRequestProcessor Processor for handling requests
+   * @param graphTaskManager Graph-wide BSP Mapper for this Computation
+   * @param workerAggregatorUsage Worker aggregator usage
+   * @param workerContext Worker context
+   */
+  public final void initialize(
+      GraphState graphState,
+      WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
+      GraphTaskManager<I, V, E> graphTaskManager,
+      WorkerAggregatorUsage workerAggregatorUsage,
+      WorkerContext workerContext) {
+    this.graphState = graphState;
+    this.workerClientRequestProcessor = workerClientRequestProcessor;
+    this.graphTaskManager = graphTaskManager;
+    this.workerAggregatorUsage = workerAggregatorUsage;
+    this.workerContext = workerContext;
+  }
+
+  /**
+   * Retrieves the current superstep.
+   *
+   * @return Current superstep
+   */
+  public long getSuperstep() {
+    return graphState.getSuperstep();
+  }
+
+  /**
+   * Get the total (all workers) number of vertices that
+   * existed in the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  public long getTotalNumVertices() {
+    return graphState.getTotalNumVertices();
+  }
+
+  /**
+   * Get the total (all workers) number of edges that
+   * existed in the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  public long getTotalNumEdges() {
+    return graphState.getTotalNumEdges();
+  }
+
+  /**
+   * Send a message to a vertex id.
+   *
+   * @param id Vertex id to send the message to
+   * @param message Message data to send
+   */
+  public void sendMessage(I id, M2 message) {
+    if (workerClientRequestProcessor.sendMessageRequest(id, message)) {
+      graphTaskManager.notifySentMessages();
+    }
+  }
+
+  /**
+   * Send a message to all edges.
+   *
+   * @param vertex Vertex whose edges to send the message to.
+   * @param message Message sent to all edges.
+   */
+  public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
+    for (Edge<I, E> edge : vertex.getEdges()) {
+      sendMessage(edge.getTargetVertexId(), message);
+    }
+  }
+
+  /**
+   * Sends a request to create a vertex that will be available during the
+   * next superstep.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   * @param edges Initial edges
+   */
+  public void addVertexRequest(I id, V value,
+      OutEdges<I, E> edges) throws IOException {
+    Vertex<I, V, E> vertex = getConf().createVertex();
+    vertex.initialize(id, value, edges);
+    workerClientRequestProcessor.addVertexRequest(vertex);
+  }
+
+  /**
+   * Sends a request to create a vertex that will be available during the
+   * next superstep.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   */
+  public void addVertexRequest(I id, V value) throws IOException {
+    addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
+  }
+
+  /**
+   * Request to remove a vertex from the graph
+   * (applied just prior to the next superstep).
+   *
+   * @param vertexId Id of the vertex to be removed.
+   */
+  public void removeVertexRequest(I vertexId) throws IOException {
+    workerClientRequestProcessor.removeVertexRequest(vertexId);
+  }
+
+  /**
+   * Request to add an edge of a vertex in the graph
+   * (processed just prior to the next superstep)
+   *
+   * @param sourceVertexId Source vertex id of edge
+   * @param edge Edge to add
+   */
+  public void addEdgeRequest(I sourceVertexId,
+      Edge<I, E> edge) throws IOException {
+    workerClientRequestProcessor.addEdgeRequest(sourceVertexId, edge);
+  }
+
+  /**
+   * Request to remove all edges from a given source vertex to a given target
+   * vertex (processed just prior to the next superstep).
+   *
+   * @param sourceVertexId Source vertex id
+   * @param targetVertexId Target vertex id
+   */
+  public void removeEdgesRequest(I sourceVertexId,
+      I targetVertexId) throws IOException {
+    workerClientRequestProcessor.removeEdgesRequest(
+        sourceVertexId, targetVertexId);
+  }
+
+  /**
+   * Get the mapper context
+   *
+   * @return Mapper context
+   */
+  public Mapper.Context getContext() {
+    return graphState.getContext();
+  }
+
+  /**
+   * Get the worker context
+   *
+   * @param <W> WorkerContext class
+   * @return Worker context, cast to the caller's expected type
+   */
+  @SuppressWarnings("unchecked")
+  public <W extends WorkerContext> W getWorkerContext() {
+    return (W) workerContext;
+  }
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    workerAggregatorUsage.aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return workerAggregatorUsage.<A>getAggregatedValue(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 0fc5fdf..6fdcfb0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -28,7 +28,6 @@ import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.metrics.TimerDesc;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
@@ -65,10 +64,11 @@ import java.util.concurrent.Callable;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
+ * @param <M1> Incoming message type
+ * @param <M2> Outgoing message type
  */
 public class ComputeCallable<I extends WritableComparable, V extends Writable,
-    E extends Writable, M extends Writable>
+    E extends Writable, M1 extends Writable, M2 extends Writable>
     implements Callable<Collection<PartitionStats>> {
   /** Class logger */
   private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
@@ -76,21 +76,18 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   private static final Time TIME = SystemTime.get();
   /** Context */
   private final Mapper<?, ?, ?, ?>.Context context;
-  /** Graph state (note that it is recreated in call() for locality) */
-  private GraphState<I, V, E, M> graphState;
+  /** Graph state */
+  private final GraphState graphState;
   /** Thread-safe queue of all partition ids */
   private final BlockingQueue<Integer> partitionIdQueue;
   /** Message store */
-  private final MessageStoreByPartition<I, M> messageStore;
+  private final MessageStoreByPartition<I, M1> messageStore;
   /** Configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** Worker (for NettyWorkerClientRequestProcessor) */
-  private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  private final CentralizedServiceWorker<I, V, E> serviceWorker;
   /** Dump some progress every 30 seconds */
   private final TimedLogger timedLogger = new TimedLogger(30 * 1000, LOG);
-  /** Sends the messages (unique per Callable) */
-  private WorkerClientRequestProcessor<I, V, E, M>
-  workerClientRequestProcessor;
   /** VertexWriter for this ComputeCallable */
   private SimpleVertexWriter<I, V, E> vertexWriter;
   /** Get the start time in nanos */
@@ -113,17 +110,16 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
    * @param serviceWorker Service worker
    */
   public ComputeCallable(
-      Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState,
-      MessageStoreByPartition<I, M> messageStore,
+      Mapper<?, ?, ?, ?>.Context context, GraphState graphState,
+      MessageStoreByPartition<I, M1> messageStore,
       BlockingQueue<Integer> partitionIdQueue,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      CentralizedServiceWorker<I, V, E, M> serviceWorker) {
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      CentralizedServiceWorker<I, V, E> serviceWorker) {
     this.context = context;
     this.configuration = configuration;
     this.partitionIdQueue = partitionIdQueue;
     this.messageStore = messageStore;
     this.serviceWorker = serviceWorker;
-    // Will be replaced later in call() for locality
     this.graphState = graphState;
 
     SuperstepMetricsRegistry metrics = GiraphMetrics.get().perSuperstep();
@@ -136,16 +132,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   @Override
   public Collection<PartitionStats> call() {
     // Thread initialization (for locality)
-    this.workerClientRequestProcessor =
-        new NettyWorkerClientRequestProcessor<I, V, E, M>(
+    WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E>(
             context, configuration, serviceWorker);
     WorkerThreadAggregatorUsage aggregatorUsage =
         serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
-
-    this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
-        graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
-        context, graphState.getGraphTaskManager(), workerClientRequestProcessor,
-        aggregatorUsage);
+    WorkerContext workerContext = serviceWorker.getWorkerContext();
 
     vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
 
@@ -156,10 +148,18 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
         break;
       }
 
-      Partition<I, V, E, M> partition =
+      Partition<I, V, E> partition =
           serviceWorker.getPartitionStore().getPartition(partitionId);
+
+      Computation<I, V, E, M1, M2> computation =
+          (Computation<I, V, E, M1, M2>) configuration.createComputation();
+      computation.initialize(graphState, workerClientRequestProcessor,
+          serviceWorker.getGraphTaskManager(), aggregatorUsage, workerContext);
+      computation.preSuperstep();
+
       try {
-        PartitionStats partitionStats = computePartition(partition);
+        PartitionStats partitionStats =
+            computePartition(computation, partition);
         partitionStatsList.add(partitionStats);
         long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
         partitionStats.addMessagesSentCount(partitionMsgs);
@@ -177,6 +177,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       } finally {
         serviceWorker.getPartitionStore().putPartition(partition);
       }
+
+      computation.postSuperstep();
     }
 
     // Return VertexWriter after the usage
@@ -201,29 +203,19 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   /**
    * Compute a single partition
    *
+   * @param computation Computation to use
    * @param partition Partition to compute
    * @return Partition stats for this computed partition
    */
-  private PartitionStats computePartition(Partition<I, V, E, M> partition)
-    throws IOException, InterruptedException {
+  private PartitionStats computePartition(
+      Computation<I, V, E, M1, M2> computation,
+      Partition<I, V, E> partition) throws IOException, InterruptedException {
     PartitionStats partitionStats =
         new PartitionStats(partition.getId(), 0, 0, 0, 0);
     // Make sure this is thread-safe across runs
     synchronized (partition) {
-      // Prepare Partition context
-      WorkerContext workerContext =
-          graphState.getGraphTaskManager().getWorkerContext();
-      PartitionContext partitionContext = partition.getPartitionContext();
-      synchronized (workerContext) {
-        partitionContext.preSuperstep(workerContext);
-      }
-      graphState.setPartitionContext(partition.getPartitionContext());
-
-      for (Vertex<I, V, E, M> vertex : partition) {
-        // Make sure every vertex has this thread's
-        // graphState before computing
-        vertex.setGraphState(graphState);
-        Iterable<M> messages = messageStore.getVertexMessages(vertex.getId());
+      for (Vertex<I, V, E> vertex : partition) {
+        Iterable<M1> messages = messageStore.getVertexMessages(vertex.getId());
         if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
           vertex.wakeUp();
         }
@@ -231,7 +223,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
           context.progress();
           TimerContext computeOneTimerContext = computeOneTimer.time();
           try {
-            vertex.compute(messages);
+            computation.compute(vertex, messages);
           } finally {
             computeOneTimerContext.stop();
           }
@@ -254,10 +246,6 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       }
 
       messageStore.clearPartition(partition.getId());
-
-      synchronized (workerContext) {
-        partitionContext.postSuperstep(workerContext);
-      }
     }
     return partitionStats;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
index 52df38d..748c3a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.graph;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.hadoop.io.Writable;
@@ -32,29 +32,24 @@ import org.apache.log4j.Logger;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class DefaultVertexResolver<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements VertexResolver<I, V, E, M>,
-    ImmutableClassesGiraphConfigurable<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    implements VertexResolver<I, V, E> {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(
       DefaultVertexResolver.class);
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf = null;
-  /** Stored graph state */
-  private GraphState<I, V, E, M> graphState;
 
   /** Whether to create vertices when they receive a message */
   private boolean createVertexesOnMessages = true;
 
   @Override
-  public Vertex<I, V, E, M> resolve(
+  public Vertex<I, V, E> resolve(
       I vertexId,
-      Vertex<I, V, E, M> vertex,
-      VertexChanges<I, V, E, M> vertexChanges,
+      Vertex<I, V, E> vertex,
+      VertexChanges<I, V, E> vertexChanges,
       boolean hasMessages) {
     // This is the default vertex resolution algorithm
 
@@ -80,8 +75,8 @@ public class DefaultVertexResolver<I extends WritableComparable,
    * @param vertex Vertex to remove edges from
    * @param vertexChanges contains list of edges to remove.
    */
-  protected void removeEdges(Vertex<I, V, E, M> vertex,
-                             VertexChanges<I, V, E, M> vertexChanges) {
+  protected void removeEdges(Vertex<I, V, E> vertex,
+                             VertexChanges<I, V, E> vertexChanges) {
     if (vertex == null) {
       return;
     }
@@ -101,9 +96,9 @@ public class DefaultVertexResolver<I extends WritableComparable,
    * @param vertexChanges specifies if we should remove vertex
    * @return null if vertex should be removed, otherwise the vertex itself.
    */
-  protected Vertex<I, V, E, M> removeVertexIfDesired(
-      Vertex<I, V, E, M> vertex,
-      VertexChanges<I, V, E, M> vertexChanges) {
+  protected Vertex<I, V, E> removeVertexIfDesired(
+      Vertex<I, V, E> vertex,
+      VertexChanges<I, V, E> vertexChanges) {
     if (hasVertexRemovals(vertexChanges)) {
       vertex = null;
     }
@@ -120,18 +115,17 @@ public class DefaultVertexResolver<I extends WritableComparable,
    * @param hasMessages true if this vertex received any messages
    * @return Vertex created or passed in, or null if no vertex should be added
    */
-  protected Vertex<I, V, E, M> addVertexIfDesired(
+  protected Vertex<I, V, E> addVertexIfDesired(
       I vertexId,
-      Vertex<I, V, E, M> vertex,
-      VertexChanges<I, V, E, M> vertexChanges,
+      Vertex<I, V, E> vertex,
+      VertexChanges<I, V, E> vertexChanges,
       boolean hasMessages) {
     if (vertex == null) {
       if (hasVertexAdditions(vertexChanges)) {
         vertex = vertexChanges.getAddedVertexList().get(0);
       } else if ((hasMessages && createVertexesOnMessages) ||
                  hasEdgeAdditions(vertexChanges)) {
-        vertex = conf.createVertex();
-        vertex.setGraphState(graphState);
+        vertex = getConf().createVertex();
         vertex.initialize(vertexId, getConf().createVertexValue());
       }
     } else if (hasVertexAdditions(vertexChanges)) {
@@ -148,8 +142,8 @@ public class DefaultVertexResolver<I extends WritableComparable,
    * @param vertex Vertex to add edges to
    * @param vertexChanges contains edges to add
    */
-  protected void addEdges(Vertex<I, V, E, M> vertex,
-                          VertexChanges<I, V, E, M> vertexChanges) {
+  protected void addEdges(Vertex<I, V, E> vertex,
+                          VertexChanges<I, V, E> vertexChanges) {
     if (vertex == null) {
       return;
     }
@@ -166,7 +160,7 @@ public class DefaultVertexResolver<I extends WritableComparable,
    * @param changes VertexChanges to check
    * @return true if changes contains vertex removal requests
    */
-  protected  boolean hasVertexRemovals(VertexChanges<I, V, E, M> changes) {
+  protected  boolean hasVertexRemovals(VertexChanges<I, V, E> changes) {
     return changes != null && changes.getRemovedVertexCount() > 0;
   }
 
@@ -176,7 +170,7 @@ public class DefaultVertexResolver<I extends WritableComparable,
    * @param changes VertexChanges to check
    * @return true if changes contains vertex addition requests
    */
-  protected boolean hasVertexAdditions(VertexChanges<I, V, E, M> changes) {
+  protected boolean hasVertexAdditions(VertexChanges<I, V, E> changes) {
     return changes != null && !changes.getAddedVertexList().isEmpty();
   }
 
@@ -186,7 +180,7 @@ public class DefaultVertexResolver<I extends WritableComparable,
    * @param changes VertexChanges to check
    * @return true if changes contains edge addition requests
    */
-  protected boolean hasEdgeAdditions(VertexChanges<I, V, E, M> changes) {
+  protected boolean hasEdgeAdditions(VertexChanges<I, V, E> changes) {
     return changes != null && !changes.getAddedEdgeList().isEmpty();
   }
 
@@ -196,28 +190,13 @@ public class DefaultVertexResolver<I extends WritableComparable,
    * @param changes VertexChanges to check
    * @return true if changes contains edge removal requests
    */
-  protected boolean hasEdgeRemovals(VertexChanges<I, V, E, M> changes) {
+  protected boolean hasEdgeRemovals(VertexChanges<I, V, E> changes) {
     return changes != null && !changes.getRemovedEdgeList().isEmpty();
   }
 
   @Override
-  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
-    this.conf = conf;
+  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    super.setConf(conf);
     createVertexesOnMessages = conf.getResolverCreateVertexOnMessages();
   }
-
-  @Override
-  public GraphState<I, V, E, M> getGraphState() {
-    return graphState;
-  }
-
-  @Override
-  public void setGraphState(GraphState<I, V, E, M> graphState) {
-    this.graphState = graphState;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java
index 47902d1..adbe9d3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexValueFactory.java
@@ -35,7 +35,7 @@ public class DefaultVertexValueFactory<V extends Writable>
 
   @Override
   public void initialize(
-      ImmutableClassesGiraphConfiguration<?, V, ?, ?> configuration) {
+      ImmutableClassesGiraphConfiguration<?, V, ?> configuration) {
     vertexValueClass = configuration.getVertexValueClass();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
index 3c2286d..19034ca 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
@@ -155,12 +155,11 @@ public class GiraphTransferRegulator {
    * @param <I> the vertex id type.
    * @param <V> the vertex value type.
    * @param <E> the edge value type.
-   * @param <M> the message value type.
    */
   public <I extends WritableComparable, V extends Writable,
-  E extends Writable, M extends Writable> void
+  E extends Writable> void
   incrementCounters(PartitionOwner partitionOwner,
-    Vertex<I, V, E, M> vertex) {
+    Vertex<I, V, E> vertex) {
     final int id = partitionOwner.getPartitionId();
     // vertex counts
     vertexAccumulator

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
index 4181d24..c86a024 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -36,16 +36,15 @@ import java.io.IOException;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class GraphMapper<I extends WritableComparable, V extends Writable,
-    E extends Writable, M extends Writable> extends
+    E extends Writable> extends
     Mapper<Object, Object, Object, Object> {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(GraphMapper.class);
   /** Manage the framework-agnostic Giraph tasks for this job run */
-  private GraphTaskManager<I, V, E, M> graphTaskManager;
+  private GraphTaskManager<I, V, E> graphTaskManager;
 
   @Override
   public void setup(Context context)
@@ -56,7 +55,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
 
     // Execute all Giraph-related role(s) assigned to this compute node.
     // Roles can include "master," "worker," "zookeeper," or . . . ?
-    graphTaskManager = new GraphTaskManager<I, V, E, M>(context);
+    graphTaskManager = new GraphTaskManager<I, V, E>(context);
     graphTaskManager.setup(
       DistributedCache.getLocalCacheArchives(context.getConfiguration()));
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
index 93ad5df..ca57008 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
@@ -15,26 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.giraph.graph;
 
-import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
 /**
  * Immutable global state of the graph.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
  */
-@SuppressWarnings("rawtypes")
-public class GraphState<I extends WritableComparable, V extends Writable,
-E extends Writable, M extends Writable> {
+public class GraphState {
   /** Graph-wide superstep */
   private final long superstep;
   /** Graph-wide number of vertices */
@@ -43,15 +32,6 @@ E extends Writable, M extends Writable> {
   private final long numEdges;
   /** Graph-wide map context */
   private final Mapper<?, ?, ?, ?>.Context context;
-  /** Graph-wide BSP Mapper for this Vertex */
-  private final GraphTaskManager<I, V, E, M> graphTaskManager;
-  /** Handles requests */
-  private final WorkerClientRequestProcessor<I, V, E, M>
-  workerClientRequestProcessor;
-  /** Worker aggregator usage */
-  private final WorkerAggregatorUsage workerAggregatorUsage;
-  /** Partition context */
-  private PartitionContext partitionContext;
 
   /**
    * Constructor
@@ -60,24 +40,14 @@ E extends Writable, M extends Writable> {
    * @param numVertices Current graph-wide vertices
    * @param numEdges Current graph-wide edges
    * @param context Context
-   * @param graphTaskManager GraphTaskManager for this compute node
-   * @param workerClientRequestProcessor Handles all communication
-   * @param workerAggregatorUsage Aggregator usage
    *
    */
-  public GraphState(
-      long superstep, long numVertices,
-      long numEdges, Mapper<?, ?, ?, ?>.Context context,
-      GraphTaskManager<I, V, E, M> graphTaskManager,
-      WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor,
-      WorkerAggregatorUsage workerAggregatorUsage) {
+  public GraphState(long superstep, long numVertices, long numEdges,
+      Mapper<?, ?, ?, ?>.Context context) {
     this.superstep = superstep;
     this.numVertices = numVertices;
     this.numEdges = numEdges;
     this.context = context;
-    this.graphTaskManager = graphTaskManager;
-    this.workerClientRequestProcessor = workerClientRequestProcessor;
-    this.workerAggregatorUsage = workerAggregatorUsage;
   }
 
   public long getSuperstep() {
@@ -96,33 +66,9 @@ E extends Writable, M extends Writable> {
     return context;
   }
 
-  public GraphTaskManager<I, V, E, M> getGraphTaskManager() {
-    return graphTaskManager;
-  }
-
-  public WorkerClientRequestProcessor<I, V, E, M>
-  getWorkerClientRequestProcessor() {
-    return workerClientRequestProcessor;
-  }
-
-  public WorkerAggregatorUsage getWorkerAggregatorUsage() {
-    return workerAggregatorUsage;
-  }
-
-  public void setPartitionContext(PartitionContext partitionContext) {
-    this.partitionContext = partitionContext;
-  }
-
-  public PartitionContext getPartitionContext() {
-    return partitionContext;
-  }
-
   @Override
   public String toString() {
     return "(superstep=" + superstep + ",numVertices=" + numVertices + "," +
-        "numEdges=" + numEdges + ",context=" + context +
-        ",graphMapper=" + graphTaskManager +
-        ",workerClientRequestProcessor=" + workerClientRequestProcessor + ")";
-
+        "numEdges=" + numEdges + ",context=" + context + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java
deleted file mode 100644
index 76cef43..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Interface specifying that the class can be configured with a GraphState.
- *
- * @param <I> Vertex ID object
- * @param <V> Vertex Value object
- * @param <E> Edge object
- * @param <M> Message object
- */
-public interface GraphStateAware<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /**
-   * Set the graph state.
-   *
-   * @param graphState Graph state saved.
-   */
-  void setGraphState(GraphState<I, V, E, M> graphState);
-
-  /**
-   * Get the graph state stored.
-   *
-   * @return GraphState stored.
-   */
-  GraphState<I, V, E, M> getGraphState();
-}