You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/07 23:09:36 UTC

svn commit: r1198972 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/

Author: aching
Date: Mon Nov  7 22:09:35 2011
New Revision: 1198972

URL: http://svn.apache.org/viewvc?rev=1198972&view=rev
Log:
GIRAPH-47: Export Worker's Context/State to vertices through
pre/post/Application/Superstep. (cmartella via aching)


Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/CODE_CONVENTIONS
    incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Nov  7 22:09:35 2011
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-47: Export Worker's Context/State to vertices through
+  pre/post/Application/Superstep. (cmartella via aching)
+
   GIRAPH-71: SequenceFileVertexInputFormat missing license header; 
   rat fails. (jghoman)
 

Modified: incubator/giraph/trunk/CODE_CONVENTIONS
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CODE_CONVENTIONS?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/CODE_CONVENTIONS (original)
+++ incubator/giraph/trunk/CODE_CONVENTIONS Mon Nov  7 22:09:35 2011
@@ -49,7 +49,9 @@ if (LOG.isInfoEnabled()) {
 }
 
 - All classes, members, and member methods should have Javadoc in the following
-  style.  C-style comments for javadoc and // comments for non-javadoc.
+  style.  C-style comments for javadoc and // comments for non-javadoc.  Also, the comment 
+  block should have a line break that separates the comment section and the @ section.  
+  See below.
 
 /**
  * This is an example class
@@ -76,6 +78,11 @@ public class Giraffe {
   }
 }
 
-- Class members should not begin with 'm_' or '_'.
+- Class members should not begin with 'm_' or '_'
 - No warnings allowed, but be as specific as possible with warning suppression
-- Prefer to avoid abbreviations when reasonable (i.e. 'msg' vs 'message')
\ No newline at end of file
+- Prefer to avoid abbreviations when reasonable (i.e. 'msg' vs 'message')
+- Static variable names should be entirely capitalized and seperated by '_' 
+  (i.e. private static int FOO_BAR_BAR = 2)
+- Non-static variable and method names should not begin capitalized and should only use
+  alphanumeric characters (i.e. int fooBarBar)
+- All classnames begin capitalized then use lower casing (i.e. class FooBarBar)
\ No newline at end of file

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Mon Nov  7 22:09:35 2011
@@ -46,19 +46,6 @@ public class PageRankBenchmark extends
     public static String SUPERSTEP_COUNT = "PageRankBenchmark.superstepCount";
 
     @Override
-    public void preApplication()
-        throws InstantiationException, IllegalAccessException {
-    }
-
-    @Override
-    public void postApplication() {
-    }
-
-    @Override
-    public void preSuperstep() {
-    }
-
-    @Override
     public void compute(Iterator<DoubleWritable> msgIterator) {
         if (getSuperstep() >= 1) {
             double sum = 0;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Mon Nov  7 22:09:35 2011
@@ -45,38 +45,28 @@ public class RandomMessageBenchmark exte
     private Configuration conf;
 
     /** How many supersteps to run */
-    public static String SUPERSTEP_COUNT = "RandomMessageBenchmark.superstepCount";
-    
+    public static String SUPERSTEP_COUNT =
+        "RandomMessageBenchmark.superstepCount";
+
     /** How many bytes per message */
-    public static String NUM_BYTES_PER_MESSAGE = "RandomMessageBenchmark.numBytesPerMessage";
+    public static String NUM_BYTES_PER_MESSAGE =
+        "RandomMessageBenchmark.numBytesPerMessage";
     /** How many bytes per message */
-    public static String NUM_MESSAGES_PER_VERTEX = "RandomMessageBenchmark.numMessagesPerVertex";
-    
+    public static String NUM_MESSAGES_PER_VERTEX =
+        "RandomMessageBenchmark.numMessagesPerVertex";
+
     /** Random generator for random bytes message */
     private Random rnd = new Random(System.currentTimeMillis());
 
     @Override
-    public void preApplication()
-        throws InstantiationException, IllegalAccessException {
-    }
-
-    @Override
-    public void postApplication() {
-    }
-
-    @Override
-    public void preSuperstep() {
-    }
-
-    @Override
     public void compute(Iterator<BytesWritable> msgIterator) {
         byte [] message = new byte[getConf().getInt(NUM_BYTES_PER_MESSAGE, 16)];
         int numMessage = getConf().getInt(NUM_MESSAGES_PER_VERTEX, 1);
         if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
             for (int i=0; i < numMessage; i++) {
-                rnd.nextBytes(message);         
+                rnd.nextBytes(message);
                 sendMsgToAllEdges(new BytesWritable(message));
-            }                     
+            }
         } else {
             voteToHalt();
         }
@@ -125,7 +115,7 @@ public class RandomMessageBenchmark exte
                 "flusher",
                 true,
                 "Number of flush threads");
