You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:02 UTC

[06/12] GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
index 53b8a24..4e8041a 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
@@ -37,8 +37,8 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.CollectionUtils;
+import org.apache.giraph.utils.IntNoOpComputation;
 import org.apache.giraph.utils.MockUtils;
-import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -68,35 +68,28 @@ import static org.junit.Assert.assertTrue;
 /** Test for different types of message stores */
 public class TestMessageStores {
   private static File directory;
-  private static ImmutableClassesGiraphConfiguration config;
+  private static ImmutableClassesGiraphConfiguration<IntWritable,
+      IntWritable, IntWritable> config;
   private static TestData testData;
   private static
-  CentralizedServiceWorker<IntWritable, IntWritable, IntWritable, IntWritable>
-      service;
+  CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> service;
   /**
    * Pseudo-random number generator with the same seed to help with
    * debugging)
    */
   private static final Random RANDOM = new Random(101);
 
-  private static class IntVertex extends Vertex<IntWritable,
-        IntWritable, IntWritable, IntWritable> {
-
-    @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException {
-    }
-  }
-
   @Before
   public void prepare() throws IOException {
     directory = Files.createTempDir();
 
     Configuration.addDefaultResource("giraph-site.xml");
     GiraphConfiguration initConfig = new GiraphConfiguration();
-    initConfig.setVertexClass(IntVertex.class);
+    initConfig.setComputationClass(IntNoOpComputation.class);
     GiraphConstants.MESSAGES_DIRECTORY.set(
         initConfig, new File(directory, "giraph_messages").toString());
-    config = new ImmutableClassesGiraphConfiguration(initConfig);
+    config = new ImmutableClassesGiraphConfiguration<IntWritable,
+        IntWritable, IntWritable>(initConfig);
 
     testData = new TestData();
     testData.maxId = 1000000;
@@ -158,18 +151,18 @@ public class TestMessageStores {
      * @param config  Hadoop configuration
      */
     InputMessageStore(
-        CentralizedServiceWorker<IntWritable, ?, ?, IntWritable> service,
-        ImmutableClassesGiraphConfiguration<IntWritable, ?, ?,
-            IntWritable> config,
+        CentralizedServiceWorker<IntWritable, ?, ?> service,
+        ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config,
         Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException {
-      super(service, config);
+      super(IntWritable.class, service, config);
       // Adds all the messages to the store
       for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
           inputMap.entrySet()) {
         int partitionId = getPartitionId(entry.getKey());
         ByteArrayVertexIdMessages<IntWritable, IntWritable>
             byteArrayVertexIdMessages =
-            new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
+            new ByteArrayVertexIdMessages<IntWritable,
+                IntWritable>(IntWritable.class);
         byteArrayVertexIdMessages.setConf(config);
         byteArrayVertexIdMessages.initialize();
         for (IntWritable message : entry.getValue()) {
@@ -238,7 +231,7 @@ public class TestMessageStores {
     messageStore.write(out);
     out.close();
 
-    messageStore = messageStoreFactory.newStore();
+    messageStore = messageStoreFactory.newStore(IntWritable.class);
 
     DataInputStream in = new DataInputStream(new BufferedInputStream(
         (new FileInputStream(file))));
@@ -255,7 +248,7 @@ public class TestMessageStores {
       TestData testData) throws IOException {
     SortedMap<IntWritable, Collection<IntWritable>> messages =
         new TreeMap<IntWritable, Collection<IntWritable>>();
-    S messageStore = messageStoreFactory.newStore();
+    S messageStore = messageStoreFactory.newStore(IntWritable.class);
     putNTimes(messageStore, messages, testData);
     assertTrue(equalMessages(messageStore, messages));
     messageStore.clearAll();
@@ -268,7 +261,8 @@ public class TestMessageStores {
   public void testByteArrayMessagesPerVertexStore() {
     try {
       testMessageStore(
-          ByteArrayMessagesPerVertexStore.newFactory(service, config),
+          ByteArrayMessagesPerVertexStore.<IntWritable, IntWritable>newFactory(
+              service, config),
           testData);
     } catch (IOException e) {
       e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
index 9075145..4e38bff 100644
--- a/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
+++ b/giraph-core/src/test/java/org/apache/giraph/conf/TestObjectCreation.java
@@ -18,10 +18,10 @@
 
 package org.apache.giraph.conf;
 
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
+import org.apache.giraph.utils.LongNoOpComputation;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -51,7 +51,7 @@ public class TestObjectCreation {
   private long total = 0;
   private long expected = COUNT * (COUNT - 1) / 2L;
   private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
-      LongWritable, LongWritable> configuration;
+      LongWritable> configuration;
 
   @Before
   public void setUp() {
@@ -59,11 +59,12 @@ public class TestObjectCreation {
     GiraphConstants.VERTEX_ID_CLASS.set(conf, IntWritable.class);
     GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
     GiraphConstants.EDGE_VALUE_CLASS.set(conf, DoubleWritable.class);
-    GiraphConstants.MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
-    conf.setVertexClass(ImmutableVertex.class);
+    GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, LongWritable.class);
+    conf.setComputationClass(LongNoOpComputation.class);
     configuration =
         new ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
-            LongWritable, LongWritable>(conf);
+            LongWritable>(conf);
     total = 0;
     System.gc();
   }
@@ -161,15 +162,8 @@ public class TestObjectCreation {
     }
   }
 
-  private static class ImmutableVertex extends Vertex<LongWritable,
-        LongWritable, LongWritable, LongWritable> {
-    @Override
-    public void compute(Iterable<LongWritable> messages) throws IOException {
-    }
-  }
-
   private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
-      LongWritable, LongWritable> getConfiguration() {
+      LongWritable> getConfiguration() {
     return configuration;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
index 15d2bb0..b6e17fd 100644
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.graph;
 
 import com.google.common.collect.Lists;
+
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.ArrayListEdges;
@@ -36,6 +37,7 @@ import org.apache.giraph.time.Times;
 import org.apache.giraph.utils.DynamicChannelBufferInputStream;
 import org.apache.giraph.utils.DynamicChannelBufferOutputStream;
 import org.apache.giraph.utils.EdgeIterables;
+import org.apache.giraph.utils.NoOpComputation;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 import org.apache.giraph.utils.WritableUtils;
@@ -70,11 +72,8 @@ public class TestVertexAndEdges {
   /**
    * Dummy concrete vertex.
    */
-  public static class TestVertex extends Vertex<LongWritable, FloatWritable,
-        DoubleWritable, LongWritable> {
-    @Override
-    public void compute(Iterable<LongWritable> messages) { }
-  }
+  public static class TestComputation extends NoOpComputation<LongWritable,
+      FloatWritable, DoubleWritable, LongWritable> { }
 
   /**
    * A basic {@link org.apache.giraph.edge.OutEdges} implementation that doesn't provide any
@@ -160,10 +159,10 @@ public class TestVertexAndEdges {
     edgesClasses.add(LongDoubleHashMapEdges.class);
   }
 
-  private Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
+  private Vertex<LongWritable, FloatWritable, DoubleWritable>
   instantiateVertex(Class<? extends OutEdges> edgesClass) {
     GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
-    giraphConfiguration.setVertexClass(TestVertex.class);
+    giraphConfiguration.setComputationClass(TestComputation.class);
     giraphConfiguration.setOutEdgesClass(edgesClass);
     ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
         new ImmutableClassesGiraphConfiguration(giraphConfiguration);
@@ -175,7 +174,7 @@ public class TestVertexAndEdges {
    */
   @Test
   public void testVertexIdAndValue() {
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
+    Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
         instantiateVertex(ArrayListEdges.class);
     assertNotNull(vertex);
     vertex.initialize(new LongWritable(7), new FloatWritable(3.0f));
@@ -189,7 +188,7 @@ public class TestVertexAndEdges {
   instantiateOutEdges(Class<? extends OutEdges> edgesClass) {
     GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
     // Needed to extract type arguments in ReflectionUtils.
-    giraphConfiguration.setVertexClass(TestVertex.class);
+    giraphConfiguration.setComputationClass(TestComputation.class);
     giraphConfiguration.setOutEdgesClass(edgesClass);
     ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
         new ImmutableClassesGiraphConfiguration(giraphConfiguration);
@@ -208,7 +207,7 @@ public class TestVertexAndEdges {
   }
 
   private void testEdgesClass(Class<? extends OutEdges> edgesClass) {
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
+    Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
         instantiateVertex(edgesClass);
     OutEdges<LongWritable, DoubleWritable> outEdges =
         instantiateOutEdges(edgesClass);
@@ -252,7 +251,7 @@ public class TestVertexAndEdges {
   }
 
   private void testMutateEdgesClass(Class<? extends OutEdges> edgesClass) {
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
+    Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
         instantiateVertex(edgesClass);
     OutEdges<LongWritable, DoubleWritable> outEdges =
         instantiateOutEdges(edgesClass);
@@ -339,9 +338,9 @@ public class TestVertexAndEdges {
     }
   }
 
-  private Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
+  private Vertex<LongWritable, FloatWritable, DoubleWritable>
   buildVertex(Class<? extends OutEdges> edgesClass) {
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
+    Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
         instantiateVertex(edgesClass);
     OutEdges<LongWritable, DoubleWritable> outEdges =
         instantiateOutEdges(edgesClass);
@@ -362,7 +361,7 @@ public class TestVertexAndEdges {
 
   private void testSerializeOutEdgesClass(
       Class<? extends OutEdges> edgesClass) {
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
+    Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
         buildVertex(edgesClass);
 
     long serializeNanosStart;
@@ -381,7 +380,7 @@ public class TestVertexAndEdges {
         (byteArray.length * 1f * Time.NS_PER_SECOND / serializeNanos) +
         " bytes / sec for " + edgesClass.getName());
 
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
+    Vertex<LongWritable, FloatWritable, DoubleWritable>
         readVertex = buildVertex(edgesClass);
     
     long deserializeNanosStart;
@@ -407,7 +406,7 @@ public class TestVertexAndEdges {
   private void testDynamicChannelBufferSerializeOutEdgesClass(
       Class<? extends OutEdges> edgesClass)
       throws IOException {
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
+    Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
         buildVertex(edgesClass);
 
     long serializeNanosStart;
@@ -430,7 +429,7 @@ public class TestVertexAndEdges {
             Time.NS_PER_SECOND / serializeNanos) +
         " bytes / sec for " + edgesClass.getName());
 
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
+    Vertex<LongWritable, FloatWritable, DoubleWritable>
         readVertex = buildVertex(edgesClass);
 
     long deserializeNanosStart;
@@ -462,7 +461,7 @@ public class TestVertexAndEdges {
   private void testUnsafeSerializeOutEdgesClass(
       Class<? extends OutEdges> edgesClass)
       throws IOException {
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
+    Vertex<LongWritable, FloatWritable, DoubleWritable> vertex =
         buildVertex(edgesClass);
 
     long serializeNanosStart;
@@ -487,7 +486,7 @@ public class TestVertexAndEdges {
             Time.NS_PER_SECOND / serializeNanos) +
         " bytes / sec for " + edgesClass.getName());
 
-    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
+    Vertex<LongWritable, FloatWritable, DoubleWritable>
         readVertex = buildVertex(edgesClass);
 
     long deserializeNanosStart;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
index 6849e3a..fdfb5e9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
@@ -22,15 +22,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.formats.AdjacencyListTextVertexOutputFormat;
+import org.apache.giraph.utils.NoOpComputation;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.Before;
@@ -41,25 +42,20 @@ import static org.mockito.Mockito.*;
 public class TestAdjacencyListTextVertexOutputFormat extends AdjacencyListTextVertexOutputFormat<Text, DoubleWritable, DoubleWritable> {
   /** Test configuration */
   private ImmutableClassesGiraphConfiguration<Text,
-      DoubleWritable, DoubleWritable, Writable> conf;
+      DoubleWritable, DoubleWritable> conf;
 
   /**
    * Dummy class to allow ImmutableClassesGiraphConfiguration to be created.
    */
-  public static class DummyVertex extends Vertex<Text, DoubleWritable,
-      DoubleWritable, DoubleWritable> {
-    @Override
-    public void compute(Iterable<DoubleWritable> messages) throws IOException {
-      // Do nothing
-    }
-  }
+  public static class DummyComputation extends NoOpComputation<Text,
+      DoubleWritable, DoubleWritable, DoubleWritable> { }
 
   @Before
   public void setUp() {
     GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
-    giraphConfiguration.setVertexClass(DummyVertex.class);
+    giraphConfiguration.setComputationClass(DummyComputation.class);
     conf = new ImmutableClassesGiraphConfiguration<Text,
-        DoubleWritable, DoubleWritable, Writable>(giraphConfiguration);
+        DoubleWritable, DoubleWritable>(giraphConfiguration);
   }
 
   protected AdjacencyListTextVertexWriter createVertexWriter(
@@ -104,7 +100,6 @@ public class TestAdjacencyListTextVertexOutputFormat extends AdjacencyListTextVe
     Vertex vertex = mock(Vertex.class);
     when(vertex.getId()).thenReturn(new Text("San Francisco"));
     when(vertex.getValue()).thenReturn(new DoubleWritable(0d));
-    when(vertex.getTotalNumEdges()).thenReturn(2l);
     List<Edge<Text, DoubleWritable>> cities = Lists.newArrayList();
     Collections.addAll(cities,
         EdgeFactory.create(new Text("Los Angeles"), new DoubleWritable(347.16)),
@@ -133,7 +128,6 @@ public class TestAdjacencyListTextVertexOutputFormat extends AdjacencyListTextVe
     Vertex vertex = mock(Vertex.class);
     when(vertex.getId()).thenReturn(new Text("San Francisco"));
     when(vertex.getValue()).thenReturn(new DoubleWritable(0d));
-    when(vertex.getTotalNumEdges()).thenReturn(2l);
     List<Edge<Text, DoubleWritable>> cities = Lists.newArrayList();
     Collections.addAll(cities,
         EdgeFactory.create(new Text("Los Angeles"), new DoubleWritable(347.16)),

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
index cb1a8da..327aaa3 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
@@ -23,14 +23,15 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
 import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat;
 import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.utils.ComputationCountEdges;
+import org.apache.giraph.utils.IntIntNullNoOpComputation;
 import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.giraph.vertices.IntIntNullVertexDoNothing;
-import org.apache.giraph.vertices.VertexCountEdges;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
@@ -65,7 +66,7 @@ public class TestEdgeInput extends BspCase {
     };
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(VertexCountEdges.class);
+    conf.setComputationClass(ComputationCountEdges.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
@@ -94,7 +95,7 @@ public class TestEdgeInput extends BspCase {
     };
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(VertexCountEdges.class);
+    conf.setComputationClass(ComputationCountEdges.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
@@ -130,7 +131,7 @@ public class TestEdgeInput extends BspCase {
     };
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(IntIntNullVertexDoNothing.class);
+    conf.setComputationClass(IntIntNullNoOpComputation.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
@@ -161,7 +162,7 @@ public class TestEdgeInput extends BspCase {
     assertEquals(3, (int) values.get(5));
 
     conf = new GiraphConfiguration();
-    conf.setVertexClass(VertexCountEdges.class);
+    conf.setComputationClass(ComputationCountEdges.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
@@ -191,7 +192,7 @@ public class TestEdgeInput extends BspCase {
     };
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(TestVertexCheckEdgesType.class);
+    conf.setComputationClass(TestComputationCheckEdgesType.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setInputOutEdgesClass(TestOutEdgesFilterEven.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
@@ -210,12 +211,14 @@ public class TestEdgeInput extends BspCase {
     assertEquals(0, (int) values.get(4));
   }
 
-  public static class TestVertexCheckEdgesType extends VertexCountEdges {
+  public static class TestComputationCheckEdgesType extends
+      ComputationCountEdges {
     @Override
-    public void compute(Iterable<NullWritable> messages) throws IOException {
-      assertFalse(getEdges() instanceof TestOutEdgesFilterEven);
-      assertTrue(getEdges() instanceof ByteArrayEdges);
-      super.compute(messages);
+    public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
+        Iterable<NullWritable> messages) throws IOException {
+      assertFalse(vertex.getEdges() instanceof TestOutEdgesFilterEven);
+      assertTrue(vertex.getEdges() instanceof ByteArrayEdges);
+      super.compute(vertex, messages);
     }
   }
 
@@ -223,7 +226,7 @@ public class TestEdgeInput extends BspCase {
       implements VertexValueFactory<IntWritable> {
     @Override
     public void initialize(ImmutableClassesGiraphConfiguration<?, IntWritable,
-            ?, ?> configuration) { }
+        ?> configuration) { }
 
     @Override
     public IntWritable createVertexValue() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java b/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
index 83a366d..86c37ea 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
@@ -26,9 +26,9 @@ import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
 import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.utils.ComputationCountEdges;
+import org.apache.giraph.utils.IntNoOpComputation;
 import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.giraph.vertices.IntIntNullVertexDoNothing;
-import org.apache.giraph.vertices.VertexCountEdges;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
@@ -60,7 +60,7 @@ public class TestFilters extends BspCase {
     };
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(VertexCountEdges.class);
+    conf.setComputationClass(ComputationCountEdges.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
     conf.setEdgeInputFilterClass(EdgeFilter.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
@@ -74,9 +74,9 @@ public class TestFilters extends BspCase {
   }
 
   public static class VertexFilter implements VertexInputFilter<IntWritable,
-      NullWritable, NullWritable, NullWritable> {
+      NullWritable, NullWritable> {
     @Override
-    public boolean dropVertex(Vertex<IntWritable, NullWritable, NullWritable,
+    public boolean dropVertex(Vertex<IntWritable, NullWritable,
         NullWritable> vertex) {
       int id = vertex.getId().get();
       return id == 2 || id == 3;
@@ -93,7 +93,7 @@ public class TestFilters extends BspCase {
     };
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(IntIntNullVertexDoNothing.class);
+    conf.setComputationClass(IntNoOpComputation.class);
     conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
     conf.setVertexInputFilterClass(VertexFilter.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
index ff4d2b1..68059f9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
@@ -20,10 +20,12 @@ package org.apache.giraph.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
+
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.utils.NoOpComputation;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -38,24 +40,19 @@ public class TestIdWithValueTextOutputFormat extends
     IdWithValueTextOutputFormat<Text, DoubleWritable, Writable> {
   /** Test configuration */
   private ImmutableClassesGiraphConfiguration<
-      Text, DoubleWritable, Writable, Writable> conf;
+      Text, DoubleWritable, Writable> conf;
   /**
    * Dummy class to allow ImmutableClassesGiraphConfiguration to be created.
    */
-  public static class DummyVertex extends Vertex<Text, DoubleWritable,
-      DoubleWritable, DoubleWritable> {
-    @Override
-    public void compute(Iterable<DoubleWritable> messages) throws IOException {
-      // Do nothing
-    }
-  }
+  public static class DummyComputation extends NoOpComputation<Text,
+      DoubleWritable, DoubleWritable, DoubleWritable> { }
 
   @Before
   public void setUp() {
     GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
-    giraphConfiguration.setVertexClass(DummyVertex.class);
+    giraphConfiguration.setComputationClass(DummyComputation.class);
     conf = new ImmutableClassesGiraphConfiguration<Text, DoubleWritable,
-        Writable, Writable>(giraphConfiguration);
+        Writable>(giraphConfiguration);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
index a6aa78a..3b0dc63 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.io;
 
 import org.apache.giraph.BspCase;
-import org.apache.giraph.benchmark.WeightedPageRankVertex;
+import org.apache.giraph.benchmark.WeightedPageRankComputation;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.giraph.io.formats.JsonBase64VertexInputFormat;
@@ -61,7 +61,7 @@ public class TestJsonBase64Format extends BspCase {
 
     Path outputPath = getTempPath(getCallingMethodName());
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(WeightedPageRankVertex.class);
+    conf.setComputationClass(WeightedPageRankComputation.class);
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     conf.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
@@ -69,31 +69,34 @@ public class TestJsonBase64Format extends BspCase {
         PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
     job.getConfiguration().setLong(
         PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
-    job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 2);
+    job.getConfiguration().setInt(
+        WeightedPageRankComputation.SUPERSTEP_COUNT, 2);
 
     assertTrue(job.run(true));
 
     Path outputPath2 = getTempPath(getCallingMethodName() + "2");
     conf = new GiraphConfiguration();
-    conf.setVertexClass(WeightedPageRankVertex.class);
+    conf.setComputationClass(WeightedPageRankComputation.class);
     conf.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
     conf.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
     job = prepareJob(getCallingMethodName(), conf, outputPath2);
-    job.getConfiguration().setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 3);
+    job.getConfiguration().setInt(
+        WeightedPageRankComputation.SUPERSTEP_COUNT, 3);
     GiraphFileInputFormat.addVertexInputPath(
       job.getInternalJob().getConfiguration(), outputPath);
     assertTrue(job.run(true));
 
     Path outputPath3 = getTempPath(getCallingMethodName() + "3");
     conf = new GiraphConfiguration();
-    conf.setVertexClass(WeightedPageRankVertex.class);
+    conf.setComputationClass(WeightedPageRankComputation.class);
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     conf.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
     job = prepareJob(getCallingMethodName(), conf, outputPath3);
     conf = job.getConfiguration();
     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101);
     conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2);
-    conf.setInt(WeightedPageRankVertex.SUPERSTEP_COUNT, 5);
+    conf.setInt(
+        WeightedPageRankComputation.SUPERSTEP_COUNT, 5);
     assertTrue(job.run(true));
 
     assertEquals(101, getNumResults(conf, outputPath));

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
index 601c824..c67a1c4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -19,18 +19,19 @@ package org.apache.giraph.io;
 
 
 import java.io.IOException;
+
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.formats.AdjacencyListTextVertexInputFormat;
 import org.apache.giraph.io.formats.LongDoubleDoubleAdjacencyListVertexInputFormat;
+
+import org.apache.giraph.utils.NoOpComputation;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -38,7 +39,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.giraph.io.TestTextDoubleDoubleAdjacencyListVertexInputFormat.assertValidVertex;
-import static org.apache.giraph.io.TestTextDoubleDoubleAdjacencyListVertexInputFormat.setGraphState;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -47,19 +47,17 @@ public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends LongDoub
 
   private RecordReader<LongWritable, Text> rr;
   private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
-      DoubleWritable, Writable> conf;
+      DoubleWritable> conf;
   private TaskAttemptContext tac;
-  private GraphState<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> graphState;
 
   @Before
   public void setUp() throws IOException, InterruptedException {
     rr = mock(RecordReader.class);
     when(rr.nextKeyValue()).thenReturn(true);
     GiraphConfiguration giraphConf = new GiraphConfiguration();
-    giraphConf.setVertexClass(DummyVertex.class);
+    giraphConf.setComputationClass(DummyComputation.class);
     conf = new ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
-        DoubleWritable, Writable>(giraphConf);
-    graphState = mock(GraphState.class);
+        DoubleWritable>(giraphConf);
     tac = mock(TaskAttemptContext.class);
     when(tac.getConfiguration()).thenReturn(conf);
   }
@@ -129,10 +127,9 @@ public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends LongDoub
     vr.initialize(null, tac);
 
     assertTrue("Should have been able to read vertex", vr.nextVertex());
-    Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+    Vertex<LongWritable, DoubleWritable, DoubleWritable>
         vertex = vr.getCurrentVertex();
-    setGraphState(vertex, graphState);
-    assertValidVertex(conf, graphState, vertex,
+    assertValidVertex(conf, vertex,
         new LongWritable(42), new DoubleWritable(0.1),
         EdgeFactory.create(new LongWritable(99), new DoubleWritable(0.2)),
         EdgeFactory.create(new LongWritable(2000), new DoubleWritable(0.3)),
@@ -150,20 +147,14 @@ public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends LongDoub
     vr.setConf(conf);
     vr.initialize(null, tac);
     assertTrue("Should have been able to read vertex", vr.nextVertex());
-    Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+    Vertex<LongWritable, DoubleWritable, DoubleWritable>
         vertex = vr.getCurrentVertex();
-    setGraphState(vertex, graphState);
-    assertValidVertex(conf, graphState, vertex,
+    assertValidVertex(conf, vertex,
         new LongWritable(12345), new DoubleWritable(42.42),
        EdgeFactory.create(new LongWritable(9999999), new DoubleWritable(99.9)));
     assertEquals(vertex.getNumEdges(), 1);
   }
 
-  public static class DummyVertex extends Vertex<LongWritable, DoubleWritable,
-      DoubleWritable, BooleanWritable> {
-    @Override
-    public void compute(Iterable<BooleanWritable> messages) throws IOException {
-      // ignore
-    }
-  }
+  public static class DummyComputation extends NoOpComputation<LongWritable,
+      DoubleWritable, DoubleWritable, BooleanWritable> { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
index 4ee8ea1..8034052 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -19,13 +19,13 @@ package org.apache.giraph.io;
 
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.io.formats.AdjacencyListTextVertexInputFormat;
 import org.apache.giraph.io.formats.TextDoubleDoubleAdjacencyListVertexInputFormat;
 import org.apache.giraph.utils.EdgeIterables;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.NoOpComputation;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -39,7 +39,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
@@ -48,23 +47,21 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TextDoubleDoubleAdjacencyListVertexInputFormat<BooleanWritable> {
+public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TextDoubleDoubleAdjacencyListVertexInputFormat {
 
   private RecordReader<LongWritable, Text> rr;
   private ImmutableClassesGiraphConfiguration<Text, DoubleWritable,
-      DoubleWritable, Writable> conf;
+      DoubleWritable> conf;
   private TaskAttemptContext tac;
-  private GraphState<Text, DoubleWritable, DoubleWritable, BooleanWritable> graphState;
 
   @Before
   public void setUp() throws IOException, InterruptedException {
     rr = mock(RecordReader.class);
     when(rr.nextKeyValue()).thenReturn(true).thenReturn(false);
     GiraphConfiguration giraphConf = new GiraphConfiguration();
-    giraphConf.setVertexClass(DummyVertex.class);
+    giraphConf.setComputationClass(DummyComputation.class);
     conf = new ImmutableClassesGiraphConfiguration<Text, DoubleWritable,
-        DoubleWritable, Writable>(giraphConf);
-    graphState = mock(GraphState.class);
+        DoubleWritable>(giraphConf);
     tac = mock(TaskAttemptContext.class);
     when(tac.getConfiguration()).thenReturn(conf);
   }
@@ -121,33 +118,23 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TextDoub
     }
   }
 
-  public static void setGraphState(Vertex vertex, GraphState graphState) throws Exception {
-    Class<? extends Vertex> c = Vertex.class;
-    Method m = c.getDeclaredMethod("setGraphState", GraphState.class);
-    m.setAccessible(true);
-    m.invoke(vertex, graphState);
-  }
-
   public static <I extends WritableComparable, V extends Writable,
       E extends WritableComparable> void assertValidVertex(
-      ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
-      GraphState<I, V, E, ?> graphState,
-      Vertex<I, V, E, ?> actual,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      Vertex<I, V, E> actual,
       I expectedId,
       V expectedValue,
       Edge<I, E>... edges) throws Exception {
-    Vertex<I, V, E, ?> expected = conf.createVertex();
-    setGraphState(expected, graphState);
+    Vertex<I, V, E> expected = conf.createVertex();
     expected.initialize(expectedId, expectedValue, Arrays.asList(edges));
     assertValid(expected, actual);
   }
 
   public static <I extends WritableComparable, V extends Writable,
       E extends WritableComparable> void assertValid(
-      Vertex<I, V, E, ?> expected, Vertex<I, V, E, ?> actual) {
+      Vertex<I, V, E> expected, Vertex<I, V, E> actual) {
     assertEquals(expected.getId(), actual.getId());
     assertEquals(expected.getValue(), actual.getValue());
-    assertEquals(expected.getTotalNumEdges(), actual.getTotalNumEdges());
     assertTrue(EdgeIterables.equals(expected.getEdges(), actual.getEdges()));
   }
 
@@ -160,9 +147,8 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TextDoub
     vr.setConf(conf);
     vr.initialize(null, tac);
     assertTrue("Should have been able to add a vertex", vr.nextVertex());
-    Vertex<Text, DoubleWritable, DoubleWritable, ?> vertex = vr.getCurrentVertex();
-    setGraphState(vertex, graphState);
-    assertValidVertex(conf, graphState, vertex,
+    Vertex<Text, DoubleWritable, DoubleWritable> vertex = vr.getCurrentVertex();
+    assertValidVertex(conf, vertex,
         new Text("Hi"), new DoubleWritable(0),
         EdgeFactory.create(new Text("Ciao"), new DoubleWritable(1.123d)),
         EdgeFactory.create(new Text("Bomdia"), new DoubleWritable(2.234d)),
@@ -187,9 +173,8 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TextDoub
     vr.setConf(conf);
     vr.initialize(null, tac);
     assertTrue("Should have been able to read vertex", vr.nextVertex());
-    Vertex<Text, DoubleWritable, DoubleWritable, ?> vertex = vr.getCurrentVertex();
-    setGraphState(vertex, graphState);
-    assertValidVertex(conf, graphState, vertex,
+    Vertex<Text, DoubleWritable, DoubleWritable> vertex = vr.getCurrentVertex();
+    assertValidVertex(conf, vertex,
         new Text("BYE"), new DoubleWritable(0.01d),
         EdgeFactory.create(new Text("CIAO"), new DoubleWritable(1.001d)),
         EdgeFactory.create(new Text("TCHAU"), new DoubleWritable(2.0001d)),
@@ -208,19 +193,13 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TextDoub
     vr.setConf(conf);
     vr.initialize(null, tac);
     assertTrue("Should have been able to read vertex", vr.nextVertex());
-    Vertex<Text, DoubleWritable, DoubleWritable, ?> vertex = vr.getCurrentVertex();
-    setGraphState(vertex, graphState);
-    assertValidVertex(conf, graphState, vertex,
+    Vertex<Text, DoubleWritable, DoubleWritable> vertex = vr.getCurrentVertex();
+    assertValidVertex(conf, vertex,
         new Text("alpha"), new DoubleWritable(42d),
         EdgeFactory.create(new Text("beta"), new DoubleWritable(99d)));
     assertEquals(vertex.getNumEdges(), 1);
   }
 
-  public static class DummyVertex extends Vertex<Text, DoubleWritable,
-            DoubleWritable, BooleanWritable> {
-    @Override
-    public void compute(Iterable<BooleanWritable> messages) throws IOException {
-      // ignore
-    }
-  }
+  public static class DummyComputation extends NoOpComputation<Text,
+      DoubleWritable, DoubleWritable, BooleanWritable> { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
new file mode 100644
index 0000000..8ae09bc
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.IntNoOpComputation;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/** Test type verification when switching computation and combiner types */
+public class TestComputationCombinerTypes {
+  @Test
+  public void testAllMatchWithoutCombiner() {
+    SuperstepClasses classes =
+        new SuperstepClasses(IntNoOpComputation.class, null);
+    classes.verifyTypesMatch(createConfiguration(IntNoOpComputation.class));
+  }
+
+  @Test
+  public void testAllMatchWithCombiner() {
+    SuperstepClasses classes =
+        new SuperstepClasses(IntIntIntLongDoubleComputation.class,
+            IntDoubleCombiner.class);
+    classes.verifyTypesMatch(
+        createConfiguration(IntIntIntIntLongComputation.class));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDifferentIdTypes() {
+    SuperstepClasses classes =
+        new SuperstepClasses(LongIntIntLongIntComputation.class, null);
+    classes.verifyTypesMatch(
+        createConfiguration(IntIntIntIntLongComputation.class));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDifferentVertexValueTypes() {
+    SuperstepClasses classes =
+        new SuperstepClasses(IntLongIntLongIntComputation.class, null);
+    classes.verifyTypesMatch(
+        createConfiguration(IntIntIntIntLongComputation.class));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDifferentEdgeDataTypes() {
+    SuperstepClasses classes =
+        new SuperstepClasses(IntIntLongLongIntComputation.class, null);
+    classes.verifyTypesMatch(
+        createConfiguration(IntIntIntIntLongComputation.class));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDifferentMessageTypes() {
+    SuperstepClasses classes =
+        new SuperstepClasses(IntIntIntIntLongComputation.class, null);
+    classes.verifyTypesMatch(
+        createConfiguration(IntIntIntLongDoubleComputation.class));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDifferentCombinerIdType() {
+    SuperstepClasses classes =
+        new SuperstepClasses(IntIntIntLongDoubleComputation.class,
+            DoubleDoubleCombiner.class);
+    classes.verifyTypesMatch(
+        createConfiguration(IntIntIntIntLongComputation.class));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDifferentCombinerMessageType() {
+    SuperstepClasses classes =
+        new SuperstepClasses(IntIntIntLongDoubleComputation.class,
+            IntLongCombiner.class);
+    classes.verifyTypesMatch(
+        createConfiguration(IntIntIntIntLongComputation.class));
+  }
+
+  private static ImmutableClassesGiraphConfiguration createConfiguration(
+      Class<? extends Computation> computationClass) {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(computationClass);
+    return new ImmutableClassesGiraphConfiguration(conf);
+  }
+
+  public static class NoOpComputation<I extends WritableComparable,
+      V extends Writable, E extends Writable, M1 extends Writable,
+      M2 extends Writable> extends Computation<I, V, E, M1, M2> {
+    @Override
+    public void compute(Vertex<I, V, E> vertex,
+        Iterable<M1> messages) throws IOException {
+    }
+  }
+
+  private static class IntIntIntIntLongComputation extends
+      NoOpComputation<IntWritable, IntWritable, IntWritable, IntWritable,
+          LongWritable> { }
+
+  private static class IntIntIntLongDoubleComputation extends
+      NoOpComputation<IntWritable, IntWritable, IntWritable, LongWritable,
+          DoubleWritable> { }
+
+  private static class LongIntIntLongIntComputation extends
+      NoOpComputation<LongWritable, IntWritable, IntWritable, LongWritable,
+          IntWritable> { }
+
+  private static class IntLongIntLongIntComputation extends
+      NoOpComputation<IntWritable, LongWritable, IntWritable, LongWritable,
+          IntWritable> { }
+
+  private static class IntIntLongLongIntComputation extends
+      NoOpComputation<IntWritable, IntWritable, LongWritable, LongWritable,
+          IntWritable> { }
+
+  private static class NoOpCombiner<I extends WritableComparable,
+      M extends Writable> extends Combiner<I, M> {
+    @Override
+    public void combine(I vertexIndex, M originalMessage, M messageToCombine) {
+    }
+
+    @Override
+    public M createInitialMessage() {
+      return null;
+    }
+  }
+
+  private static class IntDoubleCombiner extends NoOpCombiner<IntWritable,
+      DoubleWritable> { }
+
+  private static class DoubleDoubleCombiner extends NoOpCombiner<DoubleWritable,
+      DoubleWritable> { }
+
+  private static class IntLongCombiner extends NoOpCombiner<IntWritable,
+      LongWritable> { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java b/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
index 9d6b215..0461624 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestMasterObserver.java
@@ -18,14 +18,19 @@
 
 package org.apache.giraph.master;
 
+import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.formats.IntNullNullTextInputFormat;
+import org.apache.giraph.io.formats.TextVertexInputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -33,17 +38,45 @@ import java.io.IOException;
 import static org.apache.hadoop.util.StringUtils.arrayToString;
 import static org.junit.Assert.assertEquals;
 
-public class TestMasterObserver {
-  public static class NoOpVertex extends Vertex<IntWritable, NullWritable,
-      NullWritable, NullWritable> {
-    private int count = 0;
+import com.google.common.collect.ImmutableList;
 
+public class TestMasterObserver {
+  public static class SimpleComputation extends BasicComputation<IntWritable,
+      IntWritable, NullWritable, NullWritable> {
     @Override
-    public void compute(Iterable<NullWritable> messages) throws IOException {
-      if (count == 2) {
-        voteToHalt();
+    public void compute(
+        Vertex<IntWritable, IntWritable, NullWritable> vertex,
+        Iterable<NullWritable> messages) throws IOException {
+      int currentValue = vertex.getValue().get();
+      if (currentValue == 2) {
+        vertex.voteToHalt();
       }
-      ++count;
+      vertex.setValue(new IntWritable(currentValue + 1));
+    }
+  }
+
+  public static class InputFormat extends TextVertexInputFormat<
+      IntWritable, IntWritable, NullWritable> {
+    @Override
+    public TextVertexReader createVertexReader(
+        InputSplit split, TaskAttemptContext context) throws IOException {
+      return new TextVertexReaderFromEachLine() {
+        @Override
+        protected IntWritable getId(Text line) throws IOException {
+          return new IntWritable(Integer.parseInt(line.toString()));
+        }
+
+        @Override
+        protected IntWritable getValue(Text line) throws IOException {
+          return new IntWritable(0);
+        }
+
+        @Override
+        protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
+            Text line) throws IOException {
+          return ImmutableList.of();
+        }
+      };
     }
   }
 
@@ -88,9 +121,9 @@ public class TestMasterObserver {
     GiraphConfiguration conf = new GiraphConfiguration();
     conf.set(GiraphConstants.MASTER_OBSERVER_CLASSES.getKey(),
         arrayToString(klasses));
-    conf.setVertexClass(NoOpVertex.class);
+    conf.setComputationClass(SimpleComputation.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setVertexInputFormatClass(IntNullNullTextInputFormat.class);
+    conf.setVertexInputFormatClass(InputFormat.class);
     InternalVertexRunner.run(conf, graph);
 
     assertEquals(2, Obs.preApp);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
new file mode 100644
index 0000000..4069972
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.utils.TestGraph;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import junit.framework.Assert;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/** Test switching Computation and Combiner class during application */
+public class TestSwitchClasses {
+  @Test
+  public void testSwitchingClasses() throws Exception {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(Computation3.class);
+    conf.setMasterComputeClass(SwitchingClassesMasterCompute.class);
+
+    TestGraph<IntWritable, StatusValue, IntWritable> graph =
+        new TestGraph<IntWritable, StatusValue, IntWritable>(conf);
+    IntWritable id1 = new IntWritable(1);
+    graph.addVertex(id1, new StatusValue());
+    IntWritable id2 = new IntWritable(2);
+    graph.addVertex(id2, new StatusValue());
+    graph = InternalVertexRunner.run(conf, graph);
+
+    Assert.assertEquals(2, graph.getVertices().size());
+    StatusValue value1 = graph.getVertex(id1).getValue();
+    StatusValue value2 = graph.getVertex(id2).getValue();
+
+    // Check that computations were performed in expected order
+    ArrayList<Integer> expectedComputations = Lists.newArrayList(1, 1, 2, 3, 1);
+    checkComputations(expectedComputations, value1.computations);
+    checkComputations(expectedComputations, value2.computations);
+
+    // Check that messages were sent in the correct superstep,
+    // and combined when needed
+    ArrayList<HashSet<Double>> messages1 =
+        Lists.newArrayList(
+            Sets.<Double>newHashSet(),
+            Sets.<Double>newHashSet(11d),
+            Sets.<Double>newHashSet(11d),
+            Sets.<Double>newHashSet(101.5, 201.5),
+            Sets.<Double>newHashSet(3002d));
+    checkMessages(messages1, value1.messagesReceived);
+    ArrayList<HashSet<Double>> messages2 =
+        Lists.newArrayList(
+            Sets.<Double>newHashSet(),
+            Sets.<Double>newHashSet(12d),
+            Sets.<Double>newHashSet(12d),
+            Sets.<Double>newHashSet(102.5, 202.5),
+            Sets.<Double>newHashSet(3004d));
+    checkMessages(messages2, value2.messagesReceived);
+  }
+
+  private static void checkComputations(ArrayList<Integer> expected,
+      ArrayList<Integer> actual) {
+    Assert.assertEquals("Incorrect number of supersteps",
+        expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i++) {
+      Assert.assertEquals("Incorrect computation on superstep " + i,
+          (int) expected.get(i), (int) actual.get(i));
+    }
+  }
+
+  private static void checkMessages(ArrayList<HashSet<Double>> expected,
+      ArrayList<HashSet<Double>> actual) {
+    Assert.assertEquals(expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i++) {
+      Assert.assertEquals(expected.get(i).size(), actual.get(i).size());
+      for (Double value : expected.get(i)) {
+        Assert.assertTrue(actual.get(i).contains(value));
+      }
+    }
+  }
+
+  public static class SwitchingClassesMasterCompute
+      extends DefaultMasterCompute {
+    @Override
+    public void compute() {
+      switch ((int) getSuperstep()) {
+        case 0:
+          setComputation(Computation1.class);
+          setCombiner(MinimumCombiner.class);
+          break;
+        case 1:
+          // test classes don't change
+          break;
+        case 2:
+          setComputation(Computation2.class);
+          // test combiner removed
+          setCombiner(null);
+          break;
+        case 3:
+          setComputation(Computation3.class);
+          setCombiner(SumCombiner.class);
+          break;
+        case 4:
+          setComputation(Computation1.class);
+          break;
+        default:
+          haltComputation();
+      }
+    }
+  }
+
+  public static class Computation1 extends Computation<IntWritable,
+      StatusValue, IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Vertex<IntWritable, StatusValue, IntWritable> vertex,
+        Iterable<IntWritable> messages) throws IOException {
+      vertex.getValue().computations.add(1);
+      vertex.getValue().addIntMessages(messages);
+
+      IntWritable otherId = new IntWritable(3 - vertex.getId().get());
+      sendMessage(otherId, new IntWritable(otherId.get() + 10));
+      sendMessage(otherId, new IntWritable(otherId.get() + 20));
+    }
+  }
+
+  public static class Computation2 extends Computation<IntWritable,
+      StatusValue, IntWritable, IntWritable, DoubleWritable> {
+    @Override
+    public void compute(Vertex<IntWritable, StatusValue, IntWritable> vertex,
+        Iterable<IntWritable> messages) throws IOException {
+      vertex.getValue().computations.add(2);
+      vertex.getValue().addIntMessages(messages);
+
+      IntWritable otherId = new IntWritable(3 - vertex.getId().get());
+      sendMessage(otherId, new DoubleWritable(otherId.get() + 100.5));
+      sendMessage(otherId, new DoubleWritable(otherId.get() + 200.5));
+    }
+  }
+
+  public static class Computation3 extends Computation<IntWritable,
+      StatusValue, IntWritable, DoubleWritable, IntWritable> {
+    @Override
+    public void compute(
+        Vertex<IntWritable, StatusValue, IntWritable> vertex,
+        Iterable<DoubleWritable> messages) throws IOException {
+      vertex.getValue().computations.add(3);
+      vertex.getValue().addDoubleMessages(messages);
+
+      IntWritable otherId = new IntWritable(3 - vertex.getId().get());
+      sendMessage(otherId, new IntWritable(otherId.get() + 1000));
+      sendMessage(otherId, new IntWritable(otherId.get() + 2000));
+    }
+  }
+
+  public static class MinimumCombiner extends Combiner<IntWritable,
+      IntWritable> {
+    @Override
+    public void combine(IntWritable vertexIndex, IntWritable originalMessage,
+        IntWritable messageToCombine) {
+      originalMessage.set(
+          Math.min(originalMessage.get(), messageToCombine.get()));
+    }
+
+    @Override
+    public IntWritable createInitialMessage() {
+      return new IntWritable(Integer.MAX_VALUE);
+    }
+  }
+
+  public static class SumCombiner extends Combiner<IntWritable, IntWritable> {
+    @Override
+    public void combine(IntWritable vertexIndex, IntWritable originalMessage,
+        IntWritable messageToCombine) {
+      originalMessage.set(originalMessage.get() + messageToCombine.get());
+    }
+
+    @Override
+    public IntWritable createInitialMessage() {
+      return new IntWritable(0);
+    }
+  }
+
+  public static class StatusValue implements Writable {
+    private ArrayList<Integer> computations = new ArrayList<Integer>();
+    private ArrayList<HashSet<Double>> messagesReceived =
+        new ArrayList<HashSet<Double>>();
+
+    public StatusValue() {
+    }
+
+    public void addIntMessages(Iterable<IntWritable> messages) {
+      HashSet<Double> messagesList = new HashSet<Double>();
+      for (IntWritable message : messages) {
+        messagesList.add((double) message.get());
+      }
+      messagesReceived.add(messagesList);
+    }
+
+    public void addDoubleMessages(Iterable<DoubleWritable> messages) {
+      HashSet<Double> messagesList = new HashSet<Double>();
+      for (DoubleWritable message : messages) {
+        messagesList.add(message.get());
+      }
+      messagesReceived.add(messagesList);
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      dataOutput.writeInt(computations.size());
+      for (Integer computation : computations) {
+        dataOutput.writeInt(computation);
+      }
+      dataOutput.writeInt(messagesReceived.size());
+      for (HashSet<Double> messages : messagesReceived) {
+        dataOutput.writeInt(messages.size());
+        for (Double message : messages) {
+          dataOutput.writeDouble(message);
+        }
+      }
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      int size = dataInput.readInt();
+      computations = new ArrayList<Integer>(size);
+      for (int i = 0; i < size; i++) {
+        computations.add(dataInput.readInt());
+      }
+      size = dataInput.readInt();
+      messagesReceived = new ArrayList<HashSet<Double>>(size);
+      for (int i = 0; i < size; i++) {
+        int size2 = dataInput.readInt();
+        HashSet<Double> messages = new HashSet<Double>(size2);
+        for (int j = 0; j < size2; j++) {
+          messages.add(dataInput.readDouble());
+        }
+        messagesReceived.add(messages);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
index 9b655af..8a1e9ed 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
@@ -23,6 +23,7 @@ import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.GiraphTransferRegulator;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.NoOpComputation;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -44,16 +45,14 @@ public class TestGiraphTransferRegulator {
   /** Job filled in by setup() */
   private GiraphJob job;
   /** Instantiated vertex filled in from setup() */
-  private TestVertex vertex = new TestVertex();
+  private Vertex<IntWritable, FloatWritable, DoubleWritable>
+      vertex = new Vertex<IntWritable, FloatWritable, DoubleWritable>();
 
   /**
    * Dummy vertex.
    */
-  public static class TestVertex extends
-      Vertex<IntWritable, FloatWritable, DoubleWritable, LongWritable> {
-    @Override
-    public void compute(Iterable<LongWritable> messages) throws IOException { }
-  }
+  public static class TestComputation extends NoOpComputation<IntWritable,
+      FloatWritable, DoubleWritable, LongWritable> { }
 
   @Before
   public void setUp() {
@@ -62,7 +61,7 @@ public class TestGiraphTransferRegulator {
     } catch (IOException e) {
       throw new RuntimeException("setUp: Failed", e);
     }
-    job.getConfiguration().setVertexClass(TestVertex.class);
+    job.getConfiguration().setComputationClass(TestComputation.class);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 5a93d41..45542b9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -24,6 +24,7 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.NoOpComputation;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 import org.apache.hadoop.io.IntWritable;
@@ -48,25 +49,20 @@ import static org.mockito.Mockito.mock;
  */
 public class TestPartitionStores {
   private ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
-      NullWritable, IntWritable> conf;
+      NullWritable> conf;
   private Mapper<?, ?, ?, ?>.Context context;
 
-  public static class MyVertex extends Vertex<IntWritable, IntWritable,
-      NullWritable, IntWritable> {
-    @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException {}
-  }
+  public static class MyComputation extends NoOpComputation<IntWritable,
+      IntWritable, NullWritable, IntWritable> { }
 
-  private Partition<IntWritable, IntWritable, NullWritable,
-        IntWritable> createPartition(
+  private Partition<IntWritable, IntWritable, NullWritable> createPartition(
       ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
-          NullWritable, IntWritable> conf,
+          NullWritable> conf,
       Integer id,
-      Vertex<IntWritable, IntWritable, NullWritable,
-          IntWritable>... vertices) {
-    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition =
+      Vertex<IntWritable, IntWritable, NullWritable>... vertices) {
+    Partition<IntWritable, IntWritable, NullWritable> partition =
         conf.createPartition(id, context);
-    for (Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v :
+    for (Vertex<IntWritable, IntWritable, NullWritable> v :
         vertices) {
       partition.putVertex(v);
     }
@@ -76,17 +72,17 @@ public class TestPartitionStores {
   @Before
   public void setUp() {
     GiraphConfiguration configuration = new GiraphConfiguration();
-    configuration.setVertexClass(MyVertex.class);
+    configuration.setComputationClass(MyComputation.class);
     conf = new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
-        NullWritable, IntWritable>(configuration);
+        NullWritable>(configuration);
     context = mock(Mapper.Context.class);
   }
 
   @Test
   public void testSimplePartitionStore() {
-    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
+    PartitionStore<IntWritable, IntWritable, NullWritable>
         partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
-                NullWritable, IntWritable>(conf, context);
+                NullWritable>(conf, context);
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
   }
@@ -94,30 +90,23 @@ public class TestPartitionStores {
   @Test
   public void testUnsafePartitionSerializationClass() throws IOException {
     conf.setPartitionClass(ByteArrayPartition.class);
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v1 =
+    Vertex<IntWritable, IntWritable, NullWritable> v1 =
         conf.createVertex();
     v1.initialize(new IntWritable(1), new IntWritable(1));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v2 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
     v2.initialize(new IntWritable(2), new IntWritable(2));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v3 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
     v3.initialize(new IntWritable(3), new IntWritable(3));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v4 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
     v4.initialize(new IntWritable(4), new IntWritable(4));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v5 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
     v5.initialize(new IntWritable(5), new IntWritable(5));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v6 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
     v6.initialize(new IntWritable(6), new IntWritable(6));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v7 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
     v7.initialize(new IntWritable(7), new IntWritable(7));
 
-    Partition<IntWritable, IntWritable, NullWritable,
-        IntWritable> partition =
+    Partition<IntWritable, IntWritable, NullWritable> partition =
         createPartition(conf, 3, v1, v2, v3, v4, v5, v6, v7);
     assertEquals(3, partition.getId());
     assertEquals(0, partition.getEdgeCount());
@@ -127,9 +116,8 @@ public class TestPartitionStores {
     partition.write(outputStream);
     UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream(
         outputStream.getByteArray(), 0, outputStream.getPos());
-    Partition<IntWritable, IntWritable, NullWritable,
-        IntWritable> deserializatedPartition = conf.createPartition(-1,
-        context);
+    Partition<IntWritable, IntWritable, NullWritable> deserializatedPartition =
+        conf.createPartition(-1, context);
     deserializatedPartition.readFields(inputStream);
 
     assertEquals(3, deserializatedPartition.getId());
@@ -146,9 +134,9 @@ public class TestPartitionStores {
     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
     conf.setPartitionClass(ByteArrayPartition.class);
     
-    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
-        partitionStore = new DiskBackedPartitionStore<IntWritable,
-            IntWritable, NullWritable, IntWritable>(conf, context);
+    PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore =
+        new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
+            conf, context);
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
     FileUtils.deleteDirectory(directory);
@@ -162,15 +150,15 @@ public class TestPartitionStores {
     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
 
-    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
-        partitionStore = new DiskBackedPartitionStore<IntWritable,
-                        IntWritable, NullWritable, IntWritable>(conf, context);
+    PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore =
+        new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
+            conf, context);
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
 
     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 2);
     partitionStore = new DiskBackedPartitionStore<IntWritable,
-            IntWritable, NullWritable, IntWritable>(conf, context);
+            IntWritable, NullWritable>(conf, context);
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
     FileUtils.deleteDirectory(directory);
@@ -184,29 +172,22 @@ public class TestPartitionStores {
    */
   public void testReadWrite(
       PartitionStore<IntWritable, IntWritable,
-          NullWritable, IntWritable> partitionStore,
+          NullWritable> partitionStore,
       ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
-          NullWritable, IntWritable> conf) {
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v1 =
-        conf.createVertex();
+          NullWritable> conf) {
+    Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
     v1.initialize(new IntWritable(1), new IntWritable(1));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v2 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v2 = conf.createVertex();
     v2.initialize(new IntWritable(2), new IntWritable(2));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v3 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v3 = conf.createVertex();
     v3.initialize(new IntWritable(3), new IntWritable(3));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v4 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v4 = conf.createVertex();
     v4.initialize(new IntWritable(4), new IntWritable(4));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v5 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v5 = conf.createVertex();
     v5.initialize(new IntWritable(5), new IntWritable(5));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v6 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v6 = conf.createVertex();
     v6.initialize(new IntWritable(7), new IntWritable(7));
-    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v7 =
-        conf.createVertex();
+    Vertex<IntWritable, IntWritable, NullWritable> v7 = conf.createVertex();
     v7.initialize(new IntWritable(7), new IntWritable(7));
     v7.addEdge(EdgeFactory.create(new IntWritable(1)));
     v7.addEdge(EdgeFactory.create(new IntWritable(2)));
@@ -218,15 +199,15 @@ public class TestPartitionStores {
     partitionStore.addPartition(createPartition(conf, 1, v6));
     partitionStore.addPartition(createPartition(conf, 4, v7));
 
-    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition1 =
+    Partition<IntWritable, IntWritable, NullWritable> partition1 =
         partitionStore.getPartition(1);
     partitionStore.putPartition(partition1);
-    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition2 =
+    Partition<IntWritable, IntWritable, NullWritable> partition2 =
         partitionStore.getPartition(2);
     partitionStore.putPartition(partition2);
-    Partition<IntWritable, IntWritable, NullWritable,
-        IntWritable> partition3 = partitionStore.removePartition(3);
-    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition4 =
+    Partition<IntWritable, IntWritable, NullWritable> partition3 =
+        partitionStore.removePartition(3);
+    Partition<IntWritable, IntWritable, NullWritable> partition4 =
         partitionStore.getPartition(4);
     partitionStore.putPartition(partition4);
 
@@ -234,12 +215,12 @@ public class TestPartitionStores {
     assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
     int partitionsNumber = 0;
     for (Integer partitionId : partitionStore.getPartitionIds()) {
-      Partition<IntWritable, IntWritable, NullWritable, IntWritable> p = 
+      Partition<IntWritable, IntWritable, NullWritable> p =
           partitionStore.getPartition(partitionId);
       partitionStore.putPartition(p);
       partitionsNumber++;
     }
-    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition;
+    Partition<IntWritable, IntWritable, NullWritable> partition;
     assertEquals(3, partitionsNumber);
     assertTrue(partitionStore.hasPartition(1));
     assertTrue(partitionStore.hasPartition(2));

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/utils/ComputationCountEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/ComputationCountEdges.java b/giraph-core/src/test/java/org/apache/giraph/utils/ComputationCountEdges.java
new file mode 100644
index 0000000..ba05b66
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/ComputationCountEdges.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/**
+ * Computation which sets the value of vertex to its number of edges,
+ * used for testing. Vertex ids and values are integers, edge values and
+ * messages are nulls.
+ */
+public class ComputationCountEdges extends BasicComputation<IntWritable,
+    IntWritable, NullWritable, NullWritable> {
+  @Override
+  public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
+      Iterable<NullWritable> messages) throws IOException {
+    vertex.setValue(new IntWritable(vertex.getNumEdges()));
+    vertex.voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/utils/IntIntNullNoOpComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/IntIntNullNoOpComputation.java b/giraph-core/src/test/java/org/apache/giraph/utils/IntIntNullNoOpComputation.java
new file mode 100644
index 0000000..c628b0b
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/IntIntNullNoOpComputation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * {@link NoOpComputation} for tests: vertex ids and values are
+ * {@link IntWritable}, edge values and messages are {@link NullWritable}.
+ */
+public class IntIntNullNoOpComputation extends
+    NoOpComputation<IntWritable, IntWritable, NullWritable, NullWritable> {
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/utils/IntNoOpComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/IntNoOpComputation.java b/giraph-core/src/test/java/org/apache/giraph/utils/IntNoOpComputation.java
new file mode 100644
index 0000000..e2fbb39
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/IntNoOpComputation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * {@link NoOpComputation} for tests: vertex ids, vertex values, edge
+ * values and messages are all {@link IntWritable}.
+ */
+public class IntNoOpComputation extends NoOpComputation<IntWritable,
+    IntWritable, IntWritable, IntWritable> {
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/utils/LongNoOpComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/LongNoOpComputation.java b/giraph-core/src/test/java/org/apache/giraph/utils/LongNoOpComputation.java
new file mode 100644
index 0000000..c83348f
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/LongNoOpComputation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * {@link NoOpComputation} for tests: vertex ids, vertex values, edge
+ * values and messages are all {@link LongWritable}.
+ */
+public class LongNoOpComputation extends NoOpComputation<LongWritable,
+    LongWritable, LongWritable, LongWritable> {
+}