You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:26:59 UTC

[03/12] GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
deleted file mode 100644
index 7a63e8d..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.List;
-import org.apache.giraph.aggregators.DoubleMaxAggregator;
-import org.apache.giraph.aggregators.DoubleMinAggregator;
-import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
-import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-/**
- * Demonstrates the basic Pregel PageRank implementation.
- */
-@Algorithm(
-    name = "Page rank"
-)
-public class SimplePageRankVertex extends Vertex<LongWritable,
-    DoubleWritable, FloatWritable, DoubleWritable> {
-  /** Number of supersteps for this test */
-  public static final int MAX_SUPERSTEPS = 30;
-  /** Logger */
-  private static final Logger LOG =
-      Logger.getLogger(SimplePageRankVertex.class);
-  /** Sum aggregator name */
-  private static String SUM_AGG = "sum";
-  /** Min aggregator name */
-  private static String MIN_AGG = "min";
-  /** Max aggregator name */
-  private static String MAX_AGG = "max";
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) {
-    if (getSuperstep() >= 1) {
-      double sum = 0;
-      for (DoubleWritable message : messages) {
-        sum += message.get();
-      }
-      DoubleWritable vertexValue =
-          new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
-      setValue(vertexValue);
-      aggregate(MAX_AGG, vertexValue);
-      aggregate(MIN_AGG, vertexValue);
-      aggregate(SUM_AGG, new LongWritable(1));
-      LOG.info(getId() + ": PageRank=" + vertexValue +
-          " max=" + getAggregatedValue(MAX_AGG) +
-          " min=" + getAggregatedValue(MIN_AGG));
-    }
-
-    if (getSuperstep() < MAX_SUPERSTEPS) {
-      long edges = getNumEdges();
-      sendMessageToAllEdges(
-          new DoubleWritable(getValue().get() / edges));
-    } else {
-      voteToHalt();
-    }
-  }
-
-  /**
-   * Worker context used with {@link SimplePageRankVertex}.
-   */
-  public static class SimplePageRankVertexWorkerContext extends
-      WorkerContext {
-    /** Final max value for verification for local jobs */
-    private static double FINAL_MAX;
-    /** Final min value for verification for local jobs */
-    private static double FINAL_MIN;
-    /** Final sum value for verification for local jobs */
-    private static long FINAL_SUM;
-
-    public static double getFinalMax() {
-      return FINAL_MAX;
-    }
-
-    public static double getFinalMin() {
-      return FINAL_MIN;
-    }
-
-    public static long getFinalSum() {
-      return FINAL_SUM;
-    }
-
-    @Override
-    public void preApplication()
-      throws InstantiationException, IllegalAccessException {
-    }
-
-    @Override
-    public void postApplication() {
-      FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
-      FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
-      FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
-
-      LOG.info("aggregatedNumVertices=" + FINAL_SUM);
-      LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
-      LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
-    }
-
-    @Override
-    public void preSuperstep() {
-      if (getSuperstep() >= 3) {
-        LOG.info("aggregatedNumVertices=" +
-            getAggregatedValue(SUM_AGG) +
-            " NumVertices=" + getTotalNumVertices());
-        if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
-            getTotalNumVertices()) {
-          throw new RuntimeException("wrong value of SumAggreg: " +
-              getAggregatedValue(SUM_AGG) + ", should be: " +
-              getTotalNumVertices());
-        }
-        DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
-        LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
-        DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
-        LOG.info("aggregatedMinPageRank=" + minPagerank.get());
-      }
-    }
-
-    @Override
-    public void postSuperstep() { }
-  }
-
-  /**
-   * Master compute associated with {@link SimplePageRankVertex}.
-   * It registers required aggregators.
-   */
-  public static class SimplePageRankVertexMasterCompute extends
-      DefaultMasterCompute {
-    @Override
-    public void initialize() throws InstantiationException,
-        IllegalAccessException {
-      registerAggregator(SUM_AGG, LongSumAggregator.class);
-      registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
-      registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
-    }
-  }
-
-  /**
-   * Simple VertexReader that supports {@link SimplePageRankVertex}
-   */
-  public static class SimplePageRankVertexReader extends
-      GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
-    /** Class logger */
-    private static final Logger LOG =
-        Logger.getLogger(SimplePageRankVertexReader.class);
-
-    @Override
-    public boolean nextVertex() {
-      return totalRecords > recordsRead;
-    }
-
-    @Override
-    public Vertex<LongWritable, DoubleWritable,
-        FloatWritable, Writable> getCurrentVertex() throws IOException {
-      Vertex<LongWritable, DoubleWritable, FloatWritable, Writable>
-          vertex = getConf().createVertex();
-      LongWritable vertexId = new LongWritable(
-          (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
-      DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
-      long targetVertexId =
-          (vertexId.get() + 1) %
-          (inputSplit.getNumSplits() * totalRecords);
-      float edgeValue = vertexId.get() * 100f;
-      List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
-      edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
-          new FloatWritable(edgeValue)));
-      vertex.initialize(vertexId, vertexValue, edges);
-      ++recordsRead;
-      if (LOG.isInfoEnabled()) {
-        LOG.info("next: Return vertexId=" + vertex.getId().get() +
-            ", vertexValue=" + vertex.getValue() +
-            ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
-      }
-      return vertex;
-    }
-  }
-
-  /**
-   * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
-   */
-  public static class SimplePageRankVertexInputFormat extends
-    GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
-    @Override
-    public VertexReader<LongWritable, DoubleWritable,
-    FloatWritable> createVertexReader(InputSplit split,
-      TaskAttemptContext context)
-      throws IOException {
-      return new SimplePageRankVertexReader();
-    }
-  }
-
-  /**
-   * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
-   */
-  public static class SimplePageRankVertexOutputFormat extends
-      TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
-    @Override
-    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      return new SimplePageRankVertexWriter();
-    }
-
-    /**
-     * Simple VertexWriter that supports {@link SimplePageRankVertex}
-     */
-    public class SimplePageRankVertexWriter extends TextVertexWriter {
-      @Override
-      public void writeVertex(
-          Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
-        throws IOException, InterruptedException {
-        getRecordWriter().write(
-            new Text(vertex.getId().toString()),
-            new Text(vertex.getValue().toString()));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsComputation.java
new file mode 100644
index 0000000..bc39cad
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsComputation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.conf.LongConfOption;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Demonstrates the basic Pregel shortest paths implementation.
+ */
+@Algorithm(
+    name = "Shortest paths",
+    description = "Finds all shortest paths from a selected vertex"
+)
+public class SimpleShortestPathsComputation extends BasicComputation<
+    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** The shortest paths id */
+  public static final LongConfOption SOURCE_ID =
+      new LongConfOption("SimpleShortestPathsVertex.sourceId", 1);
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleShortestPathsComputation.class);
+
+  /**
+   * Is this vertex the source id?
+   *
+   * @param vertex Vertex
+   * @return True if the source id
+   */
+  private boolean isSource(Vertex<LongWritable, ?, ?> vertex) {
+    return vertex.getId().get() == SOURCE_ID.get(getConf());
+  }
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    if (getSuperstep() == 0) {
+      vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
+    }
+    double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
+    for (DoubleWritable message : messages) {
+      minDist = Math.min(minDist, message.get());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Vertex " + vertex.getId() + " got minDist = " + minDist +
+          " vertex value = " + vertex.getValue());
+    }
+    if (minDist < vertex.getValue().get()) {
+      vertex.setValue(new DoubleWritable(minDist));
+      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+        double distance = minDist + edge.getValue().get();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Vertex " + vertex.getId() + " sent to " +
+              edge.getTargetVertexId() + " = " + distance);
+        }
+        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
+      }
+    }
+    vertex.voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
deleted file mode 100644
index 13d1d7c..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.conf.LongConfOption;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Demonstrates the basic Pregel shortest paths implementation.
- */
-@Algorithm(
-    name = "Shortest paths",
-    description = "Finds all shortest paths from a selected vertex"
-)
-public class SimpleShortestPathsVertex extends
-    Vertex<LongWritable, DoubleWritable,
-    FloatWritable, DoubleWritable> {
-  /** The shortest paths id */
-  public static final LongConfOption SOURCE_ID =
-      new LongConfOption("SimpleShortestPathsVertex.sourceId", 1);
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SimpleShortestPathsVertex.class);
-
-  /**
-   * Is this vertex the source id?
-   *
-   * @return True if the source id
-   */
-  private boolean isSource() {
-    return getId().get() == SOURCE_ID.get(getConf());
-  }
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) {
-    if (getSuperstep() == 0) {
-      setValue(new DoubleWritable(Double.MAX_VALUE));
-    }
-    double minDist = isSource() ? 0d : Double.MAX_VALUE;
-    for (DoubleWritable message : messages) {
-      minDist = Math.min(minDist, message.get());
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Vertex " + getId() + " got minDist = " + minDist +
-          " vertex value = " + getValue());
-    }
-    if (minDist < getValue().get()) {
-      setValue(new DoubleWritable(minDist));
-      for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
-        double distance = minDist + edge.getValue().get();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Vertex " + getId() + " sent to " +
-              edge.getTargetVertexId() + " = " + distance);
-        }
-        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
-      }
-    }
-    voteToHalt();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepComputation.java
new file mode 100644
index 0000000..c3fd215
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepComputation.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * Just a simple Vertex compute implementation that executes 3 supersteps, then
+ * finishes.
+ */
+public class SimpleSuperstepComputation extends BasicComputation<LongWritable,
+    IntWritable, FloatWritable, IntWritable> {
+  @Override
+  public void compute(
+      Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+      Iterable<IntWritable> messages) throws IOException {
+    // Some checks for additional testing
+    if (getTotalNumVertices() < 1) {
+      throw new IllegalStateException("compute: Illegal total vertices " +
+          getTotalNumVertices());
+    }
+    if (getTotalNumEdges() < 0) {
+      throw new IllegalStateException("compute: Illegal total edges " +
+          getTotalNumEdges());
+    }
+    if (vertex.isHalted()) {
+      throw new IllegalStateException("compute: Impossible to be halted - " +
+          vertex.isHalted());
+    }
+
+    if (getSuperstep() > 3) {
+      vertex.voteToHalt();
+    }
+  }
+
+  /**
+   * Simple VertexReader that supports {@link SimpleSuperstepComputation}
+   */
+  public static class SimpleSuperstepVertexReader extends
+      GeneratedVertexReader<LongWritable, IntWritable, FloatWritable> {
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(SimpleSuperstepVertexReader.class);
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return totalRecords > recordsRead;
+    }
+
+    @Override
+    public Vertex<LongWritable, IntWritable, FloatWritable> getCurrentVertex()
+      throws IOException, InterruptedException {
+      Vertex<LongWritable, IntWritable, FloatWritable> vertex =
+          getConf().createVertex();
+      long tmpId = reverseIdOrder ?
+          ((inputSplit.getSplitIndex() + 1) * totalRecords) -
+          recordsRead - 1 :
+            (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
+      LongWritable vertexId = new LongWritable(tmpId);
+      IntWritable vertexValue =
+          new IntWritable((int) (vertexId.get() * 10));
+      List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
+      long targetVertexId =
+          (vertexId.get() + 1) %
+          (inputSplit.getNumSplits() * totalRecords);
+      float edgeValue = vertexId.get() * 100f;
+      edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
+          new FloatWritable(edgeValue)));
+      vertex.initialize(vertexId, vertexValue, edges);
+      ++recordsRead;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("next: Return vertexId=" + vertex.getId().get() +
+            ", vertexValue=" + vertex.getValue() +
+            ", targetVertexId=" + targetVertexId +
+            ", edgeValue=" + edgeValue);
+      }
+      return vertex;
+    }
+  }
+
+  /**
+   * Simple VertexInputFormat that supports {@link SimpleSuperstepComputation}
+   */
+  public static class SimpleSuperstepVertexInputFormat extends
+    GeneratedVertexInputFormat<LongWritable, IntWritable, FloatWritable> {
+    @Override
+    public VertexReader<LongWritable, IntWritable, FloatWritable>
+    createVertexReader(InputSplit split, TaskAttemptContext context)
+      throws IOException {
+      return new SimpleSuperstepVertexReader();
+    }
+  }
+
+
+  /**
+   * Simple VertexOutputFormat that supports {@link SimpleSuperstepComputation}
+   */
+  public static class SimpleSuperstepVertexOutputFormat extends
+      TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+    @Override
+    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      return new SimpleSuperstepVertexWriter();
+    }
+
+    /**
+     * Simple VertexWriter that supports {@link SimpleSuperstepComputation}
+     */
+    public class SimpleSuperstepVertexWriter extends TextVertexWriter {
+      @Override
+      public void writeVertex(Vertex<LongWritable, IntWritable,
+          FloatWritable> vertex) throws IOException, InterruptedException {
+        getRecordWriter().write(
+            new Text(vertex.getId().toString()),
+            new Text(vertex.getValue().toString()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
deleted file mode 100644
index 6f8b352..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.List;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
-import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-/**
- * Just a simple Vertex compute implementation that executes 3 supersteps, then
- * finishes.
- */
-public class SimpleSuperstepVertex extends Vertex<LongWritable, IntWritable,
-    FloatWritable, IntWritable> {
-  @Override
-  public void compute(Iterable<IntWritable> messages) {
-    // Some checks for additional testing
-    if (getTotalNumVertices() < 1) {
-      throw new IllegalStateException("compute: Illegal total vertices " +
-          getTotalNumVertices());
-    }
-    if (getTotalNumEdges() < 0) {
-      throw new IllegalStateException("compute: Illegal total edges " +
-          getTotalNumEdges());
-    }
-    if (isHalted()) {
-      throw new IllegalStateException("compute: Impossible to be halted - " +
-          isHalted());
-    }
-
-    if (getSuperstep() > 3) {
-      voteToHalt();
-    }
-  }
-
-  /**
-   * Simple VertexReader that supports {@link SimpleSuperstepVertex}
-   */
-  public static class SimpleSuperstepVertexReader extends
-      GeneratedVertexReader<LongWritable, IntWritable, FloatWritable> {
-    /** Class logger */
-    private static final Logger LOG =
-        Logger.getLogger(SimpleSuperstepVertexReader.class);
-
-    @Override
-    public boolean nextVertex() throws IOException, InterruptedException {
-      return totalRecords > recordsRead;
-    }
-
-    @Override
-    public Vertex<LongWritable, IntWritable, FloatWritable,
-        Writable> getCurrentVertex()
-      throws IOException, InterruptedException {
-      Vertex<LongWritable, IntWritable, FloatWritable, Writable> vertex =
-          getConf().createVertex();
-      long tmpId = reverseIdOrder ?
-          ((inputSplit.getSplitIndex() + 1) * totalRecords) -
-          recordsRead - 1 :
-            (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
-      LongWritable vertexId = new LongWritable(tmpId);
-      IntWritable vertexValue =
-          new IntWritable((int) (vertexId.get() * 10));
-      List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
-      long targetVertexId =
-          (vertexId.get() + 1) %
-          (inputSplit.getNumSplits() * totalRecords);
-      float edgeValue = vertexId.get() * 100f;
-      edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
-          new FloatWritable(edgeValue)));
-      vertex.initialize(vertexId, vertexValue, edges);
-      ++recordsRead;
-      if (LOG.isInfoEnabled()) {
-        LOG.info("next: Return vertexId=" + vertex.getId().get() +
-            ", vertexValue=" + vertex.getValue() +
-            ", targetVertexId=" + targetVertexId +
-            ", edgeValue=" + edgeValue);
-      }
-      return vertex;
-    }
-  }
-
-  /**
-   * Simple VertexInputFormat that supports {@link SimpleSuperstepVertex}
-   */
-  public static class SimpleSuperstepVertexInputFormat extends
-    GeneratedVertexInputFormat<LongWritable, IntWritable, FloatWritable> {
-    @Override
-    public VertexReader<LongWritable, IntWritable, FloatWritable>
-    createVertexReader(InputSplit split, TaskAttemptContext context)
-      throws IOException {
-      return new SimpleSuperstepVertexReader();
-    }
-  }
-
-
-  /**
-   * Simple VertexOutputFormat that supports {@link SimpleSuperstepVertex}
-   */
-  public static class SimpleSuperstepVertexOutputFormat extends
-      TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
-    @Override
-    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      return new SimpleSuperstepVertexWriter();
-    }
-
-    /**
-     * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
-     */
-    public class SimpleSuperstepVertexWriter extends TextVertexWriter {
-      @Override
-      public void writeVertex(Vertex<LongWritable, IntWritable,
-          FloatWritable, ?> vertex) throws IOException, InterruptedException {
-        getRecordWriter().write(
-            new Text(vertex.getId().toString()),
-            new Text(vertex.getValue().toString()));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
index 157e6ef..b1ee2a0 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
@@ -39,7 +39,7 @@ public class SimpleTextVertexOutputFormat extends
   private class SimpleTextVertexWriter extends TextVertexWriter {
     @Override
     public void writeVertex(
-      Vertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
+      Vertex<LongWritable, IntWritable, FloatWritable> vertex)
       throws IOException, InterruptedException {
       getRecordWriter().write(
           new Text(vertex.getId().toString()),

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java
new file mode 100644
index 0000000..8608d02
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.utils.ArrayListWritable;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Demonstrates triangle closing in simple,
+ * unweighted graphs for Giraph.
+ *
+ * Triangle Closing: Vertex A and B maintain out-edges to C and D
+ * The algorithm, when finished, populates all vertices' value with an
+ * array of Writables representing all the vertices that each
+ * should form an out-edge to (connect with, if this is a social
+ * graph.)
+ * In this example, vertices A and B would hold empty arrays
+ * since they are already connected with C and D. Results:
+ * If the graph is undirected, C would hold value, D and D would
+ * hold value C, since both are neighbors of A and B and yet both
+ * were not previously connected to each other.
+ *
+ * In a social graph, the result values for vertex X would represent people
+ * that are likely a part of a person X's social circle (they know one or more
+ * people X is connected to already) but X had not previously met them yet.
+ * Given this new information, X can decide to connect to vertices (peoople) in
+ * the result array or not.
+ *
+ * Results at each vertex are ordered in terms of the # of neighbors
+ * who are connected to each vertex listed in the final vertex value.
+ * The more of a vertex's neighbors who "know" someone, the stronger
+ * your social relationship is presumed to be to that vertex (assuming
+ * a social graph) and the more likely you should connect with them.
+ *
+ * In this implementation, Edge Values are not used, but could be
+ * adapted to represent additional qualities that could affect the
+ * ordering of the final result array.
+ */
+public class SimpleTriangleClosingComputation extends BasicComputation<
+  IntWritable, SimpleTriangleClosingComputation.IntArrayListWritable,
+  NullWritable, IntWritable> {
+  /** Vertices to close the triangle, ranked by frequency of in-msgs */
+  private Map<IntWritable, Integer> closeMap =
+    Maps.<IntWritable, Integer>newHashMap();
+
+  @Override
+  public void compute(
+      Vertex<IntWritable, IntArrayListWritable, NullWritable> vertex,
+      Iterable<IntWritable> messages) throws IOException {
+    if (getSuperstep() == 0) {
+      // send list of this vertex's neighbors to all neighbors
+      for (Edge<IntWritable, NullWritable> edge : vertex.getEdges()) {
+        sendMessageToAllEdges(vertex, edge.getTargetVertexId());
+      }
+    } else {
+      for (IntWritable message : messages) {
+        final int current = (closeMap.get(message) == null) ?
+          0 : closeMap.get(message) + 1;
+        closeMap.put(message, current);
+      }
+      // make sure the result values are sorted and
+      // packaged in an IntArrayListWritable for output
+      Set<Pair> sortedResults = Sets.<Pair>newTreeSet();
+      for (Map.Entry<IntWritable, Integer> entry : closeMap.entrySet()) {
+        sortedResults.add(new Pair(entry.getKey(), entry.getValue()));
+      }
+      IntArrayListWritable
+        outputList = new IntArrayListWritable();
+      for (Pair pair : sortedResults) {
+        if (pair.value > 0) {
+          outputList.add(pair.key);
+        } else {
+          break;
+        }
+      }
+      vertex.setValue(outputList);
+    }
+    vertex.voteToHalt();
+  }
+
+  /** Quick, immutable K,V storage for sorting in tree set */
+  public static class Pair implements Comparable<Pair> {
+    /** key
+     * @param key the IntWritable key */
+    private final IntWritable key;
+    /** value
+     * @param value the Integer value */
+    private final Integer value;
+    /** Constructor
+     * @param k the key
+     * @param v the value
+     */
+    public Pair(IntWritable k, Integer v) {
+      key = k;
+      value = v;
+    }
+    /** key getter
+     * @return the key */
+    public IntWritable getKey() { return key; }
+    /** value getter
+     * @return the value */
+    public Integer getValue() { return value; }
+    /** Comparator to quickly sort by values
+     * @param other the Pair to compare with THIS
+     * @return the comparison value as an integer */
+    @Override
+    public int compareTo(Pair other) {
+      return other.value - this.value;
+    }
+  }
+
+  /** Utility class for delivering the array of vertices THIS vertex
+    * should connect with to close triangles with neighbors */
+  public static class IntArrayListWritable
+    extends ArrayListWritable<IntWritable> {
+    /** Default constructor for reflection */
+    public IntArrayListWritable() {
+      super();
+    }
+    /** Set storage type for this ArrayListWritable */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setClass() {
+      setClass(IntWritable.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
deleted file mode 100644
index f44cb18..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.utils.ArrayListWritable;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Demonstrates triangle closing in simple,
- * unweighted graphs for Giraph.
- *
- * Triangle Closing: Vertex A and B maintain out-edges to C and D
- * The algorithm, when finished, populates all vertices' value with an
- * array of Writables representing all the vertices that each
- * should form an out-edge to (connect with, if this is a social
- * graph.)
- * In this example, vertices A and B would hold empty arrays
- * since they are already connected with C and D. Results:
- * If the graph is undirected, C would hold value, D and D would
- * hold value C, since both are neighbors of A and B and yet both
- * were not previously connected to each other.
- *
- * In a social graph, the result values for vertex X would represent people
- * that are likely a part of a person X's social circle (they know one or more
- * people X is connected to already) but X had not previously met them yet.
- * Given this new information, X can decide to connect to vertices (peoople) in
- * the result array or not.
- *
- * Results at each vertex are ordered in terms of the # of neighbors
- * who are connected to each vertex listed in the final vertex value.
- * The more of a vertex's neighbors who "know" someone, the stronger
- * your social relationship is presumed to be to that vertex (assuming
- * a social graph) and the more likely you should connect with them.
- *
- * In this implementation, Edge Values are not used, but could be
- * adapted to represent additional qualities that could affect the
- * ordering of the final result array.
- */
-public class SimpleTriangleClosingVertex extends Vertex<
-  IntWritable, SimpleTriangleClosingVertex.IntArrayListWritable,
-  NullWritable, IntWritable> {
-  /** Vertices to close the triangle, ranked by frequency of in-msgs */
-  private Map<IntWritable, Integer> closeMap =
-    Maps.<IntWritable, Integer>newHashMap();
-
-  @Override
-  public void compute(Iterable<IntWritable> messages) {
-    if (getSuperstep() == 0) {
-      // send list of this vertex's neighbors to all neighbors
-      for (Edge<IntWritable, NullWritable> edge : getEdges()) {
-        sendMessageToAllEdges(edge.getTargetVertexId());
-      }
-    } else {
-      for (IntWritable message : messages) {
-        final int current = (closeMap.get(message) == null) ?
-          0 : closeMap.get(message) + 1;
-        closeMap.put(message, current);
-      }
-      // make sure the result values are sorted and
-      // packaged in an IntArrayListWritable for output
-      Set<SimpleTriangleClosingVertex.Pair> sortedResults =
-        Sets.<SimpleTriangleClosingVertex.Pair>newTreeSet();
-      for (Map.Entry<IntWritable, Integer> entry : closeMap.entrySet()) {
-        sortedResults.add(new Pair(entry.getKey(), entry.getValue()));
-      }
-      SimpleTriangleClosingVertex.IntArrayListWritable
-        outputList = new SimpleTriangleClosingVertex.IntArrayListWritable();
-      for (SimpleTriangleClosingVertex.Pair pair : sortedResults) {
-        if (pair.value > 0) {
-          outputList.add(pair.key);
-        } else {
-          break;
-        }
-      }
-      setValue(outputList);
-    }
-    voteToHalt();
-  }
-
-  /** Quick, immutable K,V storage for sorting in tree set */
-  public static class Pair implements Comparable<Pair> {
-    /** key
-     * @param key the IntWritable key */
-    private final IntWritable key;
-    /** value
-     * @param value the Integer value */
-    private final Integer value;
-    /** Constructor
-     * @param k the key
-     * @param v the value
-     */
-    public Pair(IntWritable k, Integer v) {
-      key = k;
-      value = v;
-    }
-    /** key getter
-     * @return the key */
-    public IntWritable getKey() { return key; }
-    /** value getter
-     * @return the value */
-    public Integer getValue() { return value; }
-    /** Comparator to quickly sort by values
-     * @param other the Pair to compare with THIS
-     * @return the comparison value as an integer */
-    @Override
-    public int compareTo(Pair other) {
-      return other.value - this.value;
-    }
-  }
-
-  /** Utility class for delivering the array of vertices THIS vertex
-    * should connect with to close triangles with neighbors */
-  public static class IntArrayListWritable
-    extends ArrayListWritable<IntWritable> {
-    /** Default constructor for reflection */
-    public IntArrayListWritable() {
-      super();
-    }
-    /** Set storage type for this ArrayListWritable */
-    @Override
-    @SuppressWarnings("unchecked")
-    public void setClass() {
-      setClass(IntWritable.class);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
index 8a6f775..a5051b8 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.examples.SimpleSuperstepVertex.
-    SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.worker.WorkerContext;
@@ -63,21 +63,21 @@ public class SimpleVertexWithWorkerContext implements Tool {
   /**
    * Actual vetex implementation
    */
-  public static class SimpleVertex extends
-      Vertex<LongWritable, IntWritable, FloatWritable,
-          DoubleWritable> {
+  public static class SimpleComputation extends BasicComputation<LongWritable,
+      IntWritable, FloatWritable, DoubleWritable> {
     @Override
-    public void compute(Iterable<DoubleWritable> messages) throws IOException {
+    public void compute(
+        Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+        Iterable<DoubleWritable> messages) throws IOException {
 
       long superstep = getSuperstep();
 
       if (superstep < TESTLENGTH) {
-        EmitterWorkerContext emitter =
-            (EmitterWorkerContext) getWorkerContext();
-        emitter.emit("vertexId=" + getId() +
+        EmitterWorkerContext emitter = getWorkerContext();
+        emitter.emit("vertexId=" + vertex.getId() +
             " superstep=" + superstep + "\n");
       } else {
-        voteToHalt();
+        vertex.voteToHalt();
       }
     }
   }
@@ -169,7 +169,7 @@ public class SimpleVertexWithWorkerContext implements Tool {
           "run: Must have 2 arguments <output path> <# of workers>");
     }
     GiraphJob job = new GiraphJob(getConf(), getClass().getName());
-    job.getConfiguration().setVertexClass(SimpleVertex.class);
+    job.getConfiguration().setComputationClass(SimpleComputation.class);
     job.getConfiguration().setVertexInputFormatClass(
         SimpleSuperstepVertexInputFormat.class);
     job.getConfiguration().setWorkerContextClass(EmitterWorkerContext.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java
new file mode 100644
index 0000000..ad72951
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Vertex to test the local variables in Computation, and pre/postSuperstep
+ * methods
+ */
+public class TestComputationStateComputation extends BasicComputation<
+    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** How many compute threads to use in the test */
+  public static final int NUM_COMPUTE_THREADS = 10;
+  /** How many vertices to create for the test */
+  public static final int NUM_VERTICES = 100;
+  /** How many partitions to have */
+  public static final int NUM_PARTITIONS = 25;
+
+  /**
+   * The counter should hold the number of vertices in this partition,
+   * plus the current superstep
+   */
+  private long counter;
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    counter++;
+    if (getSuperstep() > 5) {
+      vertex.voteToHalt();
+    }
+  }
+
+  @Override
+  public void preSuperstep() {
+    counter =
+      ((TestComputationStateWorkerContext) getWorkerContext()).superstepCounter;
+  }
+
+  @Override
+  public void postSuperstep() {
+    ((TestComputationStateWorkerContext) getWorkerContext()).totalCounter
+        .addAndGet(counter);
+  }
+
+  /**
+   * WorkerContext for TestComputationState
+   */
+  public static class TestComputationStateWorkerContext extends
+      DefaultWorkerContext {
+    /** Current superstep */
+    private long superstepCounter;
+    /**
+     * This counter should hold the sum of Computation's counters
+     */
+    private AtomicLong totalCounter;
+
+    @Override
+    public void preSuperstep() {
+      superstepCounter = getSuperstep();
+      totalCounter = new AtomicLong(0);
+    }
+
+    @Override
+    public void postSuperstep() {
+      assertEquals(totalCounter.get(),
+          NUM_PARTITIONS * superstepCounter + getTotalNumVertices());
+    }
+  }
+
+  /**
+   * Throws exception if values are not equal.
+   *
+   * @param expected Expected value
+   * @param actual   Actual value
+   */
+  private static void assertEquals(long expected, long actual) {
+    if (expected != actual) {
+      throw new RuntimeException("expected: " + expected +
+          ", actual: " + actual);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java b/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java
index 6e3c589..bcb02cf 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.examples;
 
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.master.DefaultMasterCompute;
@@ -41,7 +42,7 @@ import java.io.IOException;
  */
 public class VerifyMessage {
   /**
-   * Message that will be sent in {@link VerifyMessageVertex}.
+   * Message that will be sent in {@link VerifyMessageComputation}.
    */
   public static class VerifiableMessage implements Writable {
     /** Superstep sent on */
@@ -93,9 +94,9 @@ public class VerifyMessage {
   /**
    * Send and verify messages.
    */
-  public static class VerifyMessageVertex extends
-      Vertex<LongWritable, IntWritable, FloatWritable,
-      VerifiableMessage> {
+  public static class VerifyMessageComputation extends
+      BasicComputation<LongWritable, IntWritable, FloatWritable,
+          VerifiableMessage> {
     /** Dynamically set number of SUPERSTEPS */
     public static final String SUPERSTEP_COUNT =
         "verifyMessageVertex.superstepCount";
@@ -104,14 +105,15 @@ public class VerifyMessage {
     /** Number of SUPERSTEPS to run (6 by default) */
     private static int SUPERSTEPS = 6;
     /** Class logger */
-    private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class);
+    private static Logger LOG =
+        Logger.getLogger(VerifyMessageComputation.class);
 
     public static long getFinalSum() {
       return FINAL_SUM;
     }
 
     /**
-     * Worker context used with {@link VerifyMessageVertex}.
+     * Worker context used with {@link VerifyMessageComputation}.
      */
     public static class VerifyMessageVertexWorkerContext extends
         WorkerContext {
@@ -138,28 +140,30 @@ public class VerifyMessage {
     }
 
     @Override
-    public void compute(Iterable<VerifiableMessage> messages) {
+    public void compute(
+        Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+        Iterable<VerifiableMessage> messages) throws IOException {
       String sumAggregatorName = LongSumAggregator.class.getName();
       if (getSuperstep() > SUPERSTEPS) {
-        voteToHalt();
+        vertex.voteToHalt();
         return;
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("compute: " + getAggregatedValue(sumAggregatorName));
       }
-      aggregate(sumAggregatorName, new LongWritable(getId().get()));
+      aggregate(sumAggregatorName, new LongWritable(vertex.getId().get()));
       if (LOG.isDebugEnabled()) {
         LOG.debug("compute: sum = " +
             this.<LongWritable>getAggregatedValue(sumAggregatorName).get() +
-            " for vertex " + getId());
+            " for vertex " + vertex.getId());
       }
       float msgValue = 0.0f;
       for (VerifiableMessage message : messages) {
         msgValue += message.value;
         if (LOG.isDebugEnabled()) {
           LOG.debug("compute: got msg = " + message +
-              " for vertex id " + getId() +
-              ", vertex value " + getValue() +
+              " for vertex id " + vertex.getId() +
+              ", vertex value " + vertex.getValue() +
               " on superstep " + getSuperstep());
         }
         if (message.superstep != getSuperstep() - 1) {
@@ -168,44 +172,44 @@ public class VerifyMessage {
                   "the previous superstep, current superstep = " +
                   getSuperstep());
         }
-        if ((message.sourceVertexId != getId().get() - 1) &&
-            (getId().get() != 0)) {
+        if ((message.sourceVertexId != vertex.getId().get() - 1) &&
+            (vertex.getId().get() != 0)) {
           throw new IllegalStateException(
               "compute: Impossible that this message didn't come " +
                   "from the previous vertex and came from " +
                   message.sourceVertexId);
         }
       }
-      int vertexValue = getValue().get();
-      setValue(new IntWritable(vertexValue + (int) msgValue));
+      int vertexValue = vertex.getValue().get();
+      vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
       if (LOG.isDebugEnabled()) {
-        LOG.debug("compute: vertex " + getId() +
-            " has value " + getValue() +
+        LOG.debug("compute: vertex " + vertex.getId() +
+            " has value " + vertex.getValue() +
             " on superstep " + getSuperstep());
       }
-      for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
         FloatWritable newEdgeValue = new FloatWritable(
             edge.getValue().get() + (float) vertexValue);
         Edge<LongWritable, FloatWritable> newEdge =
             EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("compute: vertex " + getId() +
+          LOG.debug("compute: vertex " + vertex.getId() +
               " sending edgeValue " + edge.getValue() +
               " vertexValue " + vertexValue +
               " total " + newEdgeValue +
               " to vertex " + edge.getTargetVertexId() +
               " on superstep " + getSuperstep());
         }
-        addEdge(newEdge);
+        vertex.addEdge(newEdge);
         sendMessage(edge.getTargetVertexId(),
             new VerifiableMessage(
-                getSuperstep(), getId().get(), newEdgeValue.get()));
+                getSuperstep(), vertex.getId().get(), newEdgeValue.get()));
       }
     }
   }
 
   /**
-   * Master compute associated with {@link VerifyMessageVertex}.
+   * Master compute associated with {@link VerifyMessageComputation}.
    * It registers required aggregators.
    */
   public static class VerifyMessageMasterCompute extends

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java
index d328153..3d06561 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java
@@ -44,7 +44,7 @@ public class VertexWithDoubleValueDoubleEdgeTextOutputFormat extends
   public class VertexWithDoubleValueWriter extends TextVertexWriter {
     @Override
     public void writeVertex(
-      Vertex<LongWritable, DoubleWritable, DoubleWritable, ?> vertex)
+      Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex)
       throws IOException, InterruptedException {
       StringBuilder output = new StringBuilder();
       output.append(vertex.getId().get());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
index 85f3556..8cc769f 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
@@ -47,7 +47,7 @@ public class VertexWithDoubleValueNullEdgeTextOutputFormat extends
   public class VertexWithDoubleValueWriter extends TextVertexWriter {
     @Override
     public void writeVertex(
-        Vertex<LongWritable, DoubleWritable, NullWritable, ?> vertex)
+        Vertex<LongWritable, DoubleWritable, NullWritable> vertex)
       throws IOException, InterruptedException {
       StringBuilder output = new StringBuilder();
       output.append(vertex.getId().get());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java b/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
index 652913b..99ba770 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
@@ -20,9 +20,9 @@ package org.apache.giraph;
 
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.examples.SimpleCheckpointVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.examples.SimpleCheckpoint;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
@@ -58,15 +58,15 @@ public class TestAutoCheckpoint extends BspCase {
     }
     Path outputPath = getTempPath(getCallingMethodName());
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(
-        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    conf.setComputationClass(
+        SimpleCheckpoint.SimpleCheckpointComputation.class);
     conf.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
     conf.setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
-    conf.setBoolean(SimpleCheckpointVertex.ENABLE_FAULT, true);
+    conf.setBoolean(SimpleCheckpoint.ENABLE_FAULT, true);
     conf.setInt("mapred.map.max.attempts", 4);
     // Trigger failure faster
     conf.setInt("mapred.task.timeout", 10000);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 38ba27f..28edbba 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -24,21 +24,22 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.examples.GeneratedVertexReader;
-import org.apache.giraph.examples.SimpleCombinerVertex;
-import org.apache.giraph.examples.SimpleFailVertex;
-import org.apache.giraph.examples.SimpleMasterComputeVertex;
-import org.apache.giraph.examples.SimpleMsgVertex;
-import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
-import org.apache.giraph.examples.SimpleShortestPathsVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.examples.SimpleCombinerComputation;
+import org.apache.giraph.examples.SimpleFailComputation;
+import org.apache.giraph.examples.SimpleMasterComputeComputation;
+import org.apache.giraph.examples.SimpleMsgComputation;
+import org.apache.giraph.examples.SimplePageRankComputation;
+import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
+import org.apache.giraph.examples.SimpleShortestPathsComputation;
+import org.apache.giraph.examples.SimpleSuperstepComputation;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.job.HadoopUtils;
+import org.apache.giraph.utils.NoOpComputation;
 import org.apache.giraph.worker.InputSplitPathOrganizer;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.conf.Configuration;
@@ -111,13 +112,12 @@ public class
     System.out.println("testInstantiateVertex: java.class.path=" +
         System.getProperty("java.class.path"));
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimpleSuperstepVertex.class);
-    conf.setVertexInputFormatClass(
-        SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class);
+    conf.setComputationClass(SimpleSuperstepComputation.class);
+    conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
     ImmutableClassesGiraphConfiguration configuration =
         new ImmutableClassesGiraphConfiguration(job.getConfiguration());
-    Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
+    Vertex<LongWritable, IntWritable, FloatWritable> vertex =
         configuration.createVertex();
     vertex.initialize(new LongWritable(1), new IntWritable(1));
     System.out.println("testInstantiateVertex: Got vertex " + vertex);
@@ -133,11 +133,8 @@ public class
         byteArrayOutputStream.toString());
   }
 
-  private static class NullVertex extends Vertex<NullWritable, NullWritable,
-      NullWritable, NullWritable> {
-    @Override
-    public void compute(Iterable<NullWritable> messages) throws IOException { }
-  }
+  private static class NullComputation extends NoOpComputation<NullWritable,
+      NullWritable, NullWritable, NullWritable> { }
 
   /**
    * Test whether vertices with NullWritable for vertex value type, edge value
@@ -146,19 +143,17 @@ public class
   @Test
   public void testInstantiateNullVertex() throws IOException {
     GiraphConfiguration nullConf = new GiraphConfiguration();
-    nullConf.setVertexClass(NullVertex.class);
-    ImmutableClassesGiraphConfiguration<
-        NullWritable, NullWritable, NullWritable,
+    nullConf.setComputationClass(NullComputation.class);
+    ImmutableClassesGiraphConfiguration<NullWritable, NullWritable,
         NullWritable> immutableClassesGiraphConfiguration =
         new ImmutableClassesGiraphConfiguration<
-            NullWritable, NullWritable, NullWritable, NullWritable>(
-            nullConf);
+            NullWritable, NullWritable, NullWritable>(nullConf);
     NullWritable vertexValue =
         immutableClassesGiraphConfiguration.createVertexValue();
     NullWritable edgeValue =
         immutableClassesGiraphConfiguration.createEdgeValue();
     NullWritable messageValue =
-        immutableClassesGiraphConfiguration.createMessageValue();
+        immutableClassesGiraphConfiguration.createOutgoingMessageValue();
     assertSame(vertexValue.getClass(), NullWritable.class);
     assertSame(vertexValue, edgeValue);
     assertSame(edgeValue, messageValue);
@@ -180,7 +175,7 @@ public class
       return;
     }
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimpleSuperstepVertex.class);
+    conf.setComputationClass(SimpleSuperstepComputation.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
     conf = job.getConfiguration();
@@ -222,7 +217,7 @@ public class
     }
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimpleFailVertex.class);
+    conf.setComputationClass(SimpleFailComputation.class);
     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf,
         getTempPath(getCallingMethodName()));
@@ -243,7 +238,7 @@ public class
     String callingMethod = getCallingMethodName();
     Path outputPath = getTempPath(callingMethod);
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimpleSuperstepVertex.class);
+    conf.setComputationClass(SimpleSuperstepComputation.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
     GiraphJob job = prepareJob(callingMethod, conf, outputPath);
@@ -268,7 +263,7 @@ public class
   public void testBspMsg()
       throws IOException, InterruptedException, ClassNotFoundException {
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimpleMsgVertex.class);
+    conf.setComputationClass(SimpleMsgComputation.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
     assertTrue(job.run(true));
@@ -287,7 +282,7 @@ public class
   public void testEmptyVertexInputFormat()
       throws IOException, InterruptedException, ClassNotFoundException {
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimpleMsgVertex.class);
+    conf.setComputationClass(SimpleMsgComputation.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
     job.getConfiguration().setLong(GeneratedVertexReader.READER_VERTICES, 0);
@@ -305,7 +300,7 @@ public class
   public void testBspCombiner()
       throws IOException, InterruptedException, ClassNotFoundException {
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimpleCombinerVertex.class);
+    conf.setComputationClass(SimpleCombinerComputation.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     conf.setCombinerClass(SimpleSumCombiner.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
@@ -366,11 +361,11 @@ public class
       throws IOException, InterruptedException, ClassNotFoundException {
     Path outputPath = getTempPath(getCallingMethodName());
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimpleShortestPathsVertex.class);
+    conf.setComputationClass(SimpleShortestPathsComputation.class);
     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
     conf.setVertexOutputFormatClass(
         JsonLongDoubleFloatDoubleVertexOutputFormat.class);
-    SimpleShortestPathsVertex.SOURCE_ID.set(conf, 0);
+    SimpleShortestPathsComputation.SOURCE_ID.set(conf, 0);
     GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
 
     assertTrue(job.run(true));
@@ -394,15 +389,15 @@ public class
     Path outputPath = getTempPath(getCallingMethodName());
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimplePageRankVertex.class);
+    conf.setComputationClass(SimplePageRankComputation.class);
     conf.setAggregatorWriterClass(TextAggregatorWriter.class);
     conf.setMasterComputeClass(
-        SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
+        SimplePageRankComputation.SimplePageRankMasterCompute.class);
     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
     conf.setVertexOutputFormatClass(
-        SimplePageRankVertex.SimplePageRankVertexOutputFormat.class);
+        SimplePageRankComputation.SimplePageRankVertexOutputFormat.class);
     conf.setWorkerContextClass(
-        SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+        SimplePageRankComputation.SimplePageRankWorkerContext.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
     GiraphConfiguration configuration = job.getConfiguration();
     Path aggregatorValues = getTempPath("aggregatorValues");
@@ -419,11 +414,11 @@ public class
     try {
       if (!runningInDistributedMode()) {
         double maxPageRank =
-            SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
+            SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMax();
         double minPageRank =
-            SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
+            SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMin();
         long numVertices =
-            SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
+            SimplePageRankComputation.SimplePageRankWorkerContext.getFinalSum();
         System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
             " minPageRank=" + minPageRank + " numVertices=" + numVertices);
 
@@ -455,7 +450,7 @@ public class
             }
           }
 
-          int maxSuperstep = SimplePageRankVertex.MAX_SUPERSTEPS;
+          int maxSuperstep = SimplePageRankComputation.MAX_SUPERSTEPS;
           assertEquals(maxSuperstep + 2, minValues.size());
           assertEquals(maxSuperstep + 2, maxValues.size());
           assertEquals(maxSuperstep + 2, vertexCounts.size());
@@ -485,17 +480,17 @@ public class
   public void testBspMasterCompute()
       throws IOException, InterruptedException, ClassNotFoundException {
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(SimpleMasterComputeVertex.class);
+    conf.setComputationClass(SimpleMasterComputeComputation.class);
     conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
     conf.setMasterComputeClass(
-        SimpleMasterComputeVertex.SimpleMasterCompute.class);
+        SimpleMasterComputeComputation.SimpleMasterCompute.class);
     conf.setWorkerContextClass(
-        SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.class);
+        SimpleMasterComputeComputation.SimpleMasterComputeWorkerContext.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
     assertTrue(job.run(true));
     if (!runningInDistributedMode()) {
       double finalSum =
-          SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.getFinalSum();
+          SimpleMasterComputeComputation.SimpleMasterComputeWorkerContext.getFinalSum();
       System.out.println("testBspMasterCompute: finalSum=" + finalSum);
       assertEquals(32.5, finalSum, 0d);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java b/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
new file mode 100644
index 0000000..1092eac
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.SimplePageRankComputation;
+import org.apache.giraph.examples.TestComputationStateComputation;
+import org.apache.giraph.job.GiraphJob;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestComputationState extends BspCase {
+  public TestComputationState() {
+    super(TestComputationState.class.getName());
+  }
+
+  @Test
+  public void testComputationState() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    if (runningInDistributedMode()) {
+      System.out.println(
+          "testComputeContext: Ignore this test in distributed mode.");
+      return;
+    }
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(TestComputationStateComputation.class);
+    conf.setVertexInputFormatClass(
+        SimplePageRankComputation.SimplePageRankVertexInputFormat.class);
+    conf.setWorkerContextClass(
+        TestComputationStateComputation.TestComputationStateWorkerContext.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), conf);
+    // Use multithreading
+    job.getConfiguration().setNumComputeThreads(
+        TestComputationStateComputation.NUM_COMPUTE_THREADS);
+    // Increase the number of vertices
+    job.getConfiguration().setInt(
+        GeneratedVertexReader.READER_VERTICES,
+        TestComputationStateComputation.NUM_VERTICES);
+    // Increase the number of partitions
+    GiraphConstants.USER_PARTITION_COUNT.set(job.getConfiguration(),
+        TestComputationStateComputation.NUM_PARTITIONS);
+    assertTrue(job.run(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index 12f0d8d..4537eac 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -21,9 +21,9 @@ package org.apache.giraph;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.GeneratedVertexReader;
-import org.apache.giraph.examples.SimpleCheckpointVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.examples.SimpleCheckpoint;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.partition.HashRangePartitionerFactory;
@@ -76,12 +76,12 @@ public class TestGraphPartitioner extends BspCase {
       throws IOException, InterruptedException, ClassNotFoundException {
     Path outputPath = getTempPath("testVertexBalancer");
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(
-        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    conf.setComputationClass(
+        SimpleCheckpoint.SimpleCheckpointComputation.class);
     conf.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
     conf.setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
     GiraphJob job = prepareJob("testVertexBalancer", conf, outputPath);
@@ -94,12 +94,12 @@ public class TestGraphPartitioner extends BspCase {
     FileSystem hdfs = FileSystem.get(job.getConfiguration());
 
     conf = new GiraphConfiguration();
-    conf.setVertexClass(
-        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    conf.setComputationClass(
+        SimpleCheckpoint.SimpleCheckpointComputation.class);
     conf.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
     conf.setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
     outputPath = getTempPath("testHashPartitioner");
@@ -109,12 +109,12 @@ public class TestGraphPartitioner extends BspCase {
 
     outputPath = getTempPath("testSuperstepHashPartitioner");
     conf = new GiraphConfiguration();
-    conf.setVertexClass(
-        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    conf.setComputationClass(
+        SimpleCheckpoint.SimpleCheckpointComputation.class);
     conf.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
     conf.setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
     job = prepareJob("testSuperstepHashPartitioner", conf, outputPath);
@@ -127,12 +127,12 @@ public class TestGraphPartitioner extends BspCase {
 
     job = new GiraphJob("testHashRangePartitioner");
     setupConfiguration(job);
-    job.getConfiguration().setVertexClass(
-        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    job.getConfiguration().setComputationClass(
+        SimpleCheckpoint.SimpleCheckpointComputation.class);
     job.getConfiguration().setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
     job.getConfiguration().setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     job.getConfiguration().setVertexInputFormatClass(
         SimpleSuperstepVertexInputFormat.class);
     job.getConfiguration().setVertexOutputFormatClass(
@@ -146,12 +146,12 @@ public class TestGraphPartitioner extends BspCase {
 
     outputPath = getTempPath("testReverseIdSuperstepHashPartitioner");
     conf = new GiraphConfiguration();
-    conf.setVertexClass(
-        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    conf.setComputationClass(
+        SimpleCheckpoint.SimpleCheckpointComputation.class);
     conf.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
     conf.setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
     job = prepareJob("testReverseIdSuperstepHashPartitioner", conf,
@@ -165,12 +165,12 @@ public class TestGraphPartitioner extends BspCase {
 
     job = new GiraphJob("testSimpleRangePartitioner");
     setupConfiguration(job);
-    job.getConfiguration().setVertexClass(
-        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    job.getConfiguration().setComputationClass(
+        SimpleCheckpoint.SimpleCheckpointComputation.class);
     job.getConfiguration().setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
     job.getConfiguration().setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     job.getConfiguration().setVertexInputFormatClass(
         SimpleSuperstepVertexInputFormat.class);
     job.getConfiguration().setVertexOutputFormatClass(

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java b/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
index 766e1af..fc94480 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
@@ -20,9 +20,9 @@ package org.apache.giraph;
 
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.examples.SimpleCheckpointVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.examples.SimpleCheckpoint;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -54,12 +54,12 @@ public class TestManualCheckpoint extends BspCase {
     Path checkpointsDir = getTempPath("checkPointsForTesting");
     Path outputPath = getTempPath(getCallingMethodName());
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(
-        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    conf.setComputationClass(
+        SimpleCheckpoint.SimpleCheckpointComputation.class);
     conf.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
     conf.setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
@@ -75,7 +75,7 @@ public class TestManualCheckpoint extends BspCase {
     if (!runningInDistributedMode()) {
       FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
           outputPath);
-      idSum = SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext
+      idSum = SimpleCheckpoint.SimpleCheckpointVertexWorkerContext
           .getFinalSum();
       System.out.println("testBspCheckpoint: idSum = " + idSum +
           " fileLen = " + fileStatus.getLen());
@@ -86,25 +86,25 @@ public class TestManualCheckpoint extends BspCase {
         " with checkpoint path = " + checkpointsDir);
     outputPath = getTempPath(getCallingMethodName() + "Restarted");
     conf = new GiraphConfiguration();
-    conf.setVertexClass(
-        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    conf.setComputationClass(
+        SimpleCheckpoint.SimpleCheckpointComputation.class);
     conf.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
     conf.setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
     GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
         conf, outputPath);
     configuration.setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+        SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
     GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
         checkpointsDir.toString());
 
     assertTrue(restartedJob.run(true));
     if (!runningInDistributedMode()) {
       long idSumRestarted =
-          SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext
+          SimpleCheckpoint.SimpleCheckpointVertexWorkerContext
               .getFinalSum();
       System.out.println("testBspCheckpoint: idSumRestarted = " +
           idSumRestarted);