-        
+
         HelpFormatter formatter = new HelpFormatter();
         if (args.length == 0) {
             formatter.printHelp(getClass().getName(), options, true);
@@ -178,11 +168,11 @@ public class RandomMessageBenchmark exte
             SUPERSTEP_COUNT,
             Integer.parseInt(cmd.getOptionValue('s')));
         job.getConfiguration().setInt(
-                RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
-                Integer.parseInt(cmd.getOptionValue('b')));
+            RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
+            Integer.parseInt(cmd.getOptionValue('b')));
         job.getConfiguration().setInt(
-                RandomMessageBenchmark.NUM_MESSAGES_PER_VERTEX,
-                Integer.parseInt(cmd.getOptionValue('n')));
+            RandomMessageBenchmark.NUM_MESSAGES_PER_VERTEX,
+            Integer.parseInt(cmd.getOptionValue('n')));
 
         boolean isVerbose = false;
         if (cmd.hasOption('v')) {
@@ -193,8 +183,8 @@ public class RandomMessageBenchmark exte
                              Integer.parseInt(cmd.getOptionValue('s')));
         }
         if (cmd.hasOption('f')) {
-            job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS, 
-                Integer.parseInt(cmd.getOptionValue('f')));         
+            job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+                Integer.parseInt(cmd.getOptionValue('f')));
         }
         if (job.run(isVerbose) == true) {
             return 0;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java Mon Nov  7 22:09:35 2011
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.bsp;
 
-import org.apache.giraph.graph.BasicVertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -39,14 +38,6 @@ public interface CentralizedService<I ex
     void setup();
 
     /**
-     * Get the representative Vertex for this worker.  It can used to
-     * call pre/post application/superstep methods defined by the user.
-     *
-     * @return representation vertex
-     */
-    BasicVertex<I, V, E, M> getRepresentativeVertex();
-
-    /**
      * Get the current global superstep of the application to work on.
      *
      * @return global superstep (begins at INPUT_SUPERSTEP)

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Mon Nov  7 22:09:35 2011
@@ -27,6 +27,7 @@ import org.apache.giraph.graph.Aggregato
 import org.apache.giraph.graph.GraphMapper;
 import org.apache.giraph.graph.VertexRange;
 import org.apache.giraph.graph.BasicVertexRangeBalancer;
+import org.apache.giraph.graph.WorkerContext;
 
 /**
  * All workers should have access to this centralized service to
@@ -53,6 +54,12 @@ public interface CentralizedServiceWorke
      */
     int getPort();
 
+   /**
+    * 
+    * @return worker's WorkerContext
+    */
+    WorkerContext getWorkerContext();
+
     /**
      * Get a synchronized map to the partitions and their sorted vertex lists.
      * This could be used to run compute for the vertices or checkpointing.

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Mon Nov  7 22:09:35 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
 
 import java.util.Iterator;
 
@@ -39,21 +40,14 @@ import java.util.Iterator;
 public class SimpleCheckpointVertex extends
         Vertex<LongWritable, IntWritable, FloatWritable, FloatWritable>
         implements Tool {
+    private static Logger LOG =
+        Logger.getLogger(SimpleCheckpointVertex.class);
     /** Configuration */
     private Configuration conf;
-    /** User can access this after the application finishes if local */
-    public static long finalSum;
-    /** Number of supersteps to run (6 by default) */
-    private static int supersteps = 6;
-    /** Filename to indicate whether a fault was found */
-    public final String faultFile = "/tmp/faultFile";
     /** Which superstep to cause the worker to fail */
     public final int faultingSuperstep = 4;
     /** Vertex id to fault on */
     public final long faultingVertexId = 1;
-    /** Enable the fault at the particular vertex id and superstep? */
-    private static boolean enableFault = false;
-
     /** Dynamically set number of supersteps */
     public static final String SUPERSTEP_COUNT =
         "simpleCheckpointVertex.superstepCount";
@@ -62,32 +56,16 @@ public class SimpleCheckpointVertex exte
         "simpleCheckpointVertex.enableFault";
 
     @Override
-    public void preApplication() throws InstantiationException, IllegalAccessException {
-        registerAggregator(LongSumAggregator.class.getName(),
-                           LongSumAggregator.class);
-        LongSumAggregator sumAggregator = (LongSumAggregator)
-            getAggregator(LongSumAggregator.class.getName());
-        sumAggregator.setAggregatedValue(new LongWritable(0));
-        supersteps = getConf().getInt(SUPERSTEP_COUNT, supersteps);
-        enableFault = getConf().getBoolean(ENABLE_FAULT, false);
-    }
-
-    @Override
-    public void postApplication() {
-        LongSumAggregator sumAggregator = (LongSumAggregator)
-            getAggregator(LongSumAggregator.class.getName());
-        finalSum = sumAggregator.getAggregatedValue().get();
-    }
-
-    @Override
-    public void preSuperstep() {
-        useAggregator(LongSumAggregator.class.getName());
-    }
-
-    @Override
     public void compute(Iterator<FloatWritable> msgIterator) {
+    	SimpleCheckpointVertexWorkerContext workerContext = 
+    		(SimpleCheckpointVertexWorkerContext) getWorkerContext();
+    	
         LongSumAggregator sumAggregator = (LongSumAggregator)
             getAggregator(LongSumAggregator.class.getName());
+        
+        boolean enableFault = workerContext.getEnableFault();
+        int supersteps = workerContext.getSupersteps();
+        
         if (enableFault && (getSuperstep() == faultingSuperstep) &&
                 (getContext().getTaskAttemptID().getId() == 0) &&
                 (getVertexId().get() == faultingVertexId)) {
@@ -133,6 +111,56 @@ public class SimpleCheckpointVertex exte
             sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
         }
     }
+    
+    public static class SimpleCheckpointVertexWorkerContext 
+            extends WorkerContext {
+        /** User can access this after the application finishes if local */
+        public static long finalSum;
+        /** Number of supersteps to run (6 by default) */
+        private int supersteps = 6;
+        /** Filename to indicate whether a fault was found */
+        public final String faultFile = "/tmp/faultFile";
+        /** Enable the fault at the particular vertex id and superstep? */
+        private boolean enableFault = false;
+
+		@Override
+		public void preApplication() 
+		        throws InstantiationException, IllegalAccessException {
+		    registerAggregator(LongSumAggregator.class.getName(),
+					LongSumAggregator.class);
+		    LongSumAggregator sumAggregator = (LongSumAggregator)
+		    getAggregator(LongSumAggregator.class.getName());
+		    sumAggregator.setAggregatedValue(new LongWritable(0));
+		    supersteps = getContext().getConfiguration()
+		        .getInt(SUPERSTEP_COUNT, supersteps);
+		    enableFault = getContext().getConfiguration()
+		        .getBoolean(ENABLE_FAULT, false);
+		}
+
+		@Override
+		public void postApplication() {
+		    LongSumAggregator sumAggregator = (LongSumAggregator)
+		        getAggregator(LongSumAggregator.class.getName());
+		    finalSum = sumAggregator.getAggregatedValue().get();
+		    LOG.info("finalSum="+ finalSum);
+		}
+
+		@Override
+		public void preSuperstep() {
+	        useAggregator(LongSumAggregator.class.getName());
+		}
+
+		@Override
+		public void postSuperstep() { }
+		
+		public int getSupersteps() {
+		    return this.supersteps;
+		}
+
+		public boolean getEnableFault() {
+		    return this.enableFault;
+		}
+    }
 
     @Override
     public int run(String[] args) throws Exception {
@@ -175,18 +203,15 @@ public class SimpleCheckpointVertex exte
             return -1;
         }
 
-        getConf().setClass(GiraphJob.VERTEX_CLASS, getClass(), Vertex.class);
-        getConf().setClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS,
-                           GeneratedVertexInputFormat.class,
-                           VertexInputFormat.class);
-        getConf().setClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS,
-                           SimpleTextVertexOutputFormat.class,
-                           VertexOutputFormat.class);
-        getConf().setInt(GiraphJob.MIN_WORKERS,
-                         Integer.parseInt(cmd.getOptionValue('w')));
-        getConf().setInt(GiraphJob.MAX_WORKERS,
-                         Integer.parseInt(cmd.getOptionValue('w')));
         GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
