You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:00 UTC

[04/12] GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java
new file mode 100644
index 0000000..984e079
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * An example that simply uses its id, value, and edges to compute new data
+ * every iteration to verify that checkpoint restarting works.  Fault injection
+ * can also test automated checkpoint restarts.
+ */
+public class SimpleCheckpoint implements Tool {
+  /** Which superstep to cause the worker to fail */
+  public static final int FAULTING_SUPERSTEP = 4;
+  /** Vertex id to fault on */
+  public static final long FAULTING_VERTEX_ID = 1;
+  /** Dynamically set number of supersteps */
+  public static final String SUPERSTEP_COUNT =
+      "simpleCheckpointVertex.superstepCount";
+  /** Should fault? */
+  public static final String ENABLE_FAULT =
+      "simpleCheckpointVertex.enableFault";
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleCheckpoint.class);
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+   * Computation that adds received messages to the vertex value and sends
+   * edge-derived values on; can force one worker exit to test restarts.
+   */
+  public static class SimpleCheckpointComputation extends
+      BasicComputation<LongWritable, IntWritable, FloatWritable,
+          FloatWritable> {
+    @Override
+    public void compute(
+        Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+        Iterable<FloatWritable> messages) throws IOException {
+      SimpleCheckpointVertexWorkerContext workerCtx = getWorkerContext();
+      boolean faultEnabled = workerCtx.getEnableFault();
+      int lastSuperstep = workerCtx.getSupersteps();
+
+      // Kill the JVM once: only on the first task attempt, at the faulting
+      // superstep, and only for the faulting vertex.
+      if (faultEnabled && (getSuperstep() == FAULTING_SUPERSTEP) &&
+          (getContext().getTaskAttemptID().getId() == 0) &&
+          (vertex.getId().get() == FAULTING_VERTEX_ID)) {
+        LOG.info("compute: Forced a fault on the first " +
+            "attempt of superstep " + FAULTING_SUPERSTEP +
+            " and vertex id " +
+            FAULTING_VERTEX_ID);
+        System.exit(-1);
+      }
+      if (getSuperstep() > lastSuperstep) {
+        vertex.voteToHalt();
+        return;
+      }
+      // Read the aggregate first, then contribute this vertex's id to it.
+      String sumName = LongSumAggregator.class.getName();
+      long sumAgg = this.<LongWritable>getAggregatedValue(sumName).get();
+      LOG.info("compute: " + sumAgg);
+      aggregate(sumName, new LongWritable(vertex.getId().get()));
+      LOG.info("compute: sum = " + sumAgg + " for vertex " + vertex.getId());
+      float messageTotal = 0.0f;
+      for (FloatWritable message : messages) {
+        float received = message.get();
+        messageTotal += received;
+        LOG.info("compute: got msgValue = " + received +
+            " for vertex " + vertex.getId() +
+            " on superstep " + getSuperstep());
+      }
+      int oldValue = vertex.getValue().get();
+      vertex.setValue(new IntWritable(oldValue + (int) messageTotal));
+      LOG.info("compute: vertex " + vertex.getId() +
+          " has value " + vertex.getValue() +
+          " on superstep " + getSuperstep());
+      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+        FloatWritable outgoing =
+            new FloatWritable(edge.getValue().get() + (float) oldValue);
+        LOG.info("compute: vertex " + vertex.getId() +
+            " sending edgeValue " + edge.getValue() +
+            " vertexValue " + oldValue +
+            " total " + outgoing +
+            " to vertex " + edge.getTargetVertexId() +
+            " on superstep " + getSuperstep());
+        // The new edge value is based on the pre-update vertex value.
+        vertex.addEdge(EdgeFactory.create(edge.getTargetVertexId(), outgoing));
+        sendMessage(edge.getTargetVertexId(), outgoing);
+      }
+    }
+  }
+
+  /** Worker context associated with {@link SimpleCheckpoint}. */
+  public static class SimpleCheckpointVertexWorkerContext
+      extends WorkerContext {
+    /** Filename to indicate whether a fault was found */
+    public static final String FAULT_FILE = "/tmp/faultFile";
+    /** User can access this after the application finishes if local */
+    private static long FINAL_SUM;
+    /** Number of supersteps to run (6 by default) */
+    private int supersteps = 6;
+    /** Enable the fault at the particular vertex id and superstep? */
+    private boolean enableFault = false;
+
+    /** @return Sum stored by the most recent postApplication call */
+    public static long getFinalSum() {
+      return FINAL_SUM;
+    }
+
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+      Configuration jobConf = getContext().getConfiguration();
+      // An unset superstep count leaves the field's default in place.
+      supersteps = jobConf.getInt(SUPERSTEP_COUNT, supersteps);
+      enableFault = jobConf.getBoolean(ENABLE_FAULT, false);
+    }
+
+    @Override
+    public void postApplication() {
+      String sumName = LongSumAggregator.class.getName();
+      FINAL_SUM = this.<LongWritable>getAggregatedValue(sumName).get();
+      LOG.info("FINAL_SUM=" + FINAL_SUM);
+    }
+
+    @Override
+    public void preSuperstep() { }
+
+    @Override
+    public void postSuperstep() { }
+
+    /** @return Number of supersteps to run */
+    public int getSupersteps() {
+      return supersteps;
+    }
+
+    /** @return Whether the forced fault is enabled */
+    public boolean getEnableFault() {
+      return enableFault;
+    }
+  }
+
+  /**
+   * Parses the command line, configures the Giraph job and runs it.
+   * Options -w and -o are required; -s, -v and -h are optional.
+   * Prints usage and returns 0 when no arguments are given.
+   *
+   * @param args Command line args.
+   * @return 0 on success (or when only help was printed), -1 otherwise.
+   * @throws Exception If argument parsing, job setup or the job run throws.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("w",
+        "workers",
+        true,
+        "Number of workers");
+    options.addOption("s",
+        "supersteps",
+        true,
+        "Supersteps to execute before finishing");
+    options.addOption("o",
+        "outputDirectory",
+        true,
+        "Output directory");
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (cmd.hasOption('h')) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    if (!cmd.hasOption('w')) {
+      LOG.info("Need to choose the number of workers (-w)");
+      return -1;
+    }
+    if (!cmd.hasOption('o')) {
+      LOG.info("Need to set the output directory (-o)");
+      return -1;
+    }
+
+    GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
+    bspJob.getConfiguration().setComputationClass(
+        SimpleCheckpointComputation.class);
+    bspJob.getConfiguration().setVertexInputFormatClass(
+        GeneratedVertexInputFormat.class);
+    bspJob.getConfiguration().setVertexOutputFormatClass(
+        IdWithValueTextOutputFormat.class);
+    bspJob.getConfiguration().setWorkerContextClass(
+        SimpleCheckpointVertexWorkerContext.class);
+    bspJob.getConfiguration().setMasterComputeClass(
+        SimpleCheckpointVertexMasterCompute.class);
+    // Same value for the minimum and maximum number of workers.
+    int workers = Integer.parseInt(cmd.getOptionValue('w'));
+    bspJob.getConfiguration().setWorkerConfiguration(
+        workers, workers, 100.0f);
+
+    FileOutputFormat.setOutputPath(bspJob.getInternalJob(),
+                                   new Path(cmd.getOptionValue('o')));
+    if (cmd.hasOption('s')) {
+      // Set on the job's own configuration: bspJob was already built from
+      // getConf(), so a later change to getConf() may not reach the job.
+      bspJob.getConfiguration().setInt(SUPERSTEP_COUNT,
+          Integer.parseInt(cmd.getOptionValue('s')));
+    }
+    boolean verbose = cmd.hasOption('v');
+    return bspJob.run(verbose) ? 0 : -1;
+  }
+
+  /**
+   * Master compute associated with {@link SimpleCheckpoint}.
+   * Registers the sum aggregator that the computation and worker context use.
+   */
+  public static class SimpleCheckpointVertexMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      Class<LongSumAggregator> sumClass = LongSumAggregator.class;
+      registerAggregator(sumClass.getName(), sumClass);
+    }
+  }
+
+  /**
+   * Runs this tool through ToolRunner and exits the JVM with its status.
+   *
+   * @param args Command line args.
+   * @throws Exception Anything ToolRunner.run lets propagate.
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new SimpleCheckpoint(), args));
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf; // null until setConf(Configuration) has been called
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf; // keeps the caller's instance; no copy is made
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
deleted file mode 100644
index 03d977b..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
-import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Logger;
-
-/**
- * An example that simply uses its id, value, and edges to compute new data
- * every iteration to verify that checkpoint restarting works.  Fault injection
- * can also test automated checkpoint restarts.
- */
-public class SimpleCheckpointVertex implements Tool {
-  /** Which superstep to cause the worker to fail */
-  public static final int FAULTING_SUPERSTEP = 4;
-  /** Vertex id to fault on */
-  public static final long FAULTING_VERTEX_ID = 1;
-  /** Dynamically set number of supersteps */
-  public static final String SUPERSTEP_COUNT =
-      "simpleCheckpointVertex.superstepCount";
-  /** Should fault? */
-  public static final String ENABLE_FAULT =
-      "simpleCheckpointVertex.enableFault";
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SimpleCheckpointVertex.class);
-  /** Configuration */
-  private Configuration conf;
-
-  /**
-   * Actual computation.
-   */
-  public static class SimpleCheckpointComputation extends
-      Vertex<LongWritable, IntWritable, FloatWritable, FloatWritable> {
-    @Override
-    public void compute(Iterable<FloatWritable> messages) {
-      SimpleCheckpointVertexWorkerContext workerContext =
-          (SimpleCheckpointVertexWorkerContext) getWorkerContext();
-
-      boolean enableFault = workerContext.getEnableFault();
-      int supersteps = workerContext.getSupersteps();
-
-      if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
-          (getContext().getTaskAttemptID().getId() == 0) &&
-          (getId().get() == FAULTING_VERTEX_ID)) {
-        LOG.info("compute: Forced a fault on the first " +
-            "attempt of superstep " +
-            FAULTING_SUPERSTEP + " and vertex id " +
-            FAULTING_VERTEX_ID);
-        System.exit(-1);
-      }
-      if (getSuperstep() > supersteps) {
-        voteToHalt();
-        return;
-      }
-      long sumAgg = this.<LongWritable>getAggregatedValue(
-          LongSumAggregator.class.getName()).get();
-      LOG.info("compute: " + sumAgg);
-      aggregate(LongSumAggregator.class.getName(),
-          new LongWritable(getId().get()));
-      LOG.info("compute: sum = " + sumAgg +
-          " for vertex " + getId());
-      float msgValue = 0.0f;
-      for (FloatWritable message : messages) {
-        float curMsgValue = message.get();
-        msgValue += curMsgValue;
-        LOG.info("compute: got msgValue = " + curMsgValue +
-            " for vertex " + getId() +
-            " on superstep " + getSuperstep());
-      }
-      int vertexValue = getValue().get();
-      setValue(new IntWritable(vertexValue + (int) msgValue));
-      LOG.info("compute: vertex " + getId() +
-          " has value " + getValue() +
-          " on superstep " + getSuperstep());
-      for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
-        FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
-            (float) vertexValue);
-        Edge<LongWritable, FloatWritable> newEdge =
-            EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
-        LOG.info("compute: vertex " + getId() +
-            " sending edgeValue " + edge.getValue() +
-            " vertexValue " + vertexValue +
-            " total " + newEdgeValue +
-            " to vertex " + edge.getTargetVertexId() +
-            " on superstep " + getSuperstep());
-        addEdge(newEdge);
-        sendMessage(edge.getTargetVertexId(), newEdgeValue);
-      }
-    }
-  }
-
-  /**
-   * Worker context associated with {@link SimpleCheckpointVertex}.
-   */
-  public static class SimpleCheckpointVertexWorkerContext
-      extends WorkerContext {
-    /** Filename to indicate whether a fault was found */
-    public static final String FAULT_FILE = "/tmp/faultFile";
-    /** User can access this after the application finishes if local */
-    private static long FINAL_SUM;
-    /** Number of supersteps to run (6 by default) */
-    private int supersteps = 6;
-    /** Enable the fault at the particular vertex id and superstep? */
-    private boolean enableFault = false;
-
-    public static long getFinalSum() {
-      return FINAL_SUM;
-    }
-
-    @Override
-    public void preApplication()
-      throws InstantiationException, IllegalAccessException {
-      supersteps = getContext().getConfiguration()
-          .getInt(SUPERSTEP_COUNT, supersteps);
-      enableFault = getContext().getConfiguration()
-          .getBoolean(ENABLE_FAULT, false);
-    }
-
-    @Override
-    public void postApplication() {
-      FINAL_SUM = this.<LongWritable>getAggregatedValue(
-          LongSumAggregator.class.getName()).get();
-      LOG.info("FINAL_SUM=" + FINAL_SUM);
-    }
-
-    @Override
-    public void preSuperstep() {
-    }
-
-    @Override
-    public void postSuperstep() { }
-
-    public int getSupersteps() {
-      return this.supersteps;
-    }
-
-    public boolean getEnableFault() {
-      return this.enableFault;
-    }
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Options options = new Options();
-    options.addOption("h", "help", false, "Help");
-    options.addOption("v", "verbose", false, "Verbose");
-    options.addOption("w",
-        "workers",
-        true,
-        "Number of workers");
-    options.addOption("s",
-        "supersteps",
-        true,
-        "Supersteps to execute before finishing");
-    options.addOption("w",
-        "workers",
-        true,
-        "Minimum number of workers");
-    options.addOption("o",
-        "outputDirectory",
-        true,
-        "Output directory");
-    HelpFormatter formatter = new HelpFormatter();
-    if (args.length == 0) {
-      formatter.printHelp(getClass().getName(), options, true);
-      return 0;
-    }
-    CommandLineParser parser = new PosixParser();
-    CommandLine cmd = parser.parse(options, args);
-    if (cmd.hasOption('h')) {
-      formatter.printHelp(getClass().getName(), options, true);
-      return 0;
-    }
-    if (!cmd.hasOption('w')) {
-      LOG.info("Need to choose the number of workers (-w)");
-      return -1;
-    }
-    if (!cmd.hasOption('o')) {
-      LOG.info("Need to set the output directory (-o)");
-      return -1;
-    }
-
-    GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
-    bspJob.getConfiguration().setVertexClass(SimpleCheckpointComputation.class);
-    bspJob.getConfiguration().setVertexInputFormatClass(
-        GeneratedVertexInputFormat.class);
-    bspJob.getConfiguration().setVertexOutputFormatClass(
-        IdWithValueTextOutputFormat.class);
-    bspJob.getConfiguration().setWorkerContextClass(
-        SimpleCheckpointVertexWorkerContext.class);
-    bspJob.getConfiguration().setMasterComputeClass(
-        SimpleCheckpointVertexMasterCompute.class);
-    int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
-    int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
-    bspJob.getConfiguration().setWorkerConfiguration(
-        minWorkers, maxWorkers, 100.0f);
-
-    FileOutputFormat.setOutputPath(bspJob.getInternalJob(),
-                                   new Path(cmd.getOptionValue('o')));
-    boolean verbose = false;
-    if (cmd.hasOption('v')) {
-      verbose = true;
-    }
-    if (cmd.hasOption('s')) {
-      getConf().setInt(SUPERSTEP_COUNT,
-          Integer.parseInt(cmd.getOptionValue('s')));
-    }
-    if (bspJob.run(verbose)) {
-      return 0;
-    } else {
-      return -1;
-    }
-  }
-
-  /**
-   * Master compute associated with {@link SimpleCheckpointVertex}.
-   * It registers required aggregators.
-   */
-  public static class SimpleCheckpointVertexMasterCompute extends
-      DefaultMasterCompute {
-    @Override
-    public void initialize() throws InstantiationException,
-        IllegalAccessException {
-      registerAggregator(LongSumAggregator.class.getName(),
-          LongSumAggregator.class);
-    }
-  }
-
-  /**
-   * Executable from the command line.
-   *
-   * @param args Command line args.
-   * @throws Exception
-   */
-  public static void main(String[] args) throws Exception {
-    System.exit(ToolRunner.run(new SimpleCheckpointVertex(), args));
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerComputation.java
new file mode 100644
index 0000000..551a2a2
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerComputation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Test whether messages can go through a combiner: vertex 2 sends three
+ * messages to vertex 1, which halts once it gets one message summing to 306.
+ */
+public class SimpleCombinerComputation extends
+    BasicComputation<LongWritable, IntWritable, FloatWritable, IntWritable> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleCombinerComputation.class);
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+      Iterable<IntWritable> messages) throws IOException {
+    if (vertex.getId().get() == 2L) {
+      sendMessage(new LongWritable(1), new IntWritable(101));
+      sendMessage(new LongWritable(1), new IntWritable(102));
+      sendMessage(new LongWritable(1), new IntWritable(103));
+    }
+    if (vertex.getId().get() != 1L) {
+      vertex.voteToHalt();
+    } else {
+      // Check the messages
+      int sum = 0;
+      int num = 0;
+      for (IntWritable message : messages) {
+        sum += message.get();
+        num++;
+      }
+      LOG.info("TestCombinerVertex: Received a sum of " + sum +
+          " (should have 306 with a single message value)");
+      if (num == 1 && sum == 306) {
+        vertex.voteToHalt();
+      }
+    }
+    if (getSuperstep() > 3) {
+      throw new IllegalStateException(
+          "TestCombinerVertex: Vertex 1 failed to receive messages in time");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
deleted file mode 100644
index f4475ae..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Test whether messages can go through a combiner.
- */
-public class SimpleCombinerVertex extends
-    Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
-  /** Class logger */
-  private static Logger LOG = Logger.getLogger(SimpleCombinerVertex.class);
-
-  @Override
-  public void compute(Iterable<IntWritable> messages) {
-    if (getId().equals(new LongWritable(2))) {
-      sendMessage(new LongWritable(1), new IntWritable(101));
-      sendMessage(new LongWritable(1), new IntWritable(102));
-      sendMessage(new LongWritable(1), new IntWritable(103));
-    }
-    if (!getId().equals(new LongWritable(1))) {
-      voteToHalt();
-    } else {
-      // Check the messages
-      int sum = 0;
-      int num = 0;
-      for (IntWritable message : messages) {
-        sum += message.get();
-        num++;
-      }
-      LOG.info("TestCombinerVertex: Received a sum of " + sum +
-          " (should have 306 with a single message value)");
-
-      if (num == 1 && sum == 306) {
-        voteToHalt();
-      }
-    }
-    if (getSuperstep() > 3) {
-      throw new IllegalStateException(
-          "TestCombinerVertex: Vertex 1 failed to receive " +
-          "messages in time");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java
new file mode 100644
index 0000000..1582902
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Computation to allow unit testing of failure detection: exits the JVM
+ * when vertex 10 is computed in superstep 20.
+ */
+public class SimpleFailComputation extends BasicComputation<
+    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleFailComputation.class);
+  /** TODO: Change this behavior to WorkerContext */
+  private static long SUPERSTEP = 0;
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    if (getSuperstep() >= 1) {
+      double sum = 0;
+      for (DoubleWritable message : messages) {
+        sum += message.get();
+      }
+      vertex.setValue(new DoubleWritable(
+          (0.15f / getTotalNumVertices()) + 0.85f * sum));
+      if (getSuperstep() < 30) {
+        if (getSuperstep() == 20) {
+          if (vertex.getId().get() == 10L) {
+            try {
+              Thread.sleep(2000);
+            } catch (InterruptedException e) {
+              LOG.info("Sleep interrupted ", e);
+            }
+            System.exit(1);
+          } else if (getSuperstep() - SUPERSTEP > 10) {
+            return;
+          }
+        }
+        sendMessageToAllEdges(vertex, new DoubleWritable(
+            vertex.getValue().get() / vertex.getNumEdges()));
+      } else {
+        vertex.voteToHalt();
+      }
+      SUPERSTEP = getSuperstep();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
deleted file mode 100644
index c0e206c..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Vertex to allow unit testing of failure detection
- */
-public class SimpleFailVertex extends Vertex<
-    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
-  /** Class logger */
-  private static Logger LOG = Logger.getLogger(SimpleFailVertex.class);
-  /** TODO: Change this behavior to WorkerContext */
-  private static long SUPERSTEP = 0;
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) {
-    if (getSuperstep() >= 1) {
-      double sum = 0;
-      for (DoubleWritable message : messages) {
-        sum += message.get();
-      }
-      DoubleWritable vertexValue =
-          new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
-      setValue(vertexValue);
-      if (getSuperstep() < 30) {
-        if (getSuperstep() == 20) {
-          if (getId().get() == 10L) {
-            try {
-              Thread.sleep(2000);
-            } catch (InterruptedException e) {
-              LOG.info("Sleep interrupted ", e);
-            }
-            System.exit(1);
-          } else if (getSuperstep() - SUPERSTEP > 10) {
-            return;
-          }
-        }
-        long edges = getNumEdges();
-        sendMessageToAllEdges(
-            new DoubleWritable(getValue().get() / edges));
-      } else {
-        voteToHalt();
-      }
-      SUPERSTEP = getSuperstep();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountComputation.java
new file mode 100644
index 0000000..1513401
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountComputation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Simple function to return the in degree for each vertex.
+ */
+@Algorithm(
+    name = "Indegree Count"
+)
+public class SimpleInDegreeCountComputation extends BasicComputation<
+  LongWritable, LongWritable, DoubleWritable, DoubleWritable> {
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, LongWritable, DoubleWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    if (getSuperstep() == 0) {
+      Iterable<Edge<LongWritable, DoubleWritable>> edges = vertex.getEdges();
+      for (Edge<LongWritable, DoubleWritable> edge : edges) {
+        sendMessage(edge.getTargetVertexId(), new DoubleWritable(1.0));
+      }
+    } else {
+      long sum = 0;
+      for (DoubleWritable message : messages) {
+        sum++; // one message was sent per edge pointing at this vertex
+      }
+      LongWritable vertexValue = vertex.getValue();
+      vertexValue.set(sum);
+      vertex.setValue(vertexValue);
+      vertex.voteToHalt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountVertex.java
deleted file mode 100644
index 3bcf3f5..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountVertex.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * Simple function to return the out degree for each vertex.
- */
-@Algorithm(
-    name = "Indegree Count"
-)
-public class SimpleInDegreeCountVertex extends Vertex<
-  LongWritable, LongWritable, DoubleWritable, DoubleWritable> {
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) {
-    if (getSuperstep() == 0) {
-      Iterable<Edge<LongWritable, DoubleWritable>> edges = getEdges();
-      for (Edge<LongWritable, DoubleWritable> edge : edges) {
-        sendMessage(edge.getTargetVertexId(), new DoubleWritable(1.0));
-      }
-    } else {
-      long sum = 0;
-      for (DoubleWritable message : messages) {
-        sum++;
-      }
-      LongWritable vertexValue = getValue();
-      vertexValue.set(sum);
-      setValue(vertexValue);
-      voteToHalt();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityComputation.java
new file mode 100644
index 0000000..96e0403
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityComputation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A simple use of the Identity Computation for taking care of Long, Double,
+ * Double, Double type input formats. Good for use with
+ * io.LongDoubleDoubleAdjacencyListVertexInputFormat
+ */
+
+public abstract class SimpleLongDoubleDoubleDoubleIdentityComputation extends
+  IdentityComputation<LongWritable, DoubleWritable,
+  DoubleWritable, DoubleWritable> { }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
deleted file mode 100644
index c7349d1..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * A simple use of the Identity Vertex for taking care of Long, Double,
- * Double, Double type Inputformat Good for use with
- * io.LongDoubleDoubleAdjacencyListVertexInputFormat
- */
-
-public abstract class SimpleLongDoubleDoubleDoubleIdentityVertex extends
-  IdentityVertex<LongWritable, DoubleWritable,
-  DoubleWritable, DoubleWritable> { }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java
new file mode 100644
index 0000000..b9b2373
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Demonstrates a computation with a centralized part implemented via a
+ * MasterCompute.
+ */
+public class SimpleMasterComputeComputation extends BasicComputation<
+    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** Name of the aggregator carrying values from the master to workers */
+  public static final String SMC_AGG = "simplemastercompute.aggregator";
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleMasterComputeComputation.class);
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
+    double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
+    double newSum = oldSum + newValue;
+    vertex.setValue(new DoubleWritable(newSum));
+    SimpleMasterComputeWorkerContext workerContext = getWorkerContext();
+    workerContext.setFinalSum(newSum);
+    LOG.info("Current sum: " + newSum);
+  }
+
+  /**
+   * Worker context used with {@link SimpleMasterComputeComputation}.
+   */
+  public static class SimpleMasterComputeWorkerContext
+      extends WorkerContext {
+    /** Final sum value for verification for local jobs */
+    private static double FINAL_SUM;
+
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+    }
+
+    @Override
+    public void preSuperstep() {
+    }
+
+    @Override
+    public void postSuperstep() {
+    }
+
+    @Override
+    public void postApplication() {
+    }
+
+    public void setFinalSum(double sum) {
+      FINAL_SUM = sum; // static field, read back via getFinalSum()
+    }
+
+    public static double getFinalSum() {
+      return FINAL_SUM;
+    }
+  }
+
+  /**
+   * MasterCompute used with {@link SimpleMasterComputeComputation}.
+   */
+  public static class SimpleMasterCompute
+      extends DefaultMasterCompute {
+    @Override
+    public void compute() {
+      setAggregatedValue(SMC_AGG,
+          new DoubleWritable(((double) getSuperstep()) / 2 + 1));
+      if (getSuperstep() == 10) {
+        haltComputation();
+      }
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
deleted file mode 100644
index 8a21ad7..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Demonstrates a computation with a centralized part implemented via a
- * MasterCompute.
- */
-public class SimpleMasterComputeVertex extends Vertex<LongWritable,
-    DoubleWritable, FloatWritable, DoubleWritable> {
-  /** Aggregator to get values from the master to the workers */
-  public static final String SMC_AGG = "simplemastercompute.aggregator";
-  /** Logger */
-  private static final Logger LOG =
-      Logger.getLogger(SimpleMasterComputeVertex.class);
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) {
-    double oldSum = getSuperstep() == 0 ? 0 : getValue().get();
-    double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
-    double newSum = oldSum + newValue;
-    setValue(new DoubleWritable(newSum));
-    SimpleMasterComputeWorkerContext workerContext =
-        (SimpleMasterComputeWorkerContext) getWorkerContext();
-    workerContext.setFinalSum(newSum);
-    LOG.info("Current sum: " + newSum);
-  }
-
-  /**
-   * Worker context used with {@link SimpleMasterComputeVertex}.
-   */
-  public static class SimpleMasterComputeWorkerContext
-      extends WorkerContext {
-    /** Final sum value for verification for local jobs */
-    private static double FINAL_SUM;
-
-    @Override
-    public void preApplication()
-      throws InstantiationException, IllegalAccessException {
-    }
-
-    @Override
-    public void preSuperstep() {
-    }
-
-    @Override
-    public void postSuperstep() {
-    }
-
-    @Override
-    public void postApplication() {
-    }
-
-    public void setFinalSum(double sum) {
-      FINAL_SUM = sum;
-    }
-
-    public static double getFinalSum() {
-      return FINAL_SUM;
-    }
-  }
-
-  /**
-   * MasterCompute used with {@link SimpleMasterComputeVertex}.
-   */
-  public static class SimpleMasterCompute
-      extends DefaultMasterCompute {
-    @Override
-    public void compute() {
-      setAggregatedValue(SMC_AGG,
-          new DoubleWritable(((double) getSuperstep()) / 2 + 1));
-      if (getSuperstep() == 10) {
-        haltComputation();
-      }
-    }
-
-    @Override
-    public void initialize() throws InstantiationException,
-        IllegalAccessException {
-      registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgComputation.java
new file mode 100644
index 0000000..75e69ab
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgComputation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Test whether messages can be sent and received by vertices.
+ */
+public class SimpleMsgComputation extends
+    BasicComputation<LongWritable, IntWritable, FloatWritable, IntWritable> {
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(SimpleMsgComputation.class);
+  @Override
+  public void compute(
+      Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+      Iterable<IntWritable> messages) throws IOException {
+    if (vertex.getId().equals(new LongWritable(2))) {
+      sendMessage(new LongWritable(1), new IntWritable(101));
+      sendMessage(new LongWritable(1), new IntWritable(102));
+      sendMessage(new LongWritable(1), new IntWritable(103));
+    }
+    if (!vertex.getId().equals(new LongWritable(1))) {
+      vertex.voteToHalt();
+    } else {
+      /* Vertex 1: add up the values of the messages received */
+      int sum = 0;
+      for (IntWritable message : messages) {
+        sum += message.get();
+      }
+      LOG.info("compute: Received a sum of " + sum +
+          " (will stop on 306)");
+
+      if (sum == 306) { // 101 + 102 + 103: the messages vertex 2 sends
+        vertex.voteToHalt();
+      }
+    }
+    if (getSuperstep() > 3) {
+      System.err.println("compute: Vertex 1 failed to receive " +
+          "messages in time");
+      vertex.voteToHalt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
deleted file mode 100644
index 024fe9d..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Test whether messages can be sent and received by vertices.
- */
-public class SimpleMsgVertex extends
-    Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
-  /** Class logger */
-  private static Logger LOG = Logger.getLogger(SimpleMsgVertex.class);
-  @Override
-  public void compute(Iterable<IntWritable> messages) {
-    if (getId().equals(new LongWritable(2))) {
-      sendMessage(new LongWritable(1), new IntWritable(101));
-      sendMessage(new LongWritable(1), new IntWritable(102));
-      sendMessage(new LongWritable(1), new IntWritable(103));
-    }
-    if (!getId().equals(new LongWritable(1))) {
-      voteToHalt();
-    } else {
-      /* Check the messages */
-      int sum = 0;
-      for (IntWritable message : messages) {
-        sum += message.get();
-      }
-      LOG.info("compute: Received a sum of " + sum +
-          " (will stop on 306)");
-
-      if (sum == 306) {
-        voteToHalt();
-      }
-    }
-    if (getSuperstep() > 3) {
-      System.err.println("compute: Vertex 1 failed to receive " +
-          "messages in time");
-      voteToHalt();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
new file mode 100644
index 0000000..fc3b6ad
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Computation to allow unit testing of graph mutations.
+ */
+public class SimpleMutateGraphComputation extends BasicComputation<
+    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** Class logger */
+  private static Logger LOG =
+      Logger.getLogger(SimpleMutateGraphComputation.class);
+  /** Maximum number of ranges for vertex ids */
+  private long maxRanges = 100;
+
+
+  /**
+   * Unless we create a ridiculous number of vertices, we should not
+   * collide within a vertex range defined by this method.
+   *
+   * @param range Range index
+   * @return Starting vertex id of the range
+   */
+  private long rangeVertexIdStart(int range) {
+    return (Long.MAX_VALUE / maxRanges) * range;
+  }
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    SimpleMutateGraphVertexWorkerContext workerContext = getWorkerContext();
+    if (getSuperstep() == 0) {
+      LOG.debug("Reached superstep " + getSuperstep());
+    } else if (getSuperstep() == 1) {
+      // Send messages to vertices that are sure not to exist
+      // (creating them)
+      LongWritable destVertexId =
+          new LongWritable(rangeVertexIdStart(1) + vertex.getId().get());
+      sendMessage(destVertexId, new DoubleWritable(0.0));
+    } else if (getSuperstep() == 2) {
+      LOG.debug("Reached superstep " + getSuperstep());
+    } else if (getSuperstep() == 3) {
+      long vertexCount = workerContext.getVertexCount();
+      if (vertexCount * 2 != getTotalNumVertices()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getTotalNumVertices() +
+            " vertices when should have " + vertexCount * 2 +
+            " on superstep " + getSuperstep());
+      }
+      long edgeCount = workerContext.getEdgeCount();
+      if (edgeCount != getTotalNumEdges()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getTotalNumEdges() +
+            " edges when should have " + edgeCount +
+            " on superstep " + getSuperstep());
+      }
+      // Create vertices that are sure not to exist (doubling vertices)
+      LongWritable vertexIndex =
+          new LongWritable(rangeVertexIdStart(3) + vertex.getId().get());
+      addVertexRequest(vertexIndex, new DoubleWritable(0.0));
+      // Add edges to those remote vertices as well
+      addEdgeRequest(vertexIndex,
+          EdgeFactory.create(vertex.getId(), new FloatWritable(0.0f)));
+    } else if (getSuperstep() == 4) {
+      LOG.debug("Reached superstep " + getSuperstep());
+    } else if (getSuperstep() == 5) {
+      long vertexCount = workerContext.getVertexCount();
+      if (vertexCount * 2 != getTotalNumVertices()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getTotalNumVertices() +
+            " vertices when should have " + vertexCount * 2 +
+            " on superstep " + getSuperstep());
+      }
+      long edgeCount = workerContext.getEdgeCount();
+      if (edgeCount + vertexCount != getTotalNumEdges()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getTotalNumEdges() +
+            " edges when should have " + (edgeCount + vertexCount) +
+            " on superstep " + getSuperstep());
+      }
+      // Remove the edges created in superstep 3
+      LongWritable vertexIndex =
+          new LongWritable(rangeVertexIdStart(3) + vertex.getId().get());
+      workerContext.increaseEdgesRemoved();
+      removeEdgesRequest(vertexIndex, vertex.getId());
+    } else if (getSuperstep() == 6) {
+      // Remove all the vertices created in superstep 3
+      if (vertex.getId().compareTo(
+          new LongWritable(rangeVertexIdStart(3))) >= 0) {
+        removeVertexRequest(vertex.getId());
+      }
+    } else if (getSuperstep() == 7) {
+      long origEdgeCount = workerContext.getOrigEdgeCount();
+      if (origEdgeCount != getTotalNumEdges()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getTotalNumEdges() +
+            " edges when should have " + origEdgeCount +
+            " on superstep " + getSuperstep());
+      }
+    } else if (getSuperstep() == 8) {
+      long vertexCount = workerContext.getVertexCount();
+      if (vertexCount / 2 != getTotalNumVertices()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getTotalNumVertices() +
+            " vertices when should have " + vertexCount / 2 +
+            " on superstep " + getSuperstep());
+      }
+    } else {
+      vertex.voteToHalt();
+    }
+  }
+
+  /**
+   * Worker context used with {@link SimpleMutateGraphComputation}.
+   */
+  public static class SimpleMutateGraphVertexWorkerContext
+      extends WorkerContext {
+    /** Cached vertex count */
+    private long vertexCount;
+    /** Cached edge count */
+    private long edgeCount;
+    /** Original number of edges */
+    private long origEdgeCount;
+    /** Number of edges removed during superstep */
+    private int edgesRemoved = 0;
+
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException { }
+
+    @Override
+    public void postApplication() { }
+
+    @Override
+    public void preSuperstep() { }
+
+    @Override
+    public void postSuperstep() {
+      vertexCount = getTotalNumVertices();
+      edgeCount = getTotalNumEdges();
+      if (getSuperstep() == 1) {
+        origEdgeCount = edgeCount;
+      }
+      LOG.info("Got " + vertexCount + " vertices, " +
+          edgeCount + " edges on superstep " +
+          getSuperstep());
+      LOG.info("Removed " + edgesRemoved);
+      edgesRemoved = 0;
+    }
+
+    public long getVertexCount() {
+      return vertexCount;
+    }
+
+    public long getEdgeCount() {
+      return edgeCount;
+    }
+
+    public long getOrigEdgeCount() {
+      return origEdgeCount;
+    }
+
+    /**
+     * Increase the number of edges removed by one.
+     */
+    public void increaseEdgesRemoved() {
+      this.edgesRemoved++;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
deleted file mode 100644
index a468491..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-
-/**
- * Vertex to allow unit testing of graph mutations.
- */
-public class SimpleMutateGraphVertex extends Vertex<
-    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
-  /** Class logger */
-  private static Logger LOG =
-      Logger.getLogger(SimpleMutateGraphVertex.class);
-  /** Maximum number of ranges for vertex ids */
-  private long maxRanges = 100;
-
-
-  /**
-   * Unless we create a ridiculous number of vertices , we should not
-   * collide within a vertex range defined by this method.
-   *
-   * @param range Range index
-   * @return Starting vertex id of the range
-   */
-  private long rangeVertexIdStart(int range) {
-    return (Long.MAX_VALUE / maxRanges) * range;
-  }
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages)
-    throws IOException {
-    SimpleMutateGraphVertexWorkerContext workerContext =
-        (SimpleMutateGraphVertexWorkerContext) getWorkerContext();
-    if (getSuperstep() == 0) {
-      LOG.debug("Reached superstep " + getSuperstep());
-    } else if (getSuperstep() == 1) {
-      // Send messages to vertices that are sure not to exist
-      // (creating them)
-      LongWritable destVertexId =
-          new LongWritable(rangeVertexIdStart(1) + getId().get());
-      sendMessage(destVertexId, new DoubleWritable(0.0));
-    } else if (getSuperstep() == 2) {
-      LOG.debug("Reached superstep " + getSuperstep());
-    } else if (getSuperstep() == 3) {
-      long vertexCount = workerContext.getVertexCount();
-      if (vertexCount * 2 != getTotalNumVertices()) {
-        throw new IllegalStateException(
-            "Impossible to have " + getTotalNumVertices() +
-            " vertices when should have " + vertexCount * 2 +
-            " on superstep " + getSuperstep());
-      }
-      long edgeCount = workerContext.getEdgeCount();
-      if (edgeCount != getTotalNumEdges()) {
-        throw new IllegalStateException(
-            "Impossible to have " + getTotalNumEdges() +
-            " edges when should have " + edgeCount +
-            " on superstep " + getSuperstep());
-      }
-      // Create vertices that are sure not to exist (doubling vertices)
-      LongWritable vertexIndex =
-          new LongWritable(rangeVertexIdStart(3) + getId().get());
-      addVertexRequest(vertexIndex, new DoubleWritable(0.0));
-      // Add edges to those remote vertices as well
-      addEdgeRequest(vertexIndex,
-          EdgeFactory.create(getId(), new FloatWritable(0.0f)));
-    } else if (getSuperstep() == 4) {
-      LOG.debug("Reached superstep " + getSuperstep());
-    } else if (getSuperstep() == 5) {
-      long vertexCount = workerContext.getVertexCount();
-      if (vertexCount * 2 != getTotalNumVertices()) {
-        throw new IllegalStateException(
-            "Impossible to have " + getTotalNumVertices() +
-            " when should have " + vertexCount * 2 +
-            " on superstep " + getSuperstep());
-      }
-      long edgeCount = workerContext.getEdgeCount();
-      if (edgeCount + vertexCount != getTotalNumEdges()) {
-        throw new IllegalStateException(
-            "Impossible to have " + getTotalNumEdges() +
-            " edges when should have " + edgeCount + vertexCount +
-            " on superstep " + getSuperstep());
-      }
-      // Remove the edges created in superstep 3
-      LongWritable vertexIndex =
-          new LongWritable(rangeVertexIdStart(3) + getId().get());
-      workerContext.increaseEdgesRemoved();
-      removeEdgesRequest(vertexIndex, getId());
-    } else if (getSuperstep() == 6) {
-      // Remove all the vertices created in superstep 3
-      if (getId().compareTo(
-          new LongWritable(rangeVertexIdStart(3))) >= 0) {
-        removeVertexRequest(getId());
-      }
-    } else if (getSuperstep() == 7) {
-      long origEdgeCount = workerContext.getOrigEdgeCount();
-      if (origEdgeCount != getTotalNumEdges()) {
-        throw new IllegalStateException(
-            "Impossible to have " + getTotalNumEdges() +
-            " edges when should have " + origEdgeCount +
-            " on superstep " + getSuperstep());
-      }
-    } else if (getSuperstep() == 8) {
-      long vertexCount = workerContext.getVertexCount();
-      if (vertexCount / 2 != getTotalNumVertices()) {
-        throw new IllegalStateException(
-            "Impossible to have " + getTotalNumVertices() +
-            " vertices when should have " + vertexCount / 2 +
-            " on superstep " + getSuperstep());
-      }
-    } else {
-      voteToHalt();
-    }
-  }
-
-  /**
-   * Worker context used with {@link SimpleMutateGraphVertex}.
-   */
-  public static class SimpleMutateGraphVertexWorkerContext
-      extends WorkerContext {
-    /** Cached vertex count */
-    private long vertexCount;
-    /** Cached edge count */
-    private long edgeCount;
-    /** Original number of edges */
-    private long origEdgeCount;
-    /** Number of edges removed during superstep */
-    private int edgesRemoved = 0;
-
-    @Override
-    public void preApplication()
-      throws InstantiationException, IllegalAccessException { }
-
-    @Override
-    public void postApplication() { }
-
-    @Override
-    public void preSuperstep() { }
-
-    @Override
-    public void postSuperstep() {
-      vertexCount = getTotalNumVertices();
-      edgeCount = getTotalNumEdges();
-      if (getSuperstep() == 1) {
-        origEdgeCount = edgeCount;
-      }
-      LOG.info("Got " + vertexCount + " vertices, " +
-          edgeCount + " edges on superstep " +
-          getSuperstep());
-      LOG.info("Removed " + edgesRemoved);
-      edgesRemoved = 0;
-    }
-
-    public long getVertexCount() {
-      return vertexCount;
-    }
-
-    public long getEdgeCount() {
-      return edgeCount;
-    }
-
-    public long getOrigEdgeCount() {
-      return origEdgeCount;
-    }
-
-    /**
-     * Increase the number of edges removed by one.
-     */
-    public void increaseEdgesRemoved() {
-      this.edgesRemoved++;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountComputation.java
new file mode 100644
index 0000000..36dc727
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountComputation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.giraph.graph.Vertex;
+
+import java.io.IOException;
+
+/**
+ * Simple function to return the out degree for each vertex.
+ */
+@Algorithm(
+    name = "Outdegree Count"
+)
+public class SimpleOutDegreeCountComputation extends BasicComputation<
+  LongWritable, LongWritable, DoubleWritable, DoubleWritable> {
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, LongWritable, DoubleWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    LongWritable vertexValue = vertex.getValue();
+    vertexValue.set(vertex.getNumEdges());
+    vertex.setValue(vertexValue);
+    vertex.voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java
deleted file mode 100644
index c830fa2..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.giraph.graph.Vertex;
-
-
-/**
- * Simple function to return the out degree for each vertex.
- */
-@Algorithm(
-    name = "Outdegree Count"
-)
-public class SimpleOutDegreeCountVertex extends Vertex<
-  LongWritable, LongWritable,
-  DoubleWritable, DoubleWritable> {
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) {
-    LongWritable vertexValue = getValue();
-    vertexValue.set(getNumEdges());
-    setValue(vertexValue);
-    voteToHalt();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankComputation.java
new file mode 100644
index 0000000..d053bb3
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankComputation.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import org.apache.giraph.aggregators.DoubleMaxAggregator;
+import org.apache.giraph.aggregators.DoubleMinAggregator;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+@Algorithm(
+    name = "Page rank"
+)
+public class SimplePageRankComputation extends BasicComputation<LongWritable,
+    DoubleWritable, FloatWritable, DoubleWritable> {
+  /** Number of supersteps for this test */
+  public static final int MAX_SUPERSTEPS = 30;
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimplePageRankComputation.class);
+  /** Sum aggregator name (constant, never reassigned) */
+  private static final String SUM_AGG = "sum";
+  /** Min aggregator name (constant, never reassigned) */
+  private static final String MIN_AGG = "min";
+  /** Max aggregator name (constant, never reassigned) */
+  private static final String MAX_AGG = "max";
+
+  /**
+   * Run one PageRank iteration for a vertex.
+   *
+   * From superstep 1 on, the vertex value is recomputed as
+   * 0.15 / totalVertices + 0.85 * (sum of the incoming messages) and fed to
+   * the max, min and sum aggregators. While the superstep is below
+   * {@link #MAX_SUPERSTEPS}, the value divided by the vertex's number of
+   * edges is passed to sendMessageToAllEdges; otherwise the vertex votes
+   * to halt.
+   *
+   * @param vertex Vertex being processed
+   * @param messages Messages received by this vertex
+   * @throws IOException Declared by the overridden method
+   */
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    if (getSuperstep() >= 1) {
+      double sum = 0;
+      for (DoubleWritable message : messages) {
+        sum += message.get();
+      }
+      // Float literals are kept as they were so computed values don't change
+      DoubleWritable vertexValue =
+          new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
+      vertex.setValue(vertexValue);
+      aggregate(MAX_AGG, vertexValue);
+      aggregate(MIN_AGG, vertexValue);
+      aggregate(SUM_AGG, new LongWritable(1));
+      // This runs once per vertex per superstep: skip the string building
+      // and the two aggregator lookups when INFO logging is disabled
+      if (LOG.isInfoEnabled()) {
+        LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
+            " max=" + getAggregatedValue(MAX_AGG) +
+            " min=" + getAggregatedValue(MIN_AGG));
+      }
+    }
+
+    if (getSuperstep() < MAX_SUPERSTEPS) {
+      long edges = vertex.getNumEdges();
+      sendMessageToAllEdges(vertex,
+          new DoubleWritable(vertex.getValue().get() / edges));
+    } else {
+      vertex.voteToHalt();
+    }
+  }
+
+  /**
+   * Worker context used with {@link SimplePageRankComputation}.
+   */
+  public static class SimplePageRankWorkerContext extends
+      WorkerContext {
+    /** Final max value for verification for local jobs */
+    private static double FINAL_MAX;
+    /** Final min value for verification for local jobs */
+    private static double FINAL_MIN;
+    /** Final sum value for verification for local jobs */
+    private static long FINAL_SUM;
+
+    public static double getFinalMax() {
+      return FINAL_MAX;
+    }
+
+    public static double getFinalMin() {
+      return FINAL_MIN;
+    }
+
+    public static long getFinalSum() {
+      return FINAL_SUM;
+    }
+
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+    }
+
+    @Override
+    public void postApplication() {
+      FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
+      FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
+      FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
+
+      LOG.info("aggregatedNumVertices=" + FINAL_SUM);
+      LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
+      LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
+    }
+
+    @Override
+    public void preSuperstep() {
+      if (getSuperstep() >= 3) {
+        LOG.info("aggregatedNumVertices=" +
+            getAggregatedValue(SUM_AGG) +
+            " NumVertices=" + getTotalNumVertices());
+        if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
+            getTotalNumVertices()) {
+          throw new RuntimeException("wrong value of SumAggreg: " +
+              getAggregatedValue(SUM_AGG) + ", should be: " +
+              getTotalNumVertices());
+        }
+        DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
+        LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
+        DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
+        LOG.info("aggregatedMinPageRank=" + minPagerank.get());
+      }
+    }
+
+    @Override
+    public void postSuperstep() { }
+  }
+
+  /**
+   * Master compute associated with {@link SimplePageRankComputation}.
+   * It registers required aggregators.
+   */
+  public static class SimplePageRankMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(SUM_AGG, LongSumAggregator.class);
+      registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
+      registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
+    }
+  }
+
+  /**
+   * Simple VertexReader that supports {@link SimplePageRankComputation}.
+   * Each generated vertex gets a single outgoing edge to the next vertex id,
+   * wrapping around at getNumSplits() * totalRecords.
+   */
+  public static class SimplePageRankVertexReader extends
+      GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(SimplePageRankVertexReader.class);
+
+    /**
+     * @return True while fewer than totalRecords vertices have been read
+     */
+    @Override
+    public boolean nextVertex() {
+      return totalRecords > recordsRead;
+    }
+
+    /**
+     * Build the next generated vertex and advance the read counter.
+     *
+     * The id is splitIndex * totalRecords + recordsRead, the value is
+     * id * 10 and the single edge carries the value id * 100.
+     *
+     * @return Newly created and initialized vertex
+     * @throws IOException Declared by the overridden method
+     */
+    @Override
+    public Vertex<LongWritable, DoubleWritable,
+        FloatWritable> getCurrentVertex() throws IOException {
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
+          getConf().createVertex();
+      LongWritable vertexId = new LongWritable(
+          (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+      DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
+      long targetVertexId =
+          (vertexId.get() + 1) %
+          (inputSplit.getNumSplits() * totalRecords);
+      float edgeValue = vertexId.get() * 100f;
+      // Exactly one edge per vertex: a pre-sized array list avoids the
+      // per-node overhead of a linked list
+      List<Edge<LongWritable, FloatWritable>> edges =
+          Lists.newArrayListWithCapacity(1);
+      edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
+          new FloatWritable(edgeValue)));
+      vertex.initialize(vertexId, vertexValue, edges);
+      ++recordsRead;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("next: Return vertexId=" + vertex.getId().get() +
+            ", vertexValue=" + vertex.getValue() +
+            ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
+      }
+      return vertex;
+    }
+  }
+
+  /**
+   * Simple VertexInputFormat that supports
+   * {@link SimplePageRankComputation}. Every split is read with a
+   * {@link SimplePageRankVertexReader}.
+   */
+  public static class SimplePageRankVertexInputFormat extends
+    GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
+    /**
+     * Create the reader for a split.
+     *
+     * @param split Input split (not read here)
+     * @param context Task attempt context (not read here)
+     * @return New {@link SimplePageRankVertexReader}
+     * @throws IOException Declared by the overridden method
+     */
+    @Override
+    public VertexReader<LongWritable, DoubleWritable, FloatWritable>
+    createVertexReader(InputSplit split, TaskAttemptContext context)
+      throws IOException {
+      VertexReader<LongWritable, DoubleWritable, FloatWritable> reader =
+          new SimplePageRankVertexReader();
+      return reader;
+    }
+  }
+
+  /**
+   * Simple VertexOutputFormat that supports
+   * {@link SimplePageRankComputation}. Each vertex is written as its id
+   * and its value, both in their toString() form.
+   */
+  public static class SimplePageRankVertexOutputFormat extends
+      TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
+    /**
+     * Create the writer for a task attempt.
+     *
+     * @param context Task attempt context (not read here)
+     * @return New {@link SimplePageRankVertexWriter}
+     * @throws IOException Declared by the overridden method
+     * @throws InterruptedException Declared by the overridden method
+     */
+    @Override
+    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      TextVertexWriter writer = new SimplePageRankVertexWriter();
+      return writer;
+    }
+
+    /**
+     * Simple VertexWriter that supports {@link SimplePageRankComputation}
+     */
+    public class SimplePageRankVertexWriter extends TextVertexWriter {
+      /**
+       * Write one record with the vertex id as key and the vertex value
+       * as value.
+       *
+       * @param vertex Vertex to write
+       * @throws IOException Declared by the overridden method
+       * @throws InterruptedException Declared by the overridden method
+       */
+      @Override
+      public void writeVertex(
+          Vertex<LongWritable, DoubleWritable, FloatWritable> vertex)
+        throws IOException, InterruptedException {
+        Text idText = new Text(vertex.getId().toString());
+        Text valueText = new Text(vertex.getValue().toString());
+        getRecordWriter().write(idText, valueText);
+      }
+    }
+  }
+}