You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2013/06/19 19:20:41 UTC

git commit: updated refs/heads/trunk to 1eaddd1

Updated Branches:
  refs/heads/trunk 0d358c9d5 -> 1eaddd183


GIRAPH-673: Input superstep should support aggregators like any
other superstep (Bingjing via aching)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1eaddd18
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1eaddd18
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1eaddd18

Branch: refs/heads/trunk
Commit: 1eaddd183259047476893a014fb24e44fc549531
Parents: 0d358c9
Author: Avery Ching <ac...@fb.com>
Authored: Wed Jun 19 10:04:28 2013 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Wed Jun 19 10:04:42 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   3 +
 .../java/org/apache/giraph/io/EdgeReader.java   |  32 +++-
 .../java/org/apache/giraph/io/VertexReader.java |  32 +++-
 .../giraph/io/internal/WrappedEdgeReader.java   |   7 +
 .../giraph/io/internal/WrappedVertexReader.java |   7 +
 .../apache/giraph/master/BspServiceMaster.java  |  91 +++++++----
 .../apache/giraph/worker/BspServiceWorker.java  |  12 +-
 .../giraph/worker/EdgeInputSplitsCallable.java  |   9 ++
 .../worker/VertexInputSplitsCallable.java       |   7 +
 .../examples/AggregatorsTestComputation.java    | 154 +++++++++++++++++++
 .../giraph/examples/GeneratedEdgeReader.java    |  73 +++++++++
 .../aggregators/TestAggregatorsHandling.java    |  13 +-
 12 files changed, 400 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 50d411a..76f7e6b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-673: Input superstep should support aggregators like any 
+  other superstep (Bingjing via aching)
+
   GIRAPH-686: DiskBackedPartitionStore does not saveVertex after edges 
   are loaded (claudio)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index 363a5e6..1bc48e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io;
 import java.io.IOException;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.edge.Edge;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,7 +37,11 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 @SuppressWarnings("rawtypes")
 public abstract class EdgeReader<I extends WritableComparable,
     E extends Writable> extends DefaultImmutableClassesGiraphConfigurable<