+        bspJob.setVertexClass(getClass());
+        bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
+        bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class);
+        bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
+        int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+        int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+        bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
+        
         FileOutputFormat.setOutputPath(bspJob,
                                        new Path(cmd.getOptionValue('o')));
         boolean verbose = false;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Mon Nov  7 22:09:35 2011
@@ -29,6 +29,7 @@ import org.apache.log4j.Logger;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.MutableVertex;
+import org.apache.giraph.graph.WorkerContext;
 
 /**
  * Vertex to allow unit testing of graph mutations.
@@ -36,19 +37,11 @@ import org.apache.giraph.graph.MutableVe
 public class SimpleMutateGraphVertex extends
         Vertex<LongWritable, DoubleWritable,
         FloatWritable, DoubleWritable> {
-    private static int edgesRemoved = 0;
-
+    /** Maximum number of ranges for vertex ids */
+    private long maxRanges = 100;
     /** Class logger */
     private static Logger LOG =
         Logger.getLogger(SimpleMutateGraphVertex.class);
-    /** Cached vertex count */
-    private static long VERTEX_COUNT;
-    /** Cached edge count */
-    private static long EDGE_COUNT;
-    /** Original cached edge count */
-    private static long ORIG_EDGE_COUNT;
-    /** Maximum number of ranges for vertex ids */
-    private static long MAX_RANGES = 100;
 
     /**
      * Unless we create a ridiculous number of vertices , we should not
@@ -57,12 +50,16 @@ public class SimpleMutateGraphVertex ext
      * @return Starting vertex id of the range
      */
     private long rangeVertexIdStart(int range) {
-        return (Long.MAX_VALUE / MAX_RANGES) * range;
+        return (Long.MAX_VALUE / maxRanges) * range;
     }
 
     @Override
     public void compute(Iterator<DoubleWritable> msgIterator) throws IOException {
-        if (getSuperstep() == 1) {
+
+    	SimpleMutateGraphVertexWorkerContext workerContext =
+    		(SimpleMutateGraphVertexWorkerContext) getWorkerContext();
+
+    	if (getSuperstep() == 1) {
             // Send messages to vertices that are sure not to exist
             // (creating them)
             LongWritable destVertexId =
@@ -70,16 +67,18 @@ public class SimpleMutateGraphVertex ext
             sendMsg(destVertexId, new DoubleWritable(0.0));
         } else if (getSuperstep() == 2) {
         } else if (getSuperstep() == 3) {
-            if (VERTEX_COUNT * 2 != getNumVertices()) {
+        	long vertex_count = workerContext.getVertexCount();
+            if (vertex_count * 2 != getNumVertices()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumVertices() +
-                    " vertices when should have " + VERTEX_COUNT * 2 +
+                    " vertices when should have " + vertex_count * 2 +
                     " on superstep " + getSuperstep());
             }
-            if (EDGE_COUNT != getNumEdges()) {
+            long edge_count = workerContext.getEdgeCount();
+            if (edge_count != getNumEdges()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumEdges() +
-                    " edges when should have " + EDGE_COUNT +
+                    " edges when should have " + edge_count +
                     " on superstep " + getSuperstep());
             }
             // Create vertices that are sure not to exist (doubling vertices)
@@ -95,22 +94,24 @@ public class SimpleMutateGraphVertex ext
                                getVertexId(), new FloatWritable(0.0f)));
         } else if (getSuperstep() == 4) {
         } else if (getSuperstep() == 5) {
-            if (VERTEX_COUNT * 2 != getNumVertices()) {
+        	long vertex_count = workerContext.getVertexCount();
+            if (vertex_count * 2 != getNumVertices()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumVertices() +
-                    " when should have " + VERTEX_COUNT * 2 +
+                    " when should have " + vertex_count * 2 +
                     " on superstep " + getSuperstep());
             }
-            if (EDGE_COUNT + VERTEX_COUNT != getNumEdges()) {
+            long edge_count = workerContext.getEdgeCount();
+            if (edge_count + vertex_count != getNumEdges()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumEdges() +
-                    " edges when should have " + EDGE_COUNT + VERTEX_COUNT +
+                    " edges when should have " + edge_count + vertex_count +
                     " on superstep " + getSuperstep());
             }
             // Remove the edges created in superstep 3
             LongWritable vertexIndex =
                 new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
-            ++edgesRemoved;
+            workerContext.increaseEdgesRemoved();
             removeEdgeRequest(vertexIndex, getVertexId());
         } else if (getSuperstep() == 6) {
             // Remove all the vertices created in superstep 3
@@ -119,17 +120,19 @@ public class SimpleMutateGraphVertex ext
                 removeVertexRequest(getVertexId());
             }
         } else if (getSuperstep() == 7) {
-            if (ORIG_EDGE_COUNT != getNumEdges()) {
+        	long orig_edge_count = workerContext.getOrigEdgeCount();
+            if (orig_edge_count != getNumEdges()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumEdges() +
-                    " edges when should have " + ORIG_EDGE_COUNT +
+                    " edges when should have " + orig_edge_count +
                     " on superstep " + getSuperstep());
             }
         } else if (getSuperstep() == 8) {
-            if (VERTEX_COUNT / 2 != getNumVertices()) {
+        	long vertex_count = workerContext.getVertexCount();
+            if (vertex_count / 2 != getNumVertices()) {
                 throw new IllegalStateException(
                     "Impossible to have " + getNumVertices() +
-                    " vertices when should have " + VERTEX_COUNT / 2 +
+                    " vertices when should have " + vertex_count / 2 +
                     " on superstep " + getSuperstep());
             }
         }
@@ -138,17 +141,54 @@ public class SimpleMutateGraphVertex ext
         }
     }
 
