You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:03 UTC

[07/12] GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 2c5f2f7..762802b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -41,8 +41,8 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
    * @return Casted configuration
    */
   @Override
-  public ImmutableClassesGiraphConfiguration<I, ?, E, ?> getConf() {
-    return (ImmutableClassesGiraphConfiguration<I, ?, E, ?>) super.getConf();
+  public ImmutableClassesGiraphConfiguration<I, ?, E> getConf() {
+    return (ImmutableClassesGiraphConfiguration<I, ?, E>) super.getConf();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 0280c58..6b4642c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -17,7 +17,6 @@
  */
 package org.apache.giraph.utils;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -34,10 +33,22 @@ import java.io.IOException;
 @SuppressWarnings("unchecked")
 public class ByteArrayVertexIdMessages<I extends WritableComparable,
     M extends Writable> extends ByteArrayVertexIdData<I, M> {
+  /** Message value class */
+  private Class<M> messageValueClass;
   /** Add the message size to the stream? (Depends on the message store) */
   private boolean useMessageSizeEncoding = false;
 
   /**
+   * Create a container whose message objects are of the given class.
+   *
+   * @param messageValueClass Class instantiated by {@link #createData()}
+   */
+  public ByteArrayVertexIdMessages(
+      Class<? extends Writable> messageValueClass) {
+    this.messageValueClass = (Class<M>) messageValueClass;
+  }
+
+  /**
    * Set whether message sizes should be encoded.  This should only be a
    * possibility when not combining.  When combining, all messages need to be
    * deserializd right away, so this won't help.
@@ -50,20 +61,9 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
     }
   }
 
-  /**
-   * Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used
-   * to generate message objects.
-   *
-   * @return Casted configuration
-   */
-  @Override
-  public ImmutableClassesGiraphConfiguration<I, ?, ?, M> getConf() {
-    return (ImmutableClassesGiraphConfiguration<I, ?, ?, M>) super.getConf();
-  }
-
   @Override
   public M createData() {
-    return getConf().createMessageValue();
+    return ReflectionUtils.newInstance(messageValueClass);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 6016ba4..d8b121b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.giraph.Algorithm;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.edge.OutEdges;
@@ -142,14 +143,14 @@ public final class ConfigurationUtils {
     performSanityCheck(cmd);
 
     // Args are OK; attempt to populate the GiraphConfiguration with them.
-    final String vertexClassName = args[0];
+    final String computationClassName = args[0];
     final int workers = Integer.parseInt(cmd.getOptionValue('w'));
-    populateGiraphConfiguration(giraphConf, cmd, vertexClassName, workers);
+    populateGiraphConfiguration(giraphConf, cmd, computationClassName, workers);
 
     // validate generic parameters chosen are correct or
     // throw IllegalArgumentException, halting execution.
     @SuppressWarnings("rawtypes")
-    GiraphConfigurationValidator<?, ?, ?, ?> gtv =
+    GiraphConfigurationValidator<?, ?, ?, ?, ?> gtv =
       new GiraphConfigurationValidator(giraphConf);
     gtv.validateConfiguration();
 
@@ -200,15 +201,17 @@ public final class ConfigurationUtils {
    * should be captured here.
    * @param giraphConfiguration config for this job run
    * @param cmd parsed command line options to store in giraphConfiguration
-   * @param vertexClassName the vertex class (application) to run in this job.
+   * @param computationClassName the computation class (application) to run in
+   *                             this job.
    * @param workers the number of worker tasks for this job run.
    */
   private static void populateGiraphConfiguration(final GiraphConfiguration
-    giraphConfiguration, final CommandLine cmd, final String vertexClassName,
+    giraphConfiguration, final CommandLine cmd,
+      final String computationClassName,
     final int workers) throws ClassNotFoundException, IOException {
     giraphConfiguration.setWorkerConfiguration(workers, workers, 100.0f);
-    giraphConfiguration.setVertexClass(
-        (Class<? extends Vertex>) Class.forName(vertexClassName));
+    giraphConfiguration.setComputationClass(
+        (Class<? extends Computation>) Class.forName(computationClassName));
     if (cmd.hasOption("c")) {
       giraphConfiguration.setCombinerClass(
           (Class<? extends Combiner>) Class.forName(cmd.getOptionValue("c")));

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
index 65d99db..6ca488c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InMemoryVertexInputFormat.java
@@ -76,9 +76,9 @@ public class InMemoryVertexInputFormat<I extends WritableComparable,
    */
   private class InMemoryVertexReader extends VertexReader<I, V, E> {
     /** The iterator */
-    private Iterator<Vertex<I, V, E, ?>> vertexIterator;
+    private Iterator<Vertex<I, V, E>> vertexIterator;
     /** Current vertex */
-    private Vertex<I, V, E, ?> currentVertex;
+    private Vertex<I, V, E> currentVertex;
 
     @Override
     public void initialize(InputSplit inputSplit,
@@ -96,7 +96,7 @@ public class InMemoryVertexInputFormat<I extends WritableComparable,
     }
 
     @Override
-    public Vertex<I, V, E, ?> getCurrentVertex() {
+    public Vertex<I, V, E> getCurrentVertex() {
       return currentVertex;
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index be2d2a9..b4920e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -101,7 +101,7 @@ public class InternalVertexRunner {
     File tmpDir = null;
     try {
       // Prepare input file, output folder and temporary folders
-      tmpDir = FileUtils.createTestDir(conf.getVertexClass());
+      tmpDir = FileUtils.createTestDir(conf.getComputationClass());
 
       File vertexInputFile = null;
       File edgeInputFile = null;
@@ -137,7 +137,7 @@ public class InternalVertexRunner {
       GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
 
       // Create and configure the job to run the vertex
-      GiraphJob job = new GiraphJob(conf, conf.getVertexClass().getName());
+      GiraphJob job = new GiraphJob(conf, conf.getComputationClass().getName());
 
       Job internalJob = job.getInternalJob();
       if (conf.hasVertexInputFormat()) {
@@ -199,7 +199,6 @@ public class InternalVertexRunner {
    * @param <I> Vertex ID
    * @param <V> Vertex Value
    * @param <E> Edge Value
-   * @param <M> Message Value
    * @param conf GiraphClasses specifying which types to use
    * @param graph input graph
    * @return iterable output data
@@ -207,14 +206,13 @@ public class InternalVertexRunner {
    */
   public static <I extends WritableComparable,
     V extends Writable,
-    E extends Writable,
-    M extends Writable> TestGraph<I, V, E, M> run(
+    E extends Writable> TestGraph<I, V, E> run(
       GiraphConfiguration conf,
-      TestGraph<I, V, E, M> graph) throws Exception {
+      TestGraph<I, V, E> graph) throws Exception {
     File tmpDir = null;
     try {
       // Prepare temporary folders
-      tmpDir = FileUtils.createTestDir(conf.getVertexClass());
+      tmpDir = FileUtils.createTestDir(conf.getComputationClass());
 
       File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
       File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
@@ -223,7 +221,7 @@ public class InternalVertexRunner {
       conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
 
       // Create and configure the job to run the vertex
-      GiraphJob job = new GiraphJob(conf, conf.getVertexClass().getName());
+      GiraphJob job = new GiraphJob(conf, conf.getComputationClass().getName());
 
       InMemoryVertexInputFormat.setGraph(graph);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index d70eecb..96352bb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -168,6 +168,26 @@ public class ReflectionUtils {
   }
 
   /**
+   * Instantiate a class via its no-arg constructor, wrapping exceptions
+   *
+   * @param theClass Class to instantiate
+   * @param <T> Type to instantiate
+   * @return Newly instantiated object
+   * @throws IllegalStateException if the class cannot be instantiated
+   */
+  public static <T> T newInstance(Class<T> theClass) {
+    try {
+      return theClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalStateException(
+          "newInstance: Couldn't instantiate " + theClass.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(
+          "newInstance: Illegal access " + theClass.getName(), e);
+    }
+  }
+
+  /**
    * Instantiate classes that are ImmutableClassesGiraphConfigurable
    *
    * @param theClass Class to instantiate

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
index 3577a9e..6e46a76 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
@@ -41,17 +41,15 @@ import java.util.Map.Entry;
  * @param <I> Vertex index type
  * @param <V> Vertex type
  * @param <E> Edge type
- * @param <M> Message type
  */
 public class TestGraph<I extends WritableComparable,
                        V extends Writable,
-                       E extends Writable,
-                       M extends Writable>
-                       implements Iterable<Vertex<I, V, E, M>> {
+                       E extends Writable>
+                       implements Iterable<Vertex<I, V, E>> {
   /** The vertex values */
-  private final HashMap<I, Vertex<I, V, E, M>> vertices = Maps.newHashMap();
+  private final HashMap<I, Vertex<I, V, E>> vertices = Maps.newHashMap();
   /** The configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
 
   /**
    * Constructor requiring classes
@@ -62,7 +60,7 @@ public class TestGraph<I extends WritableComparable,
     this.conf = new ImmutableClassesGiraphConfiguration(conf);
   }
 
-  public HashMap<I, Vertex<I, V, E, M>> getVertices() {
+  public HashMap<I, Vertex<I, V, E>> getVertices() {
     return vertices;
   }
 
@@ -82,9 +80,9 @@ public class TestGraph<I extends WritableComparable,
    * @param edges all edges
    * @return this
    */
-  public TestGraph<I, V, E, M> addVertex(I id, V value,
+  public TestGraph<I, V, E> addVertex(I id, V value,
                                          Entry<I, E>... edges) {
-    Vertex<I, V, E, M> v = makeVertex(id, value, edges);
+    Vertex<I, V, E> v = makeVertex(id, value, edges);
     vertices.put(id, v);
     return this;
   }
@@ -96,9 +94,9 @@ public class TestGraph<I extends WritableComparable,
    * @param edgePair The edge
    * @return this
    */
-  public TestGraph<I, V, E, M> addEdge(I vertexId, Entry<I, E> edgePair) {
+  public TestGraph<I, V, E> addEdge(I vertexId, Entry<I, E> edgePair) {
     if (!vertices.containsKey(vertexId)) {
-      Vertex<I, V, E, M> v = conf.createVertex();
+      Vertex<I, V, E> v = conf.createVertex();
       v.initialize(vertexId, conf.createVertexValue());
       vertices.put(vertexId, v);
     }
@@ -116,9 +114,9 @@ public class TestGraph<I extends WritableComparable,
    * @param edgeValue Edge value
    * @return this
    */
-  public TestGraph<I, V, E, M> addEdge(I vertexId, I toVertex, E edgeValue) {
+  public TestGraph<I, V, E> addEdge(I vertexId, I toVertex, E edgeValue) {
     if (!vertices.containsKey(vertexId)) {
-      Vertex<I, V, E, M> v = conf.createVertex();
+      Vertex<I, V, E> v = conf.createVertex();
       v.initialize(vertexId, conf.createVertexValue());
       vertices.put(vertexId, v);
     }
@@ -140,7 +138,7 @@ public class TestGraph<I extends WritableComparable,
    *
    * @return the iterator
    */
-  public Iterator<Vertex<I, V, E, M>> iterator() {
+  public Iterator<Vertex<I, V, E>> iterator() {
     return vertices.values().iterator();
   }
 
@@ -150,7 +148,7 @@ public class TestGraph<I extends WritableComparable,
    * @param id the id
    * @return the value
    */
-  public Vertex<I, V, E, M> getVertex(I id) {
+  public Vertex<I, V, E> getVertex(I id) {
     return vertices.get(id);
   }
 
@@ -177,10 +175,10 @@ public class TestGraph<I extends WritableComparable,
    * @param edges edges to other vertices
    * @return a new vertex
    */
-  protected Vertex<I, V, E, M> makeVertex(I id, V value,
+  protected Vertex<I, V, E> makeVertex(I id, V value,
       Entry<I, E>... edges) {
     @SuppressWarnings("unchecked")
-    Vertex<I, V, E, M> vertex = conf.createVertex();
+    Vertex<I, V, E> vertex = conf.createVertex();
     vertex.initialize(id, value, createEdges(edges));
     return vertex;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
index 0c9ee07..bad11d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
@@ -42,7 +42,7 @@ public abstract class VertexIdIterator<I extends WritableComparable> {
    */
   public VertexIdIterator(
       ExtendedDataOutput extendedDataOutput,
-      ImmutableClassesGiraphConfiguration<I, ?, ?, ?> configuration) {
+      ImmutableClassesGiraphConfiguration<I, ?, ?> configuration) {
     extendedDataInput = configuration.createExtendedDataInput(
         extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index c607ca3..c78d717 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -56,14 +56,16 @@ public class WritableUtils {
    * Read fields from byteArray to a Writeable object.
    *
    * @param byteArray Byte array to find the fields in.
-   * @param writableObject Object to fill in the fields.
+   * @param writableObjects Objects to fill in the fields.
    */
   public static void readFieldsFromByteArray(
-      byte[] byteArray, Writable writableObject) {
+      byte[] byteArray, Writable... writableObjects) {
     DataInputStream inputStream =
       new DataInputStream(new ByteArrayInputStream(byteArray));
     try {
-      writableObject.readFields(inputStream);
+      for (Writable writableObject : writableObjects) {
+        writableObject.readFields(inputStream);
+      }
     } catch (IOException e) {
       throw new IllegalStateException(
           "readFieldsFromByteArray: IOException", e);
@@ -77,16 +79,16 @@ public class WritableUtils {
    * @param zkPath Path of znode.
    * @param watch Add a watch?
    * @param stat Stat of znode if desired.
-   * @param writableObject Object to read into.
+   * @param writableObjects Objects to read into.
    */
   public static void readFieldsFromZnode(ZooKeeperExt zkExt,
                                          String zkPath,
                                          boolean watch,
                                          Stat stat,
-                                         Writable writableObject) {
+                                         Writable... writableObjects) {
     try {
       byte[] zkData = zkExt.getData(zkPath, false, stat);
-      readFieldsFromByteArray(zkData, writableObject);
+      readFieldsFromByteArray(zkData, writableObjects);
     } catch (KeeperException e) {
       throw new IllegalStateException(
         "readFieldsFromZnode: KeeperException on " + zkPath, e);
@@ -99,15 +101,17 @@ public class WritableUtils {
   /**
    * Write object to a byte array.
    *
-   * @param writableObject Object to write from.
+   * @param writableObjects Objects to write from.
    * @return Byte array with serialized object.
    */
-  public static byte[] writeToByteArray(Writable writableObject) {
+  public static byte[] writeToByteArray(Writable... writableObjects) {
     ByteArrayOutputStream outputStream =
         new ByteArrayOutputStream();
     DataOutput output = new DataOutputStream(outputStream);
     try {
-      writableObject.write(output);
+      for (Writable writableObject : writableObjects) {
+        writableObject.write(output);
+      }
     } catch (IOException e) {
       throw new IllegalStateException(
           "writeToByteArray: IOStateException", e);
@@ -189,15 +193,15 @@ public class WritableUtils {
    * @param zkExt ZooKeeper instance.
    * @param zkPath Path of znode.
    * @param version Version of the write.
-   * @param writableObject Object to write from.
+   * @param writableObjects Objects to write from.
    * @return Path and stat information of the znode.
    */
   public static PathStat writeToZnode(ZooKeeperExt zkExt,
                                       String zkPath,
                                       int version,
-                                      Writable writableObject) {
+                                      Writable... writableObjects) {
     try {
-      byte[] byteArray = writeToByteArray(writableObject);
+      byte[] byteArray = writeToByteArray(writableObjects);
       return zkExt.createOrSetExt(zkPath,
           byteArray,
           Ids.OPEN_ACL_UNSAFE,
@@ -341,15 +345,14 @@ public class WritableUtils {
    * @param <I> Vertex id
    * @param <V> Vertex value
    * @param <E> Edge value
-   * @param <M> Message value
    * @return Byte array with serialized object.
    */
   public static <I extends WritableComparable, V extends Writable,
-      E extends Writable, M extends Writable> byte[] writeVertexToByteArray(
-      Vertex<I, V, E, M> vertex,
+      E extends Writable> byte[] writeVertexToByteArray(
+      Vertex<I, V, E> vertex,
       byte[] buffer,
       boolean unsafe,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     ExtendedDataOutput extendedDataOutput;
     if (unsafe) {
       extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
@@ -378,14 +381,13 @@ public class WritableUtils {
    * @param <I> Vertex id
    * @param <V> Vertex value
    * @param <E> Edge value
-   * @param <M> Message value
    * @return Byte array with serialized object.
    */
   public static <I extends WritableComparable, V extends Writable,
-      E extends Writable, M extends Writable> byte[] writeVertexToByteArray(
-      Vertex<I, V, E, M> vertex,
+      E extends Writable> byte[] writeVertexToByteArray(
+      Vertex<I, V, E> vertex,
       boolean unsafe,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     return writeVertexToByteArray(vertex, null, unsafe, conf);
   }
 
@@ -400,16 +402,14 @@ public class WritableUtils {
   * @param <I> Vertex id
   * @param <V> Vertex value
   * @param <E> Edge value
-  * @param <M> Message value
   * @param conf Configuration
   */
   public static <I extends WritableComparable, V extends Writable,
-  E extends Writable, M extends Writable> void
-  reinitializeVertexFromByteArray(
+  E extends Writable> void reinitializeVertexFromByteArray(
       byte[] byteArray,
-      Vertex<I, V, E, M> vertex,
+      Vertex<I, V, E> vertex,
       boolean unsafe,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     ExtendedDataInput extendedDataInput;
     if (unsafe) {
       extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
@@ -465,15 +465,14 @@ public class WritableUtils {
    * @param <I> Vertex id
    * @param <V> Vertex value
    * @param <E> Edge value
-   * @param <M> Message value
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
   public static <I extends WritableComparable, V extends Writable,
-  E extends Writable, M extends Writable> void reinitializeVertexFromDataInput(
+  E extends Writable> void reinitializeVertexFromDataInput(
       DataInput input,
-      Vertex<I, V, E, M> vertex,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+      Vertex<I, V, E> vertex,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf)
     throws IOException {
     vertex.getId().readFields(input);
     vertex.getValue().readFields(input);
@@ -493,17 +492,16 @@ public class WritableUtils {
    * @param <I> Vertex id
    * @param <V> Vertex value
    * @param <E> Edge value
-   * @param <M> Message value
    * @return The vertex
    * @throws IOException
    */
   public static <I extends WritableComparable, V extends Writable,
-  E extends Writable, M extends Writable> Vertex<I, V, E, M>
+  E extends Writable> Vertex<I, V, E>
   readVertexFromDataInput(
       DataInput input,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+      ImmutableClassesGiraphConfiguration<I, V, E> conf)
     throws IOException {
-    Vertex<I, V, E, M> vertex = conf.createVertex();
+    Vertex<I, V, E> vertex = conf.createVertex();
     I id = conf.createVertexId();
     V value = conf.createVertexValue();
     OutEdges<I, E> edges = conf.createOutEdges();
@@ -521,19 +519,56 @@ public class WritableUtils {
    * @param <I> Vertex id
    * @param <V> Vertex value
    * @param <E> Edge value
-   * @param <M> Message value
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
   public static <I extends WritableComparable, V extends Writable,
-  E extends Writable, M extends Writable> void writeVertexToDataOutput(
+  E extends Writable> void writeVertexToDataOutput(
       DataOutput output,
-      Vertex<I, V, E, M> vertex,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+      Vertex<I, V, E> vertex,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf)
     throws IOException {
     vertex.getId().write(output);
     vertex.getValue().write(output);
     ((OutEdges<I, E>) vertex.getEdges()).write(output);
     output.writeBoolean(vertex.isHalted());
   }
+
+  /**
+   * Write a presence flag, then the class name (omitted when class is null).
+   *
+   * @param clazz Class to write, may be null
+   * @param output Data output
+   * @param <T> Class type
+   */
+  public static <T> void writeClass(Class<T> clazz,
+      DataOutput output) throws IOException {
+    output.writeBoolean(clazz != null);
+    if (clazz != null) {
+      output.writeUTF(clazz.getName());
+    }
+  }
+
+  /**
+   * Read a class written by {@link #writeClass(Class, DataOutput)}.
+   *
+   * @param input Data input
+   * @param <T> Class type
+   * @return Class, or null if null was written
+   * @throws IllegalStateException if the named class cannot be loaded
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> Class<T> readClass(DataInput input) throws IOException {
+    if (input.readBoolean()) {
+      String className = input.readUTF();
+      try {
+        return (Class<T>) Class.forName(className);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException("readClass: No class found " +
+            className, e);
+      }
+    } else {
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 03a4876..8b5e39a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -35,7 +35,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GlobalStats;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.InputSplitPaths;
@@ -45,6 +44,7 @@ import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
 import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.master.SuperstepClasses;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphTimer;
 import org.apache.giraph.metrics.GiraphTimerContext;
@@ -108,13 +108,12 @@ import java.util.concurrent.TimeUnit;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class BspServiceWorker<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends BspService<I, V, E, M>
-    implements CentralizedServiceWorker<I, V, E, M>,
+    V extends Writable, E extends Writable>
+    extends BspService<I, V, E>
+    implements CentralizedServiceWorker<I, V, E>,
     ResetSuperstepMetricsObserver {
   /** Name of gauge for time spent waiting on other workers */
   public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
@@ -125,12 +124,12 @@ public class BspServiceWorker<I extends WritableComparable,
   /** Worker info */
   private final WorkerInfo workerInfo;
   /** Worker graph partitioner */
-  private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
+  private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
 
   /** IPC Client */
-  private final WorkerClient<I, V, E, M> workerClient;
+  private final WorkerClient<I, V, E> workerClient;
   /** IPC Server */
-  private final WorkerServer<I, V, E, M> workerServer;
+  private final WorkerServer<I, V, E> workerServer;
   /** Request processor for aggregator requests */
   private final WorkerAggregatorRequestProcessor
   workerAggregatorRequestProcessor;
@@ -173,27 +172,28 @@ public class BspServiceWorker<I extends WritableComparable,
     String serverPortList,
     int sessionMsecTimeout,
     Mapper<?, ?, ?, ?>.Context context,
-    GraphTaskManager<I, V, E, M> graphTaskManager)
+    GraphTaskManager<I, V, E> graphTaskManager)
     throws IOException, InterruptedException {
     super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
-    ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration();
+    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
     workerInfo = new WorkerInfo();
-    workerServer = new NettyWorkerServer<I, V, E, M>(conf, this, context);
+    workerServer = new NettyWorkerServer<I, V, E>(conf, this, context);
     workerInfo.setInetSocketAddress(workerServer.getMyAddress());
     workerInfo.setTaskId(getTaskPartition());
-    workerClient = new NettyWorkerClient<I, V, E, M>(context, conf, this);
+    workerClient = new NettyWorkerClient<I, V, E>(context, conf, this);
 
     workerAggregatorRequestProcessor =
         new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
 
-    workerContext = conf.createWorkerContext(null);
-
     aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
 
+    workerContext = conf.createWorkerContext();
+    workerContext.setWorkerAggregatorUsage(aggregatorHandler);
+
     superstepOutput = conf.createSuperstepOutput(context);
 
     if (conf.isJMapHistogramDumpEnabled()) {
@@ -223,7 +223,7 @@ public class BspServiceWorker<I extends WritableComparable,
   }
 
   @Override
-  public WorkerClient<I, V, E, M> getWorkerClient() {
+  public WorkerClient<I, V, E> getWorkerClient() {
     return workerClient;
   }
 
@@ -285,7 +285,8 @@ public class BspServiceWorker<I extends WritableComparable,
 
 
   /**
-   * Load the vertices from the user-defined {@link VertexReader}
+   * Load the vertices from the user-defined
+   * {@link org.apache.giraph.io.VertexReader}
    *
    * @return Count of vertices and edges loaded
    */
@@ -295,10 +296,6 @@ public class BspServiceWorker<I extends WritableComparable,
         getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
             false, false, true);
 
-    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
-        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(),
-        null, null);
-
     InputSplitPathOrganizer splitOrganizer =
         new InputSplitPathOrganizer(getZkExt(),
             inputSplitPathList, getWorkerInfo().getHostname(),
@@ -310,11 +307,10 @@ public class BspServiceWorker<I extends WritableComparable,
         BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
         BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
 
-    VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
-        new VertexInputSplitsCallableFactory<I, V, E, M>(
+    VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
+        new VertexInputSplitsCallableFactory<I, V, E>(
             getConfiguration().createWrappedVertexInputFormat(),
             getContext(),
-            graphState,
             getConfiguration(),
             this,
             splitsHandler,
@@ -324,7 +320,8 @@ public class BspServiceWorker<I extends WritableComparable,
   }
 
   /**
-   * Load the edges from the user-defined {@link EdgeReader}.
+   * Load the edges from the user-defined
+   * {@link org.apache.giraph.io.EdgeReader}.
    *
    * @return Number of edges loaded
    */
@@ -333,10 +330,6 @@ public class BspServiceWorker<I extends WritableComparable,
         getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
             false, false, true);
 
-    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
-        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(),
-        null, null);
-
     InputSplitPathOrganizer splitOrganizer =
         new InputSplitPathOrganizer(getZkExt(),
             inputSplitPathList, getWorkerInfo().getHostname(),
@@ -348,11 +341,10 @@ public class BspServiceWorker<I extends WritableComparable,
         BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
         BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
 
-    EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
-        new EdgeInputSplitsCallableFactory<I, V, E, M>(
+    EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
+        new EdgeInputSplitsCallableFactory<I, V, E>(
             getConfiguration().createWrappedEdgeInputFormat(),
             getContext(),
-            graphState,
             getConfiguration(),
             this,
             splitsHandler,
@@ -483,11 +475,8 @@ public class BspServiceWorker<I extends WritableComparable,
     }
 
     // Add the partitions that this worker owns
-    GraphState<I, V, E, M> graphState =
-        new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
-            getContext(), getGraphTaskManager(), null, null);
     Collection<? extends PartitionOwner> masterSetPartitionOwners =
-        startSuperstep(graphState);
+        startSuperstep();
     workerGraphPartitioner.updatePartitionOwners(
         getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
 
@@ -552,7 +541,7 @@ else[HADOOP_NON_SECURE]*/
       if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
           !getPartitionStore().hasPartition(
               partitionOwner.getPartitionId())) {
-        Partition<I, V, E, M> partition =
+        Partition<I, V, E> partition =
             getConfiguration().createPartition(
                 partitionOwner.getPartitionId(), getContext());
         getPartitionStore().addPartition(partition);
@@ -569,7 +558,7 @@ else[HADOOP_NON_SECURE]*/
     List<PartitionStats> partitionStatsList =
         new ArrayList<PartitionStats>();
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
-      Partition<I, V, E, M> partition =
+      Partition<I, V, E> partition =
           getPartitionStore().getPartition(partitionId);
       PartitionStats partitionStats =
           new PartitionStats(partition.getId(),
@@ -583,7 +572,7 @@ else[HADOOP_NON_SECURE]*/
     workerGraphPartitioner.finalizePartitionStats(
         partitionStatsList, getPartitionStore());
 
-    return finishSuperstep(graphState, partitionStatsList);
+    return finishSuperstep(partitionStatsList);
   }
 
   /**
@@ -666,8 +655,7 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public Collection<? extends PartitionOwner> startSuperstep(
-      GraphState<I, V, E, M> graphState) {
+  public Collection<? extends PartitionOwner> startSuperstep() {
     // Algorithm:
     // 1. Communication service will combine message from previous
     //    superstep
@@ -675,7 +663,7 @@ else[HADOOP_NON_SECURE]*/
     // 3. Wait until the partition assignment is complete and get it
     // 4. Get the aggregator values from the previous superstep
     if (getSuperstep() != INPUT_SUPERSTEP) {
-      workerServer.prepareSuperstep(graphState);
+      workerServer.prepareSuperstep();
     }
 
     registerHealth(getSuperstep());
@@ -727,7 +715,6 @@ else[HADOOP_NON_SECURE]*/
 
   @Override
   public FinishedSuperstepStats finishSuperstep(
-      GraphState<I, V, E, M> graphState,
       List<PartitionStats> partitionStatsList) {
     // This barrier blocks until success (or the master signals it to
     // restart).
@@ -740,10 +727,10 @@ else[HADOOP_NON_SECURE]*/
     // 4. Report the statistics (vertices, edges, messages, etc.)
     //    of this worker
     // 5. Let the master know it is finished.
-    // 6. Wait for the master's global stats, and check if done
+    // 6. Wait for the master's superstep info, and check if done
     waitForRequestsToFinish();
 
-    graphState.getGraphTaskManager().notifyFinishedCommunication();
+    getGraphTaskManager().notifyFinishedCommunication();
 
     long workerSentMessages = 0;
     long localVertices = 0;
@@ -753,7 +740,7 @@ else[HADOOP_NON_SECURE]*/
     }
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
-      postSuperstepCallbacks(graphState);
+      postSuperstepCallbacks();
     }
 
     aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
@@ -779,17 +766,21 @@ else[HADOOP_NON_SECURE]*/
     waitForOtherWorkers(superstepFinishedNode);
 
     GlobalStats globalStats = new GlobalStats();
+    SuperstepClasses superstepClasses = new SuperstepClasses();
     WritableUtils.readFieldsFromZnode(
-        getZkExt(), superstepFinishedNode, false, null, globalStats);
+        getZkExt(), superstepFinishedNode, false, null, globalStats,
+        superstepClasses);
     if (LOG.isInfoEnabled()) {
       LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
-          " with global stats " + globalStats);
+          " with global stats " + globalStats + " and classes " +
+          superstepClasses);
     }
     incrCachedSuperstep();
     getContext().setStatus("finishSuperstep: (all workers done) " +
         getGraphTaskManager().getGraphFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
+    getConfiguration().updateSuperstepClasses(superstepClasses);
 
     return new FinishedSuperstepStats(
         localVertices,
@@ -801,18 +792,15 @@ else[HADOOP_NON_SECURE]*/
 
   /**
    * Handle post-superstep callbacks
-   *
-   * @param graphState GraphState
    */
-  private void postSuperstepCallbacks(GraphState<I, V, E, M> graphState) {
-    getWorkerContext().setGraphState(graphState);
+  private void postSuperstepCallbacks() {
     GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
     getWorkerContext().postSuperstep();
     timerContext.stop();
     getContext().progress();
 
     for (WorkerObserver obs : getWorkerObservers()) {
-      obs.postSuperstep(graphState.getSuperstep());
+      obs.postSuperstep(getSuperstep());
       getContext().progress();
     }
   }
@@ -943,9 +931,7 @@ else[HADOOP_NON_SECURE]*/
           public Void call() throws Exception {
             VertexWriter<I, V, E> vertexWriter =
                 vertexOutputFormat.createVertexWriter(getContext());
-            vertexWriter.setConf(
-                (ImmutableClassesGiraphConfiguration<I, V, E, Writable>)
-                    getConfiguration());
+            vertexWriter.setConf(getConfiguration());
             vertexWriter.initialize(getContext());
             long verticesWritten = 0;
             long nextPrintVertices = 0;
@@ -953,9 +939,9 @@ else[HADOOP_NON_SECURE]*/
             int partitionIndex = 0;
             int numPartitions = getPartitionStore().getNumPartitions();
             for (Integer partitionId : getPartitionStore().getPartitionIds()) {
-              Partition<I, V, E, M> partition =
+              Partition<I, V, E> partition =
                   getPartitionStore().getPartition(partitionId);
-              for (Vertex<I, V, E, M> vertex : partition) {
+              for (Vertex<I, V, E> vertex : partition) {
                 vertexWriter.writeVertex(vertex);
                 ++verticesWritten;
 
@@ -1104,7 +1090,7 @@ else[HADOOP_NON_SECURE]*/
     ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
     DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
-      Partition<I, V, E, M> partition =
+      Partition<I, V, E> partition =
           getPartitionStore().getPartition(partitionId);
       long startPos = verticesOutputStream.getPos();
       partition.write(verticesOutputStream);
@@ -1209,7 +1195,7 @@ else[HADOOP_NON_SECURE]*/
                 " not found!");
           }
           metadataStream.close();
-          Partition<I, V, E, M> partition =
+          Partition<I, V, E> partition =
               getConfiguration().createPartition(partitionId, getContext());
           DataInputStream partitionsStream =
               getFs().open(new Path(partitionsFile));
@@ -1250,18 +1236,21 @@ else[HADOOP_NON_SECURE]*/
           " total.");
     }
 
-    // Load global statistics
-    GlobalStats globalStats = null;
+    // Load global stats and superstep classes
+    GlobalStats globalStats = new GlobalStats();
+    SuperstepClasses superstepClasses = new SuperstepClasses();
     String finalizedCheckpointPath =
         getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
     try {
       DataInputStream finalizedStream =
           getFs().open(new Path(finalizedCheckpointPath));
-      globalStats = new GlobalStats();
       globalStats.readFields(finalizedStream);
+      superstepClasses.readFields(finalizedStream);
+      getConfiguration().updateSuperstepClasses(superstepClasses);
     } catch (IOException e) {
       throw new IllegalStateException(
-          "loadCheckpoint: Failed to load global statistics", e);
+          "loadCheckpoint: Failed to load global stats and superstep classes",
+          e);
     }
 
     // Communication service needs to setup the connections prior to
@@ -1287,13 +1276,13 @@ else[HADOOP_NON_SECURE]*/
         new ArrayList<Entry<WorkerInfo, List<Integer>>>(
             workerPartitionMap.entrySet());
     Collections.shuffle(randomEntryList);
-    WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor =
-        new NettyWorkerClientRequestProcessor<I, V, E, M>(getContext(),
+    WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
             getConfiguration(), this);
     for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
       randomEntryList) {
       for (Integer partitionId : workerPartitionList.getValue()) {
-        Partition<I, V, E, M> partition =
+        Partition<I, V, E> partition =
             getPartitionStore().removePartition(partitionId);
         if (partition == null) {
           throw new IllegalStateException(
@@ -1467,7 +1456,7 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public PartitionStore<I, V, E, M> getPartitionStore() {
+  public PartitionStore<I, V, E> getPartitionStore() {
     return getServerData().getPartitionStore();
   }
 
@@ -1493,7 +1482,7 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public ServerData<I, V, E, M> getServerData() {
+  public ServerData<I, V, E> getServerData() {
     return workerServer.getServerData();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 351a114..78cdd8e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -20,7 +20,6 @@ package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
@@ -48,11 +47,10 @@ import java.io.IOException;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public class EdgeInputSplitsCallable<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends InputSplitsCallable<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    extends InputSplitsCallable<I, V, E> {
   /** How often to update metrics and print info */
   public static final int EDGES_UPDATE_PERIOD = 1000000;
   /** How often to update filtered metrics */
@@ -80,7 +78,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
    *
    * @param edgeInputFormat Edge input format
    * @param context Context
-   * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker service worker
    * @param splitsHandler Handler for input splits
@@ -89,13 +86,12 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
   public EdgeInputSplitsCallable(
       EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
-      GraphState<I, V, E, M> graphState,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      BspServiceWorker<I, V, E> bspServiceWorker,
       InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt)  {
-    super(context, graphState, configuration, bspServiceWorker,
-        splitsHandler, zooKeeperExt);
+    super(context, configuration, bspServiceWorker, splitsHandler,
+        zooKeeperExt);
     this.edgeInputFormat = edgeInputFormat;
 
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
@@ -116,20 +112,18 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
    * maximum number of edges to be read from an input split.
    *
    * @param inputSplit Input split to process with edge reader
-   * @param graphState Current graph state
    * @return Edges loaded from this input split
    * @throws IOException
    * @throws InterruptedException
    */
   @Override
   protected VertexEdgeCount readInputSplit(
-      InputSplit inputSplit,
-      GraphState<I, V, E, M> graphState) throws IOException,
+      InputSplit inputSplit) throws IOException,
       InterruptedException {
     EdgeReader<I, E> edgeReader =
         edgeInputFormat.createEdgeReader(inputSplit, context);
     edgeReader.setConf(
-        (ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>)
+        (ImmutableClassesGiraphConfiguration<I, Writable, E>)
             configuration);
     edgeReader.initialize(inputSplit, context);
 
@@ -166,8 +160,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
         continue;
       }
 
-      graphState.getWorkerClientRequestProcessor().sendEdgeRequest(sourceId,
-          readerEdge);
+      workerClientRequestProcessor.sendEdgeRequest(sourceId, readerEdge);
       context.progress(); // do this before potential data transfer
 
       // Update status every EDGES_UPDATE_PERIOD edges

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 33fb515..f68ac93 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -19,7 +19,6 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.utils.CallableFactory;
@@ -34,21 +33,18 @@ import org.apache.hadoop.mapreduce.Mapper;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
+    V extends Writable, E extends Writable>
     implements CallableFactory<VertexEdgeCount> {
   /** Edge input format */
   private final EdgeInputFormat<I, E> edgeInputFormat;
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
-  /** Graph state. */
-  private final GraphState<I, V, E, M> graphState;
   /** Configuration. */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** {@link BspServiceWorker} we're running on. */
-  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Handler for input splits */
   private final InputSplitsHandler splitsHandler;
   /** {@link ZooKeeperExt} for this worker. */
@@ -59,7 +55,6 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
    *
    * @param edgeInputFormat Edge input format
    * @param context Mapper context
-   * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
    * @param splitsHandler Handler for input splits
@@ -68,14 +63,12 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
   public EdgeInputSplitsCallableFactory(
       EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
-      GraphState<I, V, E, M> graphState,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      BspServiceWorker<I, V, E> bspServiceWorker,
       InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt) {
     this.edgeInputFormat = edgeInputFormat;
     this.context = context;
-    this.graphState = graphState;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
     this.zooKeeperExt = zooKeeperExt;
@@ -83,11 +76,10 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
   }
 
   @Override
-  public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
-    return new EdgeInputSplitsCallable<I, V, E, M>(
+  public InputSplitsCallable<I, V, E> newCallable(int threadId) {
+    return new EdgeInputSplitsCallable<I, V, E>(
         edgeInputFormat,
         context,
-        graphState,
         configuration,
         bspServiceWorker,
         splitsHandler,

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index a8298c5..10b1a25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -21,7 +21,6 @@ package org.apache.giraph.worker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.metrics.GiraphMetrics;
@@ -57,24 +56,20 @@ import java.util.concurrent.Callable;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public abstract class InputSplitsCallable<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
+    V extends Writable, E extends Writable>
     implements Callable<VertexEdgeCount> {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
   /** Class time object */
   private static final Time TIME = SystemTime.get();
   /** Configuration */
-  protected final ImmutableClassesGiraphConfiguration<I, V, E, M>
-  configuration;
+  protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** Context */
   protected final Mapper<?, ?, ?, ?>.Context context;
-  /** Graph state */
-  private final GraphState<I, V, E, M> graphState;
   /** Handles IPC communication */
-  private final WorkerClientRequestProcessor<I, V, E, M>
+  protected final WorkerClientRequestProcessor<I, V, E>
   workerClientRequestProcessor;
   /**
    * Stores and processes the list of InputSplits advertised
@@ -93,7 +88,6 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * Constructor.
    *
    * @param context Context
-   * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker service worker
    * @param splitsHandler Handler for input splits
@@ -101,20 +95,15 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    */
   public InputSplitsCallable(
       Mapper<?, ?, ?, ?>.Context context,
-      GraphState<I, V, E, M> graphState,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      BspServiceWorker<I, V, E> bspServiceWorker,
       InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt) {
     this.zooKeeperExt = zooKeeperExt;
     this.context = context;
     this.workerClientRequestProcessor =
-        new NettyWorkerClientRequestProcessor<I, V, E, M>(
+        new NettyWorkerClientRequestProcessor<I, V, E>(
             context, configuration, bspServiceWorker);
-    this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
-        graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
-        context, graphState.getGraphTaskManager(), workerClientRequestProcessor,
-        null);
     this.useLocality = configuration.useInputSplitLocality();
     this.splitsHandler = splitsHandler;
     this.configuration = configuration;
@@ -205,14 +194,11 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * Load vertices/edges from the given input split.
    *
    * @param inputSplit Input split to load
-   * @param graphState Graph state
    * @return Count of vertices and edges loaded
    * @throws IOException
    * @throws InterruptedException
    */
-  protected abstract VertexEdgeCount readInputSplit(
-      InputSplit inputSplit,
-      GraphState<I, V, E, M> graphState)
+  protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit)
     throws IOException, InterruptedException;
 
   @Override
@@ -222,9 +208,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
     int inputSplitsProcessed = 0;
     try {
       while ((inputSplitPath = splitsHandler.reserveInputSplit()) != null) {
-        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
-            loadInputSplit(inputSplitPath,
-                graphState));
+        vertexEdgeCount =
+            vertexEdgeCount.incrVertexEdgeCount(loadInputSplit(inputSplitPath));
         context.progress();
         ++inputSplitsProcessed;
       }
@@ -267,7 +252,6 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * Mark the input split finished when done.
    *
    * @param inputSplitPath ZK location of input split
-   * @param graphState Current graph state
    * @return Mapping of vertex indices and statistics, or null if no data read
    * @throws IOException
    * @throws ClassNotFoundException
@@ -276,13 +260,11 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * @throws IllegalAccessException
    */
   private VertexEdgeCount loadInputSplit(
-      String inputSplitPath,
-      GraphState<I, V, E, M> graphState)
+      String inputSplitPath)
     throws IOException, ClassNotFoundException, InterruptedException,
       InstantiationException, IllegalAccessException {
     InputSplit inputSplit = getInputSplit(inputSplitPath);
-    VertexEdgeCount vertexEdgeCount =
-        readInputSplit(inputSplit, graphState);
+    VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
     if (LOG.isInfoEnabled()) {
       LOG.info("loadFromInputSplit: Finished loading " +
           inputSplitPath + " " + vertexEdgeCount);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 1c292ad..977e100 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -19,7 +19,6 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
@@ -50,11 +49,10 @@ import java.io.IOException;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public class VertexInputSplitsCallable<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends InputSplitsCallable<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    extends InputSplitsCallable<I, V, E> {
   /** How often to update metrics and print info */
   public static final int VERTICES_UPDATE_PERIOD = 250000;
   /** How often to update filtered out metrics */
@@ -68,9 +66,9 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
   /** Input split max vertices (-1 denotes all) */
   private final long inputSplitMaxVertices;
   /** Bsp service worker (only use thread-safe methods) */
-  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Filter to select which vertices to keep */
-  private final VertexInputFilter<I, V, E, M> vertexInputFilter;
+  private final VertexInputFilter<I, V, E> vertexInputFilter;
 
   // Metrics
   /** number of vertices loaded meter across all readers */
@@ -85,7 +83,6 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    *
    * @param vertexInputFormat Vertex input format
    * @param context Context
-   * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker service worker
    * @param splitsHandler Handler for input splits
@@ -94,13 +91,12 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
   public VertexInputSplitsCallable(
       VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
-      GraphState<I, V, E, M> graphState,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      BspServiceWorker<I, V, E> bspServiceWorker,
       InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt)  {
-    super(context, graphState, configuration, bspServiceWorker,
-        splitsHandler, zooKeeperExt);
+    super(context, configuration, bspServiceWorker, splitsHandler,
+        zooKeeperExt);
     this.vertexInputFormat = vertexInputFormat;
 
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
@@ -123,20 +119,16 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    * maximum number of vertices to be read from an input split.
    *
    * @param inputSplit Input split to process with vertex reader
-   * @param graphState Current graph state
    * @return Vertices and edges loaded from this input split
    * @throws IOException
    * @throws InterruptedException
    */
   @Override
   protected VertexEdgeCount readInputSplit(
-      InputSplit inputSplit,
-      GraphState<I, V, E, M> graphState)
-    throws IOException, InterruptedException {
+      InputSplit inputSplit) throws IOException, InterruptedException {
     VertexReader<I, V, E> vertexReader =
         vertexInputFormat.createVertexReader(inputSplit, context);
-    vertexReader.setConf(
-        (ImmutableClassesGiraphConfiguration<I, V, E, Writable>) configuration);
+    vertexReader.setConf(configuration);
     vertexReader.initialize(inputSplit, context);
 
     long inputSplitVerticesLoaded = 0;
@@ -146,8 +138,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     long inputSplitEdgesLoaded = 0;
 
     while (vertexReader.nextVertex()) {
-      Vertex<I, V, E, M> readerVertex =
-          (Vertex<I, V, E, M>) vertexReader.getCurrentVertex();
+      Vertex<I, V, E> readerVertex = vertexReader.getCurrentVertex();
       if (readerVertex.getId() == null) {
         throw new IllegalArgumentException(
             "readInputSplit: Vertex reader returned a vertex " +
@@ -157,7 +148,6 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
         readerVertex.setValue(configuration.createVertexValue());
       }
       readerVertex.setConf(configuration);
-      readerVertex.setGraphState(graphState);
 
       ++inputSplitVerticesLoaded;
 
@@ -172,7 +162,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
 
       PartitionOwner partitionOwner =
           bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
-      graphState.getWorkerClientRequestProcessor().sendVertexRequest(
+      workerClientRequestProcessor.sendVertexRequest(
           partitionOwner, readerVertex);
       context.progress(); // do this before potential data transfer
       edgesSinceLastUpdate += readerVertex.getNumEdges();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index cf5e8ad..c9893d2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -19,7 +19,6 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.utils.CallableFactory;
@@ -34,21 +33,18 @@ import org.apache.hadoop.mapreduce.Mapper;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public class VertexInputSplitsCallableFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
+    V extends Writable, E extends Writable>
     implements CallableFactory<VertexEdgeCount> {
   /** Vertex input format */
   private final VertexInputFormat<I, V, E> vertexInputFormat;
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
-  /** Graph state. */
-  private final GraphState<I, V, E, M> graphState;
   /** Configuration. */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** {@link BspServiceWorker} we're running on. */
-  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Handler for input splits */
   private final InputSplitsHandler splitsHandler;
   /** {@link ZooKeeperExt} for this worker. */
@@ -59,7 +55,6 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
    *
    * @param vertexInputFormat Vertex input format
    * @param context Mapper context
-   * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
    * @param splitsHandler Handler for input splits
@@ -68,14 +63,12 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   public VertexInputSplitsCallableFactory(
       VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
-      GraphState<I, V, E, M> graphState,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      BspServiceWorker<I, V, E> bspServiceWorker,
       InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt) {
     this.vertexInputFormat = vertexInputFormat;
     this.context = context;
-    this.graphState = graphState;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
     this.zooKeeperExt = zooKeeperExt;
@@ -83,11 +76,10 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   }
 
   @Override
-  public InputSplitsCallable<I, V, E, M> newCallable(int threadId) {
-    return new VertexInputSplitsCallable<I, V, E, M>(
+  public InputSplitsCallable<I, V, E> newCallable(int threadId) {
+    return new VertexInputSplitsCallable<I, V, E>(
         vertexInputFormat,
         context,
-        graphState,
         configuration,
         bspServiceWorker,
         splitsHandler,

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 9a8a8b8..9bfd7b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -64,7 +64,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
   private Map<String, Aggregator<Writable>> currentAggregatorMap =
       Maps.newHashMap();
   /** Service worker */
-  private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
+  private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
   /** Progressable for reporting progress */
   private final Progressable progressable;
   /** How big a single aggregator request can be */
@@ -80,7 +80,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
    * @param progressable  Progressable for reporting progress
    */
   public WorkerAggregatorHandler(
-      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
+      CentralizedServiceWorker<?, ?, ?> serviceWorker,
       ImmutableClassesGiraphConfiguration conf,
       Progressable progressable) {
     this.serviceWorker = serviceWorker;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index d3ffaea..729ba14 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.mapreduce.Mapper;
 public abstract class WorkerContext implements WorkerAggregatorUsage {
   /** Global graph state */
   private GraphState graphState;
+  /** Worker aggregator usage */
+  private WorkerAggregatorUsage workerAggregatorUsage;
 
   /**
    * Set the graph state.
@@ -41,6 +43,16 @@ public abstract class WorkerContext implements WorkerAggregatorUsage {
   }
 
   /**
+   * Set worker aggregator usage
+   *
+   * @param workerAggregatorUsage Worker aggregator usage
+   */
+  public void setWorkerAggregatorUsage(
+      WorkerAggregatorUsage workerAggregatorUsage) {
+    this.workerAggregatorUsage = workerAggregatorUsage;
+  }
+
+  /**
    * Initialize the WorkerContext.
    * This method is executed once on each Worker before the first
    * superstep starts.
@@ -112,11 +124,11 @@ public abstract class WorkerContext implements WorkerAggregatorUsage {
 
   @Override
   public <A extends Writable> void aggregate(String name, A value) {
-    graphState.getWorkerAggregatorUsage().aggregate(name, value);
+    workerAggregatorUsage.aggregate(name, value);
   }
 
   @Override
   public <A extends Writable> A getAggregatedValue(String name) {
-    return graphState.getWorkerAggregatorUsage().<A>getAggregatedValue(name);
+    return workerAggregatorUsage.<A>getAggregatedValue(name);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
index 91b842a..e771e36 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
@@ -25,8 +25,8 @@ import org.apache.giraph.comm.netty.handler.RequestServerHandler;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.IntNoOpComputation;
 import org.apache.giraph.utils.MockUtils;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -46,17 +46,10 @@ public class ConnectionTest {
   /** Class configuration */
   private ImmutableClassesGiraphConfiguration conf;
 
-  public static class IntVertex extends Vertex<IntWritable,
-            IntWritable, IntWritable, IntWritable> {
-    @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException {
-    }
-  }
-
   @Before
   public void setUp() {
     GiraphConfiguration tmpConfig = new GiraphConfiguration();
-    tmpConfig.setVertexClass(IntVertex.class);
+    tmpConfig.setComputationClass(IntNoOpComputation.class);
     conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
   }
 
@@ -71,7 +64,7 @@ public class ConnectionTest {
     Context context = mock(Context.class);
     when(context.getConfiguration()).thenReturn(conf);
 
-    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+    ServerData<IntWritable, IntWritable, IntWritable> serverData =
         MockUtils.createNewServerData(conf, context);
     WorkerInfo workerInfo = new WorkerInfo();
     NettyServer server =
@@ -100,7 +93,7 @@ public class ConnectionTest {
     Context context = mock(Context.class);
     when(context.getConfiguration()).thenReturn(conf);
 
-    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+    ServerData<IntWritable, IntWritable, IntWritable> serverData =
         MockUtils.createNewServerData(conf, context);
    RequestServerHandler.Factory requestServerHandlerFactory =
        new WorkerRequestServerHandler.Factory(serverData);
@@ -150,7 +143,7 @@ public class ConnectionTest {
     Context context = mock(Context.class);
     when(context.getConfiguration()).thenReturn(conf);
 
-    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+    ServerData<IntWritable, IntWritable, IntWritable> serverData =
         MockUtils.createNewServerData(conf, context);
     WorkerInfo workerInfo = new WorkerInfo();
     NettyServer server = new NettyServer(conf,

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 58aa7d1..5c69161 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -26,8 +26,8 @@ import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.IntNoOpComputation;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.worker.WorkerInfo;
@@ -51,7 +51,7 @@ public class RequestFailureTest {
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
   /** Server data */
-  private ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+  private ServerData<IntWritable, IntWritable, IntWritable>
   serverData;
   /** Server */
   private NettyServer server;
@@ -60,21 +60,11 @@ public class RequestFailureTest {
   /** Mock context */
   private Context context;
 
-  /**
-   * Only for testing.
-   */
-  public static class TestVertex extends Vertex<IntWritable,
-        IntWritable, IntWritable, IntWritable> {
-    @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException {
-    }
-  }
-
   @Before
   public void setUp() throws IOException {
     // Setup the conf
     GiraphConfiguration tmpConf = new GiraphConfiguration();
-    tmpConf.setVertexClass(TestVertex.class);
+    tmpConf.setComputationClass(IntNoOpComputation.class);
     conf = new ImmutableClassesGiraphConfiguration(tmpConf);
 
     context = mock(Context.class);
@@ -91,7 +81,8 @@ public class RequestFailureTest {
     dataToSend.initialize();
     ByteArrayVertexIdMessages<IntWritable,
             IntWritable> vertexIdMessages =
-        new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
+        new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
+            IntWritable.class);
     vertexIdMessages.setConf(conf);
     vertexIdMessages.initialize();
     dataToSend.add(partitionId, vertexIdMessages);
@@ -117,7 +108,8 @@ public class RequestFailureTest {
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
       Iterable<IntWritable> messages =
-          serverData.getIncomingMessageStore().getVertexMessages(vertexId);
+          serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
+              vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {
           messageSum += message.get();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index f1f8e26..7016572 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -35,6 +35,7 @@ import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.IntNoOpComputation;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.worker.WorkerInfo;
@@ -63,8 +64,7 @@ public class RequestTest {
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
   /** Server data */
-  private ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-  serverData;
+  private ServerData<IntWritable, IntWritable, IntWritable> serverData;
   /** Server */
   private NettyServer server;
   /** Client */
@@ -72,21 +72,11 @@ public class RequestTest {
   /** Worker info */
   private WorkerInfo workerInfo;
 
-  /**
-   * Only for testing.
-   */
-  public static class TestVertex extends Vertex<IntWritable,
-      IntWritable, IntWritable, IntWritable> {
-    @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException {
-    }
-  }
-
   @Before
   public void setUp() throws IOException {
     // Setup the conf
     GiraphConfiguration tmpConf = new GiraphConfiguration();
-    GiraphConstants.VERTEX_CLASS.set(tmpConf, TestVertex.class);
+    GiraphConstants.COMPUTATION_CLASS.set(tmpConf, IntNoOpComputation.class);
     conf = new ImmutableClassesGiraphConfiguration(tmpConf);
 
     @SuppressWarnings("rawtypes")
@@ -110,7 +100,7 @@ public class RequestTest {
   public void sendVertexPartition() throws IOException {
     // Data to send
     int partitionId = 13;
-    Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition =
+    Partition<IntWritable, IntWritable, IntWritable> partition =
         conf.createPartition(partitionId, null);
     for (int i = 0; i < 10; ++i) {
       Vertex vertex = conf.createVertex();
@@ -119,10 +109,8 @@ public class RequestTest {
     }
 
     // Send the request
-    SendVertexRequest<IntWritable, IntWritable, IntWritable,
-    IntWritable> request =
-      new SendVertexRequest<IntWritable, IntWritable,
-      IntWritable, IntWritable>(partition);
+    SendVertexRequest<IntWritable, IntWritable, IntWritable> request =
+      new SendVertexRequest<IntWritable, IntWritable, IntWritable>(partition);
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 
@@ -131,15 +119,13 @@ public class RequestTest {
     server.stop();
 
     // Check the output
-    PartitionStore<IntWritable, IntWritable,
-        IntWritable, IntWritable> partitionStore =
+    PartitionStore<IntWritable, IntWritable, IntWritable> partitionStore =
         serverData.getPartitionStore();
     assertTrue(partitionStore.hasPartition(partitionId));
     int total = 0;
-    Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition2 =
+    Partition<IntWritable, IntWritable, IntWritable> partition2 =
         partitionStore.getPartition(partitionId);	
-    for (Vertex<IntWritable, IntWritable,
-        IntWritable, IntWritable> vertex : partition2) {
+    for (Vertex<IntWritable, IntWritable, IntWritable> vertex : partition2) {
       total += vertex.getId().get();
     }
     partitionStore.putPartition(partition2);
@@ -158,7 +144,8 @@ public class RequestTest {
     int partitionId = 0;
     ByteArrayVertexIdMessages<IntWritable,
             IntWritable> vertexIdMessages =
-        new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
+        new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
+            IntWritable.class);
     vertexIdMessages.setConf(conf);
     vertexIdMessages.initialize();
     dataToSend.add(partitionId, vertexIdMessages);
@@ -187,7 +174,8 @@ public class RequestTest {
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
       Iterable<IntWritable> messages =
-          serverData.getIncomingMessageStore().getVertexMessages(vertexId);
+          serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
+              vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {
           messageSum += message.get();
@@ -203,12 +191,11 @@ public class RequestTest {
     // Data to send
     int partitionId = 19;
     Map<IntWritable, VertexMutations<IntWritable, IntWritable,
-    IntWritable, IntWritable>> vertexIdMutations =
+    IntWritable>> vertexIdMutations =
         Maps.newHashMap();
     for (int i = 0; i < 11; ++i) {
-      VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
-      mutations = new VertexMutations<IntWritable, IntWritable,
-          IntWritable, IntWritable>();
+      VertexMutations<IntWritable, IntWritable, IntWritable> mutations =
+          new VertexMutations<IntWritable, IntWritable, IntWritable>();
       for (int j = 0; j < 3; ++j) {
         Vertex vertex = conf.createVertex();
         vertex.initialize(new IntWritable(i), new IntWritable(j));
@@ -229,10 +216,9 @@ public class RequestTest {
     }
 
     // Send the request
-    SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable,
-    IntWritable> request =
-      new SendPartitionMutationsRequest<IntWritable, IntWritable,
-      IntWritable, IntWritable>(partitionId, vertexIdMutations);
+    SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable>
+        request = new SendPartitionMutationsRequest<IntWritable, IntWritable,
+        IntWritable>(partitionId, vertexIdMutations);
     GiraphMetrics.init(conf);
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
@@ -243,16 +229,16 @@ public class RequestTest {
 
     // Check the output
     ConcurrentHashMap<IntWritable, VertexMutations<IntWritable, IntWritable,
-    IntWritable, IntWritable>> inVertexIdMutations =
+    IntWritable>> inVertexIdMutations =
         serverData.getVertexMutations();
     int keySum = 0;
     for (Entry<IntWritable, VertexMutations<IntWritable, IntWritable,
-        IntWritable, IntWritable>> entry :
+        IntWritable>> entry :
           inVertexIdMutations.entrySet()) {
       synchronized (entry.getValue()) {
         keySum += entry.getKey().get();
         int vertexValueSum = 0;
-        for (Vertex<IntWritable, IntWritable, IntWritable, IntWritable>
+        for (Vertex<IntWritable, IntWritable, IntWritable>
         vertex : entry.getValue().getAddedVertexList()) {
           vertexValueSum += vertex.getValue().get();
         }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
index c27156f..c026cf8 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
@@ -25,7 +25,7 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.IntNoOpComputation;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.IntWritable;
@@ -48,17 +48,10 @@ public class SaslConnectionTest {
   /** Class configuration */
   private ImmutableClassesGiraphConfiguration conf;
 
-  public static class IntVertex extends Vertex<IntWritable,
-          IntWritable, IntWritable, IntWritable> {
-    @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException {
-    }
-  }
-
   @Before
   public void setUp() {
     GiraphConfiguration tmpConfig = new GiraphConfiguration();
-    tmpConfig.setVertexClass(IntVertex.class);
+    tmpConfig.setComputationClass(IntNoOpComputation.class);
     GiraphConstants.AUTHENTICATE.set(tmpConfig, true);
     conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
   }
@@ -74,7 +67,7 @@ public class SaslConnectionTest {
     Context context = mock(Context.class);
     when(context.getConfiguration()).thenReturn(conf);
 
-    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+    ServerData<IntWritable, IntWritable, IntWritable> serverData =
         MockUtils.createNewServerData(conf, context);
 
     SaslServerHandler.Factory mockedSaslServerFactory =