-        I, Writable, E> {
+        I, Writable, E> implements WorkerAggregatorUsage {
+
+  /** Aggregator usage for edge reader */
+  private WorkerAggregatorUsage workerAggregatorUsage;
+
   /**
    * Use the input split and context to setup reading the edges.
    * Guaranteed to be called prior to any other function.
@@ -51,6 +56,21 @@ public abstract class EdgeReader<I extends WritableComparable,
     throws IOException, InterruptedException;
 
   /**
+   * Set the aggregator usage that aggregate() and getAggregatedValue()
+   * delegate to while edges are being read.
+   * It is invoked just after initialization, e.g.:
+   * edgeReader.initialize(inputSplit, context);
+   * edgeReader.setWorkerAggregatorUse(aggregatorUsage);
+   * Until it is called, aggregate() and getAggregatedValue() will throw
+   * a NullPointerException. Only for use by the infrastructure.
+   *
+   * @param agg aggregator usage for edge reader
+   */
+  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+    workerAggregatorUsage = agg;
+  }
+
+  /**
    * Read the next edge.
    *
    * @return false iff there are no more edges
@@ -97,4 +117,14 @@ public abstract class EdgeReader<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract float getProgress() throws IOException, InterruptedException;
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    workerAggregatorUsage.aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return workerAggregatorUsage.<A>getAggregatedValue(name);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
index b8b82af..94a4083 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io;
 
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -38,7 +39,11 @@ import java.io.IOException;
 @SuppressWarnings("rawtypes")
 public abstract class VertexReader<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
-    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+    DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    implements WorkerAggregatorUsage {
+  /** Aggregator usage for vertex reader */
+  private WorkerAggregatorUsage workerAggregatorUsage;
+
   /**
    * Use the input split and context to setup reading the vertices.
    * Guaranteed to be called prior to any other function.
@@ -53,6 +58,21 @@ public abstract class VertexReader<I extends WritableComparable,
     throws IOException, InterruptedException;
 
   /**
+   * Set the aggregator usage that aggregate() and getAggregatedValue()
+   * delegate to while vertices are being read.
+   * It is invoked just after initialization, e.g.:
+   * vertexReader.initialize(inputSplit, context);
+   * vertexReader.setWorkerAggregatorUse(aggregatorUsage);
+   * Until it is called, aggregate() and getAggregatedValue() will throw
+   * a NullPointerException. Only for use by the infrastructure.
+   *
+   * @param agg aggregator usage for vertex reader
+   */
+  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+    workerAggregatorUsage = agg;
+  }
+
+  /**
    *
    * @return false iff there are no more vertices
    * @throws IOException
@@ -88,4 +108,14 @@ public abstract class VertexReader<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract float getProgress() throws IOException, InterruptedException;
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    workerAggregatorUsage.aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return workerAggregatorUsage.<A>getAggregatedValue(name);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
index aae7a72..e3b3689 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io.internal;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -70,6 +71,12 @@ public class WrappedEdgeReader<I extends WritableComparable,
   }
 
   @Override
+  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+    // Set aggregator usage for edge reader
+    baseEdgeReader.setWorkerAggregatorUse(agg);
+  }
+
+  @Override
   public boolean nextEdge() throws IOException, InterruptedException {
     return baseEdgeReader.nextEdge();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
index 54adfec..bf0a212 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io.internal;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -71,6 +72,12 @@ public class WrappedVertexReader<I extends WritableComparable,
   }
 
   @Override
+  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+    // Set aggregator usage for vertex reader
+    baseVertexReader.setWorkerAggregatorUse(agg);
+  }
+
+  @Override
   public boolean nextVertex() throws IOException, InterruptedException {
     return baseVertexReader.nextVertex();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index bd48116..0d266a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -1425,6 +1425,58 @@ public class BspServiceMaster<I extends WritableComparable,
     }
   }
 
+  /**
+   * Initialize aggregators on the master side before vertex/edge
+   * loading, so that vertex and edge readers can use aggregators
+   * during INPUT_SUPERSTEP. MasterCompute.initialize() (which e.g.
+   * registers aggregators) runs between the aggregator handler's
+   * prepareSuperstep() and finishSuperstep().
+   * This cooperates with the worker side:
+   *  - BspServiceWorker calls aggregatorHandler.prepareSuperstep
+   *    during setup;
+   *  - the input split callables set the aggregator usage on their
+   *    vertexReader and edgeReader.
+   *
+   * @throws InterruptedException
+   */
+  private void initializeAggregatorInputSuperstep()
+    throws InterruptedException {
+    aggregatorHandler.prepareSuperstep(masterClient);
+    prepareMasterCompute(getSuperstep());
+    try {
+      masterCompute.initialize();
+    } catch (InstantiationException e) {
+      LOG.fatal(
+        "initializeAggregatorInputSuperstep: Failed in instantiation", e);
+      throw new RuntimeException(
+        "initializeAggregatorInputSuperstep: Failed in instantiation", e);
+    } catch (IllegalAccessException e) {
+      LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e);
+      throw new RuntimeException(
+        "initializeAggregatorInputSuperstep: Failed in access", e);
+    }
+    aggregatorHandler.finishSuperstep(masterClient);
+  }
+
+  /**
+   * Give masterCompute a fresh GraphState and SuperstepClasses for the
+   * given superstep; required before MasterCompute initialize()/compute().
+   *
+   * @param superstep superstep for which to run masterCompute
+   * @return the SuperstepClasses handed to masterCompute, which it may set
+   */
+  private SuperstepClasses prepareMasterCompute(long superstep) {
+    GraphState graphState = new GraphState(superstep,
+        GiraphStats.getInstance().getVertices().getValue(),
+        GiraphStats.getInstance().getEdges().getValue(),
+        getContext());
+    SuperstepClasses superstepClasses =
+        new SuperstepClasses(getConfiguration());
+    masterCompute.setGraphState(graphState);
+    masterCompute.setSuperstepClasses(superstepClasses);
+    return superstepClasses;
+  }
+
   @Override
   public SuperstepState coordinateSuperstep() throws
   KeeperException, InterruptedException {
@@ -1495,6 +1547,9 @@ public class BspServiceMaster<I extends WritableComparable,
     }
 
     if (getSuperstep() == INPUT_SUPERSTEP) {
+      // Initialize aggregators before coordinating
+      // vertex loading and edge loading
+      initializeAggregatorInputSuperstep();
       if (getConfiguration().hasVertexInputFormat()) {
         coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
             "Vertex");
@@ -1516,7 +1571,9 @@ public class BspServiceMaster<I extends WritableComparable,
     // Collect aggregator values, then run the master.compute() and
     // finally save the aggregator values
     aggregatorHandler.prepareSuperstep(masterClient);
-    SuperstepClasses superstepClasses = runMasterCompute(getSuperstep());
+    SuperstepClasses superstepClasses =
+      prepareMasterCompute(getSuperstep() + 1);
+    doMasterCompute();
 
     // If the master is halted or all the vertices voted to halt and there
     // are no more messages in the system, stop the computation
@@ -1569,39 +1626,13 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   /**
-   * Run the master.compute() class
-   *
-   * @param superstep superstep for which to run the master.compute()
-   * @return Superstep classes set by Master compute
+   * This doMasterCompute is only called
+   * after masterCompute is initialized
    */
-  private SuperstepClasses runMasterCompute(long superstep) {
-    // The master.compute() should run logically before the workers, so
-    // increase the superstep counter it uses by one
-    GraphState graphState = new GraphState(superstep + 1,
-        GiraphStats.getInstance().getVertices().getValue(),
-        GiraphStats.getInstance().getEdges().getValue(),
-        getContext());
-    SuperstepClasses superstepClasses =
-        new SuperstepClasses(getConfiguration());
-    masterCompute.setGraphState(graphState);
-    masterCompute.setSuperstepClasses(superstepClasses);
-    if (superstep == INPUT_SUPERSTEP) {
-      try {
-        masterCompute.initialize();
-      } catch (InstantiationException e) {
-        LOG.fatal("runMasterCompute: Failed in instantiation", e);
-        throw new RuntimeException(
-            "runMasterCompute: Failed in instantiation", e);
-      } catch (IllegalAccessException e) {
-        LOG.fatal("runMasterCompute: Failed in access", e);
-        throw new RuntimeException(
-            "runMasterCompute: Failed in access", e);
-      }
-    }
+  private void doMasterCompute() {
     GiraphTimerContext timerContext = masterComputeTimer.time();
     masterCompute.compute();
     timerContext.stop();
-    return superstepClasses;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index eb6d30d..342e2b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -480,11 +480,15 @@ public class BspServiceWorker<I extends WritableComparable,
     workerGraphPartitioner.updatePartitionOwners(
         getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
 
-/*if[HADOOP_NON_SECURE]
-    workerClient.setup();
-else[HADOOP_NON_SECURE]*/
+    /*if[HADOOP_NON_SECURE]
+      workerClient.setup();
+    else[HADOOP_NON_SECURE]*/
     workerClient.setup(getConfiguration().authenticate());
-/*end[HADOOP_NON_SECURE]*/
+    /*end[HADOOP_NON_SECURE]*/
+
+    // Initialize aggregator at worker side during setup.
+    // Do this just before vertex and edge loading.
+    aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
 
     VertexEdgeCount vertexEdgeCount;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 78cdd8e..c2c72c6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -59,6 +59,9 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(
       EdgeInputSplitsCallable.class);
+
+  /** Aggregator handler */
+  private final WorkerThreadAggregatorUsage aggregatorUsage;
   /** Edge input format */
   private final EdgeInputFormat<I, E> edgeInputFormat;
   /** Input split max edges (-1 denotes all) */
@@ -95,6 +98,9 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     this.edgeInputFormat = edgeInputFormat;
 
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
+    // Initialize aggregator usage.
+    this.aggregatorUsage = bspServiceWorker.getAggregatorHandler()
+      .newThreadAggregatorUsage();
     edgeInputFilter = configuration.getEdgeInputFilter();
 
     // Initialize Metrics
@@ -125,7 +131,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     edgeReader.setConf(
         (ImmutableClassesGiraphConfiguration<I, Writable, E>)
             configuration);
+
     edgeReader.initialize(inputSplit, context);
+    // Set aggregator usage to edge reader
+    edgeReader.setWorkerAggregatorUse(aggregatorUsage);
 
     long inputSplitEdgesLoaded = 0;
     long inputSplitEdgesFiltered = 0;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 977e100..fb4fdf4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -129,7 +129,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     VertexReader<I, V, E> vertexReader =
         vertexInputFormat.createVertexReader(inputSplit, context);
     vertexReader.setConf(configuration);
+
+    WorkerThreadAggregatorUsage aggregatorUsage =
+      this.bspServiceWorker
+        .getAggregatorHandler().newThreadAggregatorUsage();
+
     vertexReader.initialize(inputSplit, context);
+    // Set aggregator usage to vertex reader
+    vertexReader.setWorkerAggregatorUse(aggregatorUsage);
 
     long inputSplitVerticesLoaded = 0;
     long inputSplitVerticesFiltered = 0;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
index db527f2..b054e9e 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
@@ -19,14 +19,29 @@
 package org.apache.giraph.examples;
 
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /** Computation which uses aggrergators. To be used for testing. */
 public class AggregatorsTestComputation extends
@@ -37,6 +52,12 @@ public class AggregatorsTestComputation extends
   private static final String REGULAR_AGG = "regular";
   /** Name of persistent aggregator */
   private static final String PERSISTENT_AGG = "persistent";
+  /** Name of input super step persistent aggregator */
+  private static final String INPUT_VERTEX_PERSISTENT_AGG
+    = "input_super_step_vertex_agg";
+  /** Name of input super step persistent aggregator */
+  private static final String INPUT_EDGE_PERSISTENT_AGG
+    = "input_super_step_edge_agg";
   /** Name of master overwriting aggregator */
   private static final String MASTER_WRITE_AGG = "master";
   /** Value which master compute will use */
@@ -92,6 +113,14 @@ public class AggregatorsTestComputation extends
       setAggregatedValue(MASTER_WRITE_AGG, myValue);
 
       long nv = getTotalNumVertices();
+      if (superstep >= 0) {
+        assertEquals(100, ((LongWritable)
+          getAggregatedValue(INPUT_VERTEX_PERSISTENT_AGG)).get());
+      }
+      if (superstep >= 0) {
+        assertEquals(4500, ((LongWritable)
+          getAggregatedValue(INPUT_EDGE_PERSISTENT_AGG)).get());
+      }
       if (superstep > 0) {
         assertEquals(nv * (1L << (superstep - 1)),
             ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
@@ -111,6 +140,10 @@ public class AggregatorsTestComputation extends
     @Override
     public void initialize() throws InstantiationException,
         IllegalAccessException {
+      registerPersistentAggregator(
+          INPUT_VERTEX_PERSISTENT_AGG, LongSumAggregator.class);
+      registerPersistentAggregator(
+          INPUT_EDGE_PERSISTENT_AGG, LongSumAggregator.class);
       registerAggregator(REGULAR_AGG, LongSumAggregator.class);
       registerPersistentAggregator(PERSISTENT_AGG,
           LongSumAggregator.class);
@@ -134,4 +167,125 @@ public class AggregatorsTestComputation extends
           ", actual: " + actual);
     }
   }
+
+  /**
+   * Simple VertexReader; sums vertex values into INPUT_VERTEX_PERSISTENT_AGG
+   */
+  public static class SimpleVertexReader extends
+      GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(SimpleVertexReader.class);
+
+    @Override
+    public boolean nextVertex() {
+      return totalRecords > recordsRead;
+    }
+
+    @Override
+    public Vertex<LongWritable, DoubleWritable,
+        FloatWritable> getCurrentVertex() throws IOException {
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
+          getConf().createVertex();
+      LongWritable vertexId = new LongWritable(
+          (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+      DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
+      long targetVertexId =
+          (vertexId.get() + 1) %
+          (inputSplit.getNumSplits() * totalRecords);
+      float edgeValue = vertexId.get() * 100f;
+      List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
+      edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
+          new FloatWritable(edgeValue)));
+      vertex.initialize(vertexId, vertexValue, edges);
+      ++recordsRead;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("next vertex: Return vertexId=" + vertex.getId().get() +
+            ", vertexValue=" + vertex.getValue() +
+            ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
+      }
+      aggregate(INPUT_VERTEX_PERSISTENT_AGG,
+        new LongWritable((long) vertex.getValue().get()));
+      return vertex;
+    }
+  }
+
+  /**
+   * Simple VertexInputFormat
+   */
+  public static class SimpleVertexInputFormat extends
+    GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
+    @Override
+    public VertexReader<LongWritable, DoubleWritable,
+    FloatWritable> createVertexReader(InputSplit split,
+      TaskAttemptContext context)
+      throws IOException {
+      return new SimpleVertexReader();
+    }
+  }
+
+  /**
+   * Simple EdgeReader; getCurrentSourceId() must precede getCurrentEdge()
+   */
+  public static class SimpleEdgeReader extends
+    GeneratedEdgeReader<LongWritable, FloatWritable> {
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(SimpleEdgeReader.class);
+
+    @Override
+    public boolean nextEdge() {
+      return totalRecords > recordsRead;
+    }
+
+    @Override
+    public Edge<LongWritable, FloatWritable> getCurrentEdge()
+      throws IOException {
+      LongWritable vertexId = new LongWritable(
+        (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+      long targetVertexId = (vertexId.get() + 1) %
+        (inputSplit.getNumSplits() * totalRecords);
+      float edgeValue = vertexId.get() * 100f;
+      Edge<LongWritable, FloatWritable> edge = EdgeFactory.create(
+        new LongWritable(targetVertexId), new FloatWritable(edgeValue));
+      ++recordsRead;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("next edge: Return targetVertexId=" + targetVertexId +
+          ", edgeValue=" + edgeValue);
+      }
+      aggregate(INPUT_EDGE_PERSISTENT_AGG, new LongWritable((long) edge
+        .getValue().get()));
+      return edge;
+    }
+
+    @Override
+    public LongWritable getCurrentSourceId() throws IOException,
+      InterruptedException {
+      LongWritable vertexId = new LongWritable(
+        (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+      return vertexId;
+    }
+  }
+
+  /**
+   * Simple EdgeInputFormat creating one BspInputSplit per hinted split
+   */
+  public static class SimpleEdgeInputFormat extends
+    EdgeInputFormat<LongWritable, FloatWritable> {
+
+    @Override
+    public EdgeReader<LongWritable, FloatWritable> createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+      return new SimpleEdgeReader();
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+      throws IOException, InterruptedException {
+      List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+      for (int i = 0; i < minSplitCountHint; ++i) {
+        inputSplitList.add(new BspInputSplit(i, minSplitCountHint));
+      }
+      return inputSplitList;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java
new file mode 100644
index 0000000..14da5f2
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.conf.LongConfOption;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Base class for edge readers that generate edges instead of reading
+ * them from storage (e.g. AggregatorsTestComputation.SimpleEdgeReader).
+ *
+ * @param <I> Vertex index value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedEdgeReader<
+    I extends WritableComparable,
+    E extends Writable>
+    extends EdgeReader<I, E> {
+  /** Number of edges generated per split (defaults to 10) */
+  public static final LongConfOption DEFAULT_READER_EDGES =
+    new LongConfOption("GeneratedEdgeReader.reader_edges", 10);
+  /** Records read so far */
+  protected long recordsRead = 0;
+  /** Total records to read (on this split alone) */
+  protected long totalRecords = 0;
+  /** The input split from initialize(), which must be a BspInputSplit */
+  protected BspInputSplit inputSplit = null;
+
+  /** Default constructor for reflection. */
+  public GeneratedEdgeReader() {
+  }
+
+  @Override
+  public final void initialize(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException {
+    totalRecords = DEFAULT_READER_EDGES.get(getConf());
+    this.inputSplit = (BspInputSplit) inputSplit;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public final float getProgress() throws IOException {
+    // An empty split has nothing to read: report it as complete rather
+    // than computing 0 / 0, which yields NaN for floats.
+    return totalRecords > 0 ? recordsRead * 100.0f / totalRecords : 100.0f;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
index 6d22800..e2b611b 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -25,7 +25,6 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.examples.AggregatorsTestComputation;
 import org.apache.giraph.examples.SimpleCheckpoint;
-import org.apache.giraph.examples.SimplePageRankComputation;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.master.MasterAggregatorHandler;
 import org.apache.hadoop.fs.Path;
@@ -77,7 +76,9 @@ public class TestAggregatorsHandling extends BspCase {
     GiraphConfiguration conf = new GiraphConfiguration();
     conf.setComputationClass(AggregatorsTestComputation.class);
     conf.setVertexInputFormatClass(
-        SimplePageRankComputation.SimplePageRankVertexInputFormat.class);
+        AggregatorsTestComputation.SimpleVertexInputFormat.class);
+    conf.setEdgeInputFormatClass(
+        AggregatorsTestComputation.SimpleEdgeInputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
     job.getConfiguration().setMasterComputeClass(
         AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
@@ -159,7 +160,9 @@ public class TestAggregatorsHandling extends BspCase {
     conf.setMasterComputeClass(
         AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
     conf.setVertexInputFormatClass(
-        SimplePageRankComputation.SimplePageRankVertexInputFormat.class);
+        AggregatorsTestComputation.SimpleVertexInputFormat.class);
+    conf.setEdgeInputFormatClass(
+        AggregatorsTestComputation.SimpleEdgeInputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
 
     GiraphConfiguration configuration = job.getConfiguration();
@@ -178,7 +181,9 @@ public class TestAggregatorsHandling extends BspCase {
     conf.setMasterComputeClass(
         AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
     conf.setVertexInputFormatClass(
-        SimplePageRankComputation.SimplePageRankVertexInputFormat.class);
+        AggregatorsTestComputation.SimpleVertexInputFormat.class);
+    conf.setEdgeInputFormatClass(
+        AggregatorsTestComputation.SimpleEdgeInputFormat.class);
     GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
         conf, outputPath);
     job.getConfiguration().setMasterComputeClass(