-    @Override
-    public void postSuperstep() {
-        VERTEX_COUNT = getNumVertices();
-        EDGE_COUNT = getNumEdges();
-        if (getSuperstep() == 1) {
-            ORIG_EDGE_COUNT = EDGE_COUNT;
-        }
-        LOG.info("Got " + VERTEX_COUNT + " vertices, " +
-                 EDGE_COUNT + " edges on superstep " +
-                 getSuperstep());
-        LOG.info("Removed " + edgesRemoved);
-        edgesRemoved = 0;
+    public static class SimpleMutateGraphVertexWorkerContext extends WorkerContext {
+        /** Cached vertex count */
+        private long vertexCount;
+        /** Cached edge count */
+        private long edgeCount;
+        /** Original number of edges */
+        private long origEdgeCount;
+        /** Number of edges removed during superstep */
+        private int edgesRemoved = 0;
+
+		@Override
+		public void preApplication()
+				throws InstantiationException, IllegalAccessException { }
+
+		@Override
+		public void postApplication() { }
+
+		@Override
+		public void preSuperstep() {
+			vertexCount = getNumVertices();
+			edgeCount = getNumEdges();
+			if (getSuperstep() == 1) {
+				origEdgeCount = edgeCount;
+			}
+			LOG.info("Got " + vertexCount + " vertices, " +
+					 edgeCount + " edges on superstep " +
+					 getSuperstep());
+			LOG.info("Removed " + edgesRemoved);
+			edgesRemoved = 0;
+		}
+
+		@Override
+		public void postSuperstep() { }
+
+		public long getVertexCount() {
+			return vertexCount;
+		}
+
+		public long getEdgeCount() {
+			return edgeCount;
+		}
+
+		public long getOrigEdgeCount() {
+			return origEdgeCount;
+		}
+
+		public void increaseEdgesRemoved() {
+			this.edgesRemoved++;
+		}
     }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Mon Nov  7 22:09:35 2011
@@ -19,11 +19,13 @@
 package org.apache.giraph.examples;
 
 import com.google.common.collect.Maps;
+
 import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.graph.VertexWriter;
+import org.apache.giraph.graph.WorkerContext;
 import org.apache.giraph.lib.TextVertexOutputFormat;
 import org.apache.giraph.lib.TextVertexOutputFormat.TextVertexWriter;
 import org.apache.hadoop.io.DoubleWritable;
@@ -43,63 +45,11 @@ import java.util.Map;
  * Demonstrates the basic Pregel PageRank implementation.
  */
 public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
-    /** User can access this sum after the application finishes if local */
-    public static long finalSum;
-    /** User can access this min after the application finishes if local */
-    public static double finalMin;
-    /** User can access this max after the application finishes if local */
-    public static double finalMax;
-    /** Logger */
+	/** Logger */
     private static final Logger LOG =
         Logger.getLogger(SimplePageRankVertex.class);
 
     @Override
-    public void preApplication()
-            throws InstantiationException, IllegalAccessException {
-        registerAggregator("sum", LongSumAggregator.class);
-        registerAggregator("min", MinAggregator.class);
-        registerAggregator("max", MaxAggregator.class);
-    }
-
-    @Override
-    public void postApplication() {
-        LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
-        MinAggregator minAggreg = (MinAggregator) getAggregator("min");
-        MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
-        finalSum = sumAggreg.getAggregatedValue().get();
-        finalMin = minAggreg.getAggregatedValue().get();
-        finalMax = maxAggreg.getAggregatedValue().get();
-
-    }
-
-    @Override
-    public void preSuperstep() {
-        LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
-        MinAggregator minAggreg = (MinAggregator) getAggregator("min");
-        MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
-        if (getSuperstep() >= 3) {
-            LOG.info("aggregatedNumVertices=" +
-                    sumAggreg.getAggregatedValue() +
-                    " NumVertices=" + getNumVertices());
-            if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
-                throw new RuntimeException("wrong value of SumAggreg: " +
-                        sumAggreg.getAggregatedValue() + ", should be: " +
-                        getNumVertices());
-            }
-            DoubleWritable maxPagerank =
-                    (DoubleWritable)maxAggreg.getAggregatedValue();
-            LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
-            DoubleWritable minPagerank =
-                    (DoubleWritable)minAggreg.getAggregatedValue();
-            LOG.info("aggregatedMinPageRank=" + minPagerank.get());
-        }
-        useAggregator("sum");
-        useAggregator("min");
-        useAggregator("max");
-        sumAggreg.setAggregatedValue(new LongWritable(0L));
-    }
-
-    @Override
     public void compute(Iterator<DoubleWritable> msgIterator) {
         LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
         MinAggregator minAggreg = (MinAggregator) getAggregator("min");
@@ -129,6 +79,76 @@ public class SimplePageRankVertex extend
         }
     }
 
+	public static class SimplePageRankVertexWorkerContext extends
+    		WorkerContext {
+
+    	public static double finalMax, finalMin;
+    	public static long finalSum;
+    	
+    	@Override
+    	public void preApplication() 
+    	throws InstantiationException, IllegalAccessException {
+    		
+    		registerAggregator("sum", LongSumAggregator.class);
+    		registerAggregator("min", MinAggregator.class);
+    		registerAggregator("max", MaxAggregator.class);			
+    	}
+
+    	@Override
+    	public void postApplication() {
+
+    		LongSumAggregator sumAggreg = 
+    			(LongSumAggregator) getAggregator("sum");
+    		MinAggregator minAggreg = 
+    			(MinAggregator) getAggregator("min");
+    		MaxAggregator maxAggreg = 
+    			(MaxAggregator) getAggregator("max");
+
+    		finalSum = sumAggreg.getAggregatedValue().get();
+    		finalMax = maxAggreg.getAggregatedValue().get();
+    		finalMin = minAggreg.getAggregatedValue().get();
+    		
+            LOG.info("aggregatedNumVertices=" + finalSum);
+            LOG.info("aggregatedMaxPageRank=" + finalMax);
+            LOG.info("aggregatedMinPageRank=" + finalMin);
+    	}
+
+		@Override
+		public void preSuperstep() {
+    		
+	        LongSumAggregator sumAggreg = 
+	        	(LongSumAggregator) getAggregator("sum");
+	        MinAggregator minAggreg = 
+	        	(MinAggregator) getAggregator("min");
+	        MaxAggregator maxAggreg = 
+	        	(MaxAggregator) getAggregator("max");
+	        
+	        if (getSuperstep() >= 3) {
+	            LOG.info("aggregatedNumVertices=" +
+	                    sumAggreg.getAggregatedValue() +
+	                    " NumVertices=" + getNumVertices());
+	            if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
+	                throw new RuntimeException("wrong value of SumAggreg: " +
+	                        sumAggreg.getAggregatedValue() + ", should be: " +
+	                        getNumVertices());
+	            }
+	            DoubleWritable maxPagerank =
+	                    (DoubleWritable) maxAggreg.getAggregatedValue();
+	            LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
+	            DoubleWritable minPagerank =
+	                    (DoubleWritable) minAggreg.getAggregatedValue();
+	            LOG.info("aggregatedMinPageRank=" + minPagerank.get());
+	        }
+	        useAggregator("sum");
+	        useAggregator("min");
+	        useAggregator("max");
+	        sumAggreg.setAggregatedValue(new LongWritable(0L));
+		}
+
+		@Override
+		public void postSuperstep() { }
+    }
+    
     /**
      * Simple VertexReader that supports {@link SimplePageRankVertex}
      */

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java?rev=1198972&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java Mon Nov  7 22:09:35 2011
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Fully runnable example of how to 
+ * emit worker data to HDFS during a graph
+ * computation.
+ */
+public class SimpleVertexWithWorkerContext extends
+        Vertex<LongWritable, IntWritable, FloatWritable, DoubleWritable> 
+        implements Tool {
+
+    public static final String OUTPUTDIR = "svwwc.outputdir";
+    private static final int TESTLENGTH = 30;
+
+    @Override
+    public void compute(Iterator<DoubleWritable> msgIterator)
+            throws IOException {
+        
+        long superstep = getSuperstep();
+        
+        if (superstep < TESTLENGTH) {
+            EmitterWorkerContext emitter = 
+                    (EmitterWorkerContext) getWorkerContext();
+            emitter.emit("vertexId=" + getVertexId() + 
+                         " superstep=" + superstep + "\n");
+        } else {
+            voteToHalt();
+        }
+    }
+    
+    @SuppressWarnings("rawtypes")
+	public static class EmitterWorkerContext extends WorkerContext {
+        
+        private static final String FILENAME = "emitter_";
+        private DataOutputStream out;
+
+        @Override
+        public void preApplication() {
+            Context context = getContext();
+            FileSystem fs;
+            
+            try {
+                fs = FileSystem.get(context.getConfiguration());
+
+                String p = context.getConfiguration()
+                    .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
+                if (p == null) {
+                    throw new IllegalArgumentException(
+                        SimpleVertexWithWorkerContext.OUTPUTDIR + 
+                        " undefined!");
+                }
+            
+                Path path = new Path(p);
+                if (!fs.exists(path)) {
+                    throw new IllegalArgumentException(path + 
+                            " doesn't exist");
+                }
+
+                Path outF = new Path(path, FILENAME + 
+                        context.getTaskAttemptID());
+                if (fs.exists(outF)) {
+                    throw new IllegalArgumentException(outF + 
+                            " aready exists");
+                }
+            
+                out = fs.create(outF);
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "can't initialize WorkerContext", e);
+            }
+        }
+
+        @Override
+        public void postApplication() {
+            if (out != null) {
+                try {
+                    out.flush();
+                    out.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(
+                            "can't finalize WorkerContext", e);
+                }
+                out = null;
+            }
+        }
+
+        @Override
+        public void preSuperstep() { }
+
+        @Override
+        public void postSuperstep() { }
+        
+        public void emit(String s) {
+            try {
+                out.writeUTF(s);
+            } catch (IOException e) {
+                throw new RuntimeException("can't emit", e);
+            }
+        }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        if (args.length != 2) {
+            throw new IllegalArgumentException(
+                "run: Must have 2 arguments <output path> <# of workers>");
+        }
+        GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+        job.setVertexClass(getClass());
+        job.setVertexInputFormatClass(
+            SimpleSuperstepVertexInputFormat.class);
+        job.setWorkerContextClass(EmitterWorkerContext.class);
+        Configuration conf = job.getConfiguration();
+        conf.set(SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
+        job.setWorkerConfiguration(Integer.parseInt(args[1]),
+                                   Integer.parseInt(args[1]),
+                                   100.0f);
+        if (job.run(true) == true) {
+            return 0;
+        } else {
+            return -1;
+        }
+    }
+    
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
+    }
+}
\ No newline at end of file

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Mon Nov  7 22:09:35 2011
@@ -49,34 +49,6 @@ public abstract class BasicVertex<I exte
     public abstract void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages);
 
     /**
-     * Optionally defined by the user to be executed once on all workers
-     * before application has started.
-     *
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     */
-    public abstract void preApplication()
-        throws InstantiationException, IllegalAccessException;
-
-    /**
-     * Optionally defined by the user to be executed once on all workers
-     * after the application has completed.
-     */
-    public abstract void postApplication();
-
-    /**
-     * Optionally defined by the user to be executed once prior to vertex
-     * processing on a worker for the current superstep.
-     */
-    public abstract void preSuperstep();
-
-    /**
-     * Optionally defined by the user to be executed once after all vertex
-     * processing on a worker for the current superstep.
-     */
-    public abstract void postSuperstep();
-
-    /**
      * Must be defined by user to do computation on a single Vertex.
      *
      * @param msgIterator Iterator to the messages that were sent to this
@@ -233,6 +205,15 @@ public abstract class BasicVertex<I exte
     public Mapper.Context getContext() {
         return getGraphState().getContext();
     }
+    
+    /**
+     * Get the worker context
+     * 
+     * @return WorkerContext context
+     */
+    public WorkerContext getWorkerContext() {
+        return getGraphState().getGraphMapper().getWorkerContext();
+    }
 
     @Override
     public final <A extends Writable> Aggregator<A> registerAggregator(

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Mon Nov  7 22:09:35 2011
@@ -123,8 +123,6 @@ public abstract class BspService <
     private static final Logger LOG = Logger.getLogger(BspService.class);
     /** File system */
     private final FileSystem fs;
-    /** Used to call pre/post application/superstep methods */
-    private final BasicVertex<I, V, E, M> representativeVertex;
     /** Checkpoint frequency */
     private int checkpointFrequency = -1;
     /** Vertex range map based on the superstep below */
@@ -627,11 +625,6 @@ public abstract class BspService <
             throw new RuntimeException(e);
         }
         this.hostnamePartitionId = hostname + "_" + getTaskPartition();
-
-        this.representativeVertex =
-            BspUtils.<I, V, E, M>createVertex(
-                getConfiguration());
-        this.representativeVertex.setGraphState(getGraphMapper().getGraphState());
         this.checkpointFrequency =
             conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
                           GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT);
@@ -670,15 +663,6 @@ public abstract class BspService <
     }
 
     /**
-     * Get the representative vertex
-     *
-     * @return Representative vertex for this service.
-     */
-    final public BasicVertex<I, V, E, M> getRepresentativeVertex() {
-        return representativeVertex;
-    }
-
-    /**
      * Get the latest application attempt and cache it.
      *
      * @return the latest application attempt

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Mon Nov  7 22:09:35 2011
@@ -81,6 +81,8 @@ public class BspServiceWorker<
     private final int finalRpcPort;
     /** List of aggregators currently in use */
     private Set<String> aggregatorInUse = new TreeSet<String>();
+    /** Worker Context */
+    private WorkerContext workerContext;
     /** Class logger */
     private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
 
@@ -93,11 +95,17 @@ public class BspServiceWorker<
             getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT,
                           GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
                           getTaskPartition();
+        this.workerContext = BspUtils.createWorkerContext(getConfiguration(), 
+            graphMapper.getGraphState());
     }
 
     public int getPort() {
         return finalRpcPort;
     }
+    
+    public WorkerContext getWorkerContext() {
+    	return workerContext;
+    }
 
     /**
      * Intended to check the health of the node.  For instance, can it ssh,

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Mon Nov  7 22:09:35 2011
@@ -202,6 +202,42 @@ public class BspUtils {
         resolver.setGraphState(graphState);
         return resolver;
     }
+    
+   /**
+     * Get the user's subclassed WorkerContext.
+     *
+     * @param conf Configuration to check
+     * @return User's worker context class
+     */
+	public static Class<? extends WorkerContext>
+            getWorkerContextClass(Configuration conf) {
+        return (Class<? extends WorkerContext>)
+                conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS,
+                              DefaultWorkerContext.class,
+                              WorkerContext.class);
+    }
+     
+   /**
+     * Create a user worker context
+     *
+     * @param conf Configuration to check
+     * @return Instantiated user worker context
+     */
+    @SuppressWarnings("rawtypes")
+	public static <I extends WritableComparable, 
+    			   V extends Writable,
+    			   E extends Writable, 
+    			   M extends Writable> 
+    		WorkerContext createWorkerContext(Configuration conf,
+            		GraphState<I, V, E, M> graphState) {
+        Class<? extends WorkerContext> workerContextClass =
+            getWorkerContextClass(conf);
+        WorkerContext workerContext =
+             ReflectionUtils.newInstance(workerContextClass, conf);
+        workerContext.setGraphState(graphState);
+        return workerContext;
+    }
+    
 
     /**
      * Get the user's subclassed Vertex.

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java?rev=1198972&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java Mon Nov  7 22:09:35 2011
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+/**
+ * A dumb implementation of {@link WorkerContext}. This is the default
+ * implementation when no WorkerContext is defined by the user. It does
+ * nothing.
+ */
+public class DefaultWorkerContext extends WorkerContext {
+
+	@Override
+	public void preApplication() { }
+
+    @Override
+    public void postApplication() { }
+
+    @Override
+    public void preSuperstep() { }
+
+    @Override
+    public void postSuperstep() { }
+}
\ No newline at end of file

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Mon Nov  7 22:09:35 2011
@@ -58,6 +58,8 @@ public class GiraphJob extends Job {
     public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
     /** Message value class */
     public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+    /** Worker context class */
+    public static final String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
 
     /**
      * Minimum number of simultaneous workers before this job can run (int)
@@ -392,7 +394,19 @@ public class GiraphJob extends Job {
                                     vertexResolverClass,
                                     VertexResolver.class);
     }
-
+    
+   /**
+    * Set the worker context class (optional)
+    *
+    * @param workerContextClass Determines what code is executed on a each
+    *        worker before and after each superstep and computation
+    */
+    final public void setWorkerContextClass(Class<?> workerContextClass) {
+        getConfiguration().setClass(WORKER_CONTEXT_CLASS,
+                                    workerContextClass,
+                                    WorkerContext.class);
+    }
+    
     /**
      * Set worker configuration for determining what is required for
      * a superstep.

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Mon Nov  7 22:09:35 2011
@@ -107,6 +107,10 @@ public class GraphMapper<I extends Writa
     public final AggregatorUsage getAggregatorUsage() {
         return serviceWorker;
     }
+    
+    public final WorkerContext getWorkerContext() {
+    	return serviceWorker.getWorkerContext();
+    }
 
     public final GraphState<I,V,E,M> getGraphState() {
       return graphState;
@@ -519,8 +523,7 @@ public class GraphMapper<I extends Writa
                   .setNumVertices(serviceWorker.getTotalVertices());
 
         try {
-            serviceWorker.getRepresentativeVertex().setGraphState(graphState);
-            serviceWorker.getRepresentativeVertex().preApplication();
+            serviceWorker.getWorkerContext().preApplication();
         } catch (InstantiationException e) {
             LOG.fatal("map: preApplication failed in instantiation", e);
             throw new RuntimeException(
@@ -588,7 +591,8 @@ public class GraphMapper<I extends Writa
             serviceWorker.exchangeVertexRanges();
             context.progress();
 
-            serviceWorker.getRepresentativeVertex().preSuperstep();
+            serviceWorker.getWorkerContext().setGraphState(graphState);
+            serviceWorker.getWorkerContext().preSuperstep();
             context.progress();
 
             workerFinishedVertices = 0;
@@ -631,7 +635,7 @@ public class GraphMapper<I extends Writa
                 }
             }
 
-            serviceWorker.getRepresentativeVertex().postSuperstep();
+            serviceWorker.getWorkerContext().postSuperstep();
             context.progress();
             if (LOG.isInfoEnabled()) {
                 LOG.info("map: totalMem="
@@ -649,7 +653,7 @@ public class GraphMapper<I extends Writa
                      "(global vertices marked done)");
         }
 
-        serviceWorker.getRepresentativeVertex().postApplication();
+        serviceWorker.getWorkerContext().postApplication();
         context.progress();
     }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Mon Nov  7 22:09:35 2011
@@ -67,28 +67,6 @@ public abstract class LongDoubleFloatDou
     }
 
     @Override
-    public void preApplication()
-            throws InstantiationException, IllegalAccessException {
-        // Do nothing, might be overriden by the user
-    }
-
-    @Override
-    public void postApplication() {
-        // Do nothing, might be overriden by the user
-    }
-
-    @Override
-    public void preSuperstep() {
-        // Do nothing, might be overriden by the user
-    }
-
-    @Override
-    public void postSuperstep() {
-        // Do nothing, might be overriden by the user
-    }
-
-
-    @Override
     public final boolean addEdge(LongWritable targetId, FloatWritable edgeValue) {
         if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) {
             if (LOG.isDebugEnabled()) {

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Mon Nov  7 22:09:35 2011
@@ -82,27 +82,6 @@ public abstract class Vertex<I extends W
     }
 
     @Override
-    public void preApplication()
-            throws InstantiationException, IllegalAccessException {
-        // Do nothing, might be overriden by the user
-    }
-
-    @Override
-    public void postApplication() {
-        // Do nothing, might be overriden by the user
-    }
-
-    @Override
-    public void preSuperstep() {
-        // Do nothing, might be overriden by the user
-    }
-
-    @Override
-    public void postSuperstep() {
-        // Do nothing, might be overriden by the user
-    }
-
-    @Override
     public final boolean addEdge(I targetVertexId, E edgeValue) {
         if (destEdgeMap.put(
                 targetVertexId,

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1198972&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java Mon Nov  7 22:09:35 2011
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * WorkerContext allows for the execution of user code
+ * on a per-worker basis. There's one WorkerContext per worker.
+ *
+ */
+@SuppressWarnings("rawtypes")
+public abstract class WorkerContext implements AggregatorUsage {
+    /** Global graph state */
+	private GraphState graphState;
+
+	public void setGraphState(GraphState graphState) {
+		this.graphState = graphState;
+	}
+	
+    /**
+     * Initialize the WorkerContext.
+     * This method is executed once on each Worker before the first 
+     * superstep starts.
+     * 
+     * @throws IllegalAccessException 
+     * @throws InstantiationException 
+     */
+	public abstract void preApplication() throws InstantiationException,
+		IllegalAccessException;
+	
+    /**
+     * Finalize the WorkerContext.
+     * This method is executed once on each Worker after the last 
+     * superstep ends.
+     */
+    public abstract void postApplication();
+
+    /**
+     * Execute user code.
+     * This method is executed once on each Worker before each 
+     * superstep starts.
+     */
+    public abstract void preSuperstep();
+    
+    /**
+     * Execute user code.
+     * This method is executed once on each Worker after each 
+     * superstep ends.
+     */
+    public abstract void postSuperstep();
+
+    /**
+     * Retrieves the current superstep.
+     *
+     * @return Current superstep
+     */
+    public long getSuperstep() {
+        return graphState.getSuperstep();
+    }
+    
+    /**
+     * Get the total (all workers) number of vertices that
+     * existed in the previous superstep.
+     *
+     * @return Total number of vertices (-1 if first superstep)
+     */
+    public long getNumVertices() {
+    	return graphState.getNumVertices();
+    }
+    
+    /**
+     * Get the total (all workers) number of edges that
+     * existed in the previous superstep.
+     *
+     * @return Total number of edges (-1 if first superstep)
+     */
+    public long getNumEdges() {
+    	return graphState.getNumEdges();
+    }
+    
+    /**
+     * Get the mapper context
+     *
+     * @return Mapper context
+     */
+    public Mapper.Context getContext() {
+        return graphState.getContext();
+    }
+    
+    @Override
+    public final <A extends Writable> Aggregator<A> registerAggregator(
+            String name,
+            Class<? extends Aggregator<A>> aggregatorClass)
+            throws InstantiationException, IllegalAccessException {
+        return graphState.getGraphMapper().getAggregatorUsage().
+            registerAggregator(name, aggregatorClass);
+    }
+
+    @Override
+    public final Aggregator<? extends Writable> getAggregator(String name) {
+        return graphState.getGraphMapper().getAggregatorUsage().
+            getAggregator(name);
+    }
+
+    @Override
+    public final boolean useAggregator(String name) {
+        return graphState.getGraphMapper().getAggregatorUsage().
+            useAggregator(name);
+    }
+}
\ No newline at end of file

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java Mon Nov  7 22:09:35 2011
@@ -81,6 +81,8 @@ public class TestAutoCheckpoint extends 
         job.setVertexClass(SimpleCheckpointVertex.class);
         job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
         job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+        job.setWorkerContextClass(
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
         Path outputPath = new Path("/tmp/" + getCallingMethodName());
         removeAndSetOutput(job, outputPath);
         assertTrue(job.run(true));

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Mon Nov  7 22:09:35 2011
@@ -273,12 +273,17 @@ public class TestBspBasic extends BspCas
         GiraphJob job = new GiraphJob(getCallingMethodName());
         setupConfiguration(job);
         job.setVertexClass(SimplePageRankVertex.class);
+        job.setWorkerContextClass(
+        	SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
         job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
         assertTrue(job.run(true));
         if (getJobTracker() == null) {
-            double maxPageRank = SimplePageRankVertex.finalMax;
-            double minPageRank = SimplePageRankVertex.finalMin;
-            long numVertices = SimplePageRankVertex.finalSum;
+            double maxPageRank = 
+            	SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalMax;
+            double minPageRank = 
+            	SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalMin;
+            long numVertices = 
+            	SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalSum;
             System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
                                " minPageRank=" + minPageRank +
                                " numVertices=" + numVertices);

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java Mon Nov  7 22:09:35 2011
@@ -68,6 +68,8 @@ public class TestManualCheckpoint extend
         job.getConfiguration().setBoolean(
             GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
         job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setWorkerContextClass(
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
         job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
         job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
         Path outputPath = new Path("/tmp/" + getCallingMethodName());
@@ -78,7 +80,8 @@ public class TestManualCheckpoint extend
         if (getJobTracker() == null) {
             FileStatus fileStatus = getSinglePartFileStatus(job, outputPath);
             fileLen = fileStatus.getLen();
-            idSum = SimpleCheckpointVertex.finalSum;
+            idSum =
+            	SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.finalSum;
             System.out.println("testBspCheckpoint: idSum = " + idSum +
                                " fileLen = " + fileLen);
         }
@@ -94,6 +97,8 @@ public class TestManualCheckpoint extend
                                             HDFS_CHECKPOINT_DIR);
         restartedJob.getConfiguration().setLong(GiraphJob.RESTART_SUPERSTEP, 2);
         restartedJob.setVertexClass(SimpleCheckpointVertex.class);
+        restartedJob.setWorkerContextClass(
+        	SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
         restartedJob.setVertexInputFormatClass(
             SimpleSuperstepVertexInputFormat.class);
         restartedJob.setVertexOutputFormatClass(
@@ -105,7 +110,8 @@ public class TestManualCheckpoint extend
             FileStatus fileStatus = getSinglePartFileStatus(job, outputPath);
             fileLen = fileStatus.getLen();
             assertTrue(fileStatus.getLen() == fileLen);
-            long idSumRestarted = SimpleCheckpointVertex.finalSum;
+            long idSumRestarted =
+            	SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.finalSum;
             System.out.println("testBspCheckpoint: idSumRestarted = " +
                                idSumRestarted);
             assertTrue(idSum == idSumRestarted);

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java Mon Nov  7 22:09:35 2011
@@ -60,6 +60,8 @@ public class TestMutateGraphVertex exten
         GiraphJob job = new GiraphJob(getCallingMethodName());
         setupConfiguration(job);
         job.setVertexClass(SimpleMutateGraphVertex.class);
+        job.setWorkerContextClass(
+        		SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class);
         job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         Path outputPath = new Path("/tmp/" + getCallingMethodName());

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java Mon Nov  7 22:09:35 2011
@@ -65,6 +65,8 @@ public class TestVertexRangeBalancer ext
         GiraphJob job = new GiraphJob("testStaticBalancer");
         setupConfiguration(job);
         job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setWorkerContextClass(
+        		SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
         job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
         job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
         Path outputPath = new Path("/tmp/testStaticBalancer");
@@ -86,6 +88,8 @@ public class TestVertexRangeBalancer ext
         job = new GiraphJob("testSuperstepBalancer");
         setupConfiguration(job);
         job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setWorkerContextClass(
+        		SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
         job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
         job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
         job.setVertexRangeBalancerClass(SuperstepBalancer.class);
@@ -106,6 +110,8 @@ public class TestVertexRangeBalancer ext
         job = new GiraphJob("testAutoBalancer");
         setupConfiguration(job);
         job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setWorkerContextClass(
+        		SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
         job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
         job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
         job.setVertexRangeBalancerClass(AutoBalancer.class);