You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2011/12/05 23:02:15 UTC

svn commit: r1210662 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/

Author: claudio
Date: Mon Dec  5 22:02:15 2011
New Revision: 1210662

URL: http://svn.apache.org/viewvc?rev=1210662&view=rev
Log:
GIRAPH-10: Aggregators are not exported.

Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Dec  5 22:02:15 2011
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-10: Aggregators are not exported. (claudio)
+
   GIRAPH-100: GIRAPH-100 - Data input sampling and testing
   improvements. (aching)
 

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java?rev=1210662&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java Mon Dec  5 22:02:15 2011
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.graph.Aggregator;
+import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+/**
+ * This is a simple example for an aggregator writer. Every time
+ * {@link #writeAggregator(Map, long)} is called, the writer persists the
+ * current aggregator values, by use of the Writable interface. All values
+ * go to a single file named "aggregatedValues_" followed by the application
+ * attempt; the path is relative, so where it lands depends on the
+ * configured {@link FileSystem}.
+ */
+public class SimpleAggregatorWriter implements AggregatorWriter {
+    /** The name of the file we wrote to (set by the latest initialize) */
+    public static String filename;
+    /** Stream to the output file, null until initialize has succeeded */
+    private FSDataOutputStream output;
+
+    /**
+     * Opens the output file for this application attempt, overwriting any
+     * file of the same name.
+     *
+     * @param context Mapper context, used only for its configuration
+     * @param applicationAttempt Attempt id, appended to the filename
+     * @throws IOException If the file cannot be created
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void initialize(Context context, long applicationAttempt)
+            throws IOException {
+        filename = "aggregatedValues_"+applicationAttempt;
+        Path p = new Path(filename);
+        FileSystem fs = FileSystem.get(context.getConfiguration());
+        output = fs.create(p, true);
+    }
+
+    /**
+     * Appends the aggregated value of every aggregator, in the iteration
+     * order of the map, and flushes the stream. Neither the aggregator
+     * names nor the superstep are written.
+     *
+     * @param map Map of aggregators to write
+     * @param superstep Current superstep (unused)
+     * @throws IOException If writing or flushing fails
+     */
+    @Override
+    public void writeAggregator(Map<String, Aggregator<Writable>> map,
+            long superstep) throws IOException {
+
+        for (Entry<String, Aggregator<Writable>> aggregator: map.entrySet()) {
+            aggregator.getValue().getAggregatedValue().write(output);
+        }
+        output.flush();
+    }
+
+    /**
+     * Closes the output file. Does nothing when no file was opened, which
+     * matches the behaviour of TextAggregatorWriter.
+     *
+     * @throws IOException If closing the stream fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (output != null) {
+            output.close();
+        }
+    }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Mon Dec  5 22:02:15 2011
@@ -45,7 +45,9 @@ import java.util.Map;
  * Demonstrates the basic Pregel PageRank implementation.
  */
 public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
-	/** Logger */
+    /** Number of supersteps for this test */
+    public static final int MAX_SUPERSTEPS = 30;
+    /** Logger */
     private static final Logger LOG =
         Logger.getLogger(SimplePageRankVertex.class);
 
@@ -70,7 +72,7 @@ public class SimplePageRankVertex extend
                      " min=" + minAggreg.getAggregatedValue());
         }
 
-        if (getSuperstep() < 30) {
+        if (getSuperstep() < MAX_SUPERSTEPS) {
             long edges = getNumOutEdges();
             sendMsgToAllEdges(
                 new DoubleWritable(getVertexValue().get() / edges));

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java?rev=1210662&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java Mon Dec  5 22:02:15 2011
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+/**
+ *  An AggregatorWriter is used to export Aggregators during or at the end of
+ *  each computation. It runs on the master and is called at the end of each
+ *  superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is
+ *  passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the
+ *  superstep value to signal the end of computation.
+ */
+public interface AggregatorWriter {
+    /** Signal for last superstep */
+    public static final int LAST_SUPERSTEP = -1;
+
+    /**
+     * The method is called at the initialization of the AggregatorWriter.
+     * More precisely, the aggregatorWriter is initialized each time a new
+     * master is elected.
+     *
+     * @param context Mapper Context where the master is running on
+     * @param applicationAttempt ID of the applicationAttempt, used to
+     *        disambiguate aggregator writes for different attempts
+     * @throws IOException If the writer cannot be initialized
+     */
+    @SuppressWarnings("rawtypes")
+    void initialize(Context context, long applicationAttempt) throws IOException;
+
+    /**
+     * The method is called at the end of each superstep. The implementation
+     * may decide whether to write the aggregator values for the current
+     * superstep. For the last superstep,
+     * {@link AggregatorWriter#LAST_SUPERSTEP} is passed instead of the
+     * superstep number.
+     *
+     * @param aggregatorMap Map of aggregators to write, keyed by name
+     * @param superstep Current superstep, or
+     *        {@link AggregatorWriter#LAST_SUPERSTEP}
+     * @throws IOException If the aggregators cannot be written
+     */
+    void writeAggregator(
+            Map<String, Aggregator<Writable>> aggregatorMap, 
+            long superstep) throws IOException;
+
+    /**
+     * The method is called at the end of a successful computation. The method
+     * is not called when the job fails and a new master is elected. For this
+     * reason it's advised to flush data at the end of
+     * {@link AggregatorWriter#writeAggregator(Map, long)}.
+     *
+     * @throws IOException If the writer cannot be closed
+     */
+    void close() throws IOException;
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Mon Dec  5 22:02:15 2011
@@ -125,9 +125,10 @@ public class BspServiceMaster<
     /** All the partition stats from the last superstep */
     private final List<PartitionStats> allPartitionStatsList =
         new ArrayList<PartitionStats>();
-
     /** Counter group name for the Giraph statistics */
     public String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
+    /** Aggregator writer */
+    public AggregatorWriter aggregatorWriter;
 
     public BspServiceMaster(
             String serverPortList,
@@ -736,6 +737,15 @@ public class BspServiceMaster<
                     currentMasterTaskPartitionCounter.increment(
                         getTaskPartition() -
                         currentMasterTaskPartitionCounter.getValue());
+                    aggregatorWriter = 
+                        BspUtils.createAggregatorWriter(getConfiguration());
+                    try {
+                        aggregatorWriter.initialize(getContext(),
+                                                    getApplicationAttempt());
+                    } catch (IOException e) {
+                        throw new IllegalStateException("becomeMaster: " +
+                            "Couldn't initialize aggregatorWriter", e);
+                    }
                     LOG.info("becomeMaster: I am now the master!");
                     isMaster = true;
                     return isMaster;
@@ -823,7 +833,7 @@ public class BspServiceMaster<
 
     /**
      * Get the aggregator values for a particular superstep,
-     * aggregate and save them.  Does nothing on the INPUT_SUPERSTEP.
+     * aggregate and save them. Does nothing on the INPUT_SUPERSTEP.
      *
      * @param superstep superstep to check
      */
@@ -1500,13 +1510,25 @@ public class BspServiceMaster<
         if (getSuperstep() > 0) {
             superstepCounter.increment(1);
         }
+        SuperstepState superstepState;
         if ((globalStats.getFinishedVertexCount() ==
                 globalStats.getVertexCount()) &&
                 globalStats.getMessageCount() == 0) {
-            return SuperstepState.ALL_SUPERSTEPS_DONE;
+            superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
         } else {
-            return SuperstepState.THIS_SUPERSTEP_DONE;
+            superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
+        }
+        try {
+            aggregatorWriter.writeAggregator(getAggregatorMap(),
+                (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ? 
+                    AggregatorWriter.LAST_SUPERSTEP : getSuperstep());
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                "coordinateSuperstep: IOException while " +
+                "writing aggregators data", e);
         }
+        
+        return superstepState;
     }
 
     /**
@@ -1634,6 +1656,7 @@ public class BspServiceMaster<
                              success + " since this job succeeded ");
                 }
             }
+            aggregatorWriter.close();
         }
 
         try {

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Mon Dec  5 22:02:15 2011
@@ -151,6 +151,32 @@ public class BspUtils {
             getVertexOutputFormatClass(conf);
         return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
     }
+    
+    /**
+     * Look up the {@link AggregatorWriter} class configured under
+     * {@link GiraphJob#AGGREGATOR_WRITER_CLASS}, using
+     * {@link TextAggregatorWriter} as the default.
+     *
+     * @param conf Configuration to check
+     * @return Aggregator writer class to use
+     */
+    public static Class<? extends AggregatorWriter>
+            getAggregatorWriterClass(Configuration conf) {
+        Class<? extends AggregatorWriter> writerClass =
+            conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS,
+                          TextAggregatorWriter.class,
+                          AggregatorWriter.class);
+        return writerClass;
+    }
+
+    /**
+     * Instantiate the aggregator writer class returned by
+     * {@link #getAggregatorWriterClass(Configuration)}.
+     *
+     * @param conf Configuration to check, also handed to the new instance
+     * @return Instantiated aggregator writer
+     */
+    public static AggregatorWriter
+            createAggregatorWriter(Configuration conf) {
+        return ReflectionUtils.newInstance(
+            getAggregatorWriterClass(conf), conf);
+    }
 
     /**
      * Get the user's subclassed {@link VertexCombiner}.

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Mon Dec  5 22:02:15 2011
@@ -61,7 +61,11 @@ public class GiraphJob extends Job {
     /** Message value class */
     public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
     /** Worker context class */
-    public static final String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
+    public static final String WORKER_CONTEXT_CLASS = 
+    	"giraph.workerContextClass";
+    /** AggregatorWriter class - optional */
+    public static final String AGGREGATOR_WRITER_CLASS = 
+    	"giraph.aggregatorWriterClass";
 
     /**
      * Minimum number of simultaneous workers before this job can run (int)
@@ -447,7 +451,20 @@ public class GiraphJob extends Job {
                                     workerContextClass,
                                     WorkerContext.class);
     }
-
+    
+    /**
+     * Choose the {@link AggregatorWriter} implementation for this job
+     * (optional).
+     *
+     * @param aggregatorWriterClass Class recorded in the job configuration
+     *        under {@link #AGGREGATOR_WRITER_CLASS}, with
+     *        {@link AggregatorWriter} given as the expected interface
+     */
+    public final void setAggregatorWriterClass(
+            Class<?> aggregatorWriterClass) {
+        getConfiguration().setClass(
+            AGGREGATOR_WRITER_CLASS,
+            aggregatorWriterClass,
+            AggregatorWriter.class);
+    }
+    
     /**
      * Set worker configuration for determining what is required for
      * a superstep.

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java?rev=1210662&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java Mon Dec  5 22:02:15 2011
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+/**
+ * Default implementation of {@link AggregatorWriter}. Each line consists of
+ * text and contains the superstep, the aggregator name, the aggregator value
+ * and the aggregator class.
+ */
+public class TextAggregatorWriter
+        implements AggregatorWriter {
+    /** The filename of the outputfile */
+    public static final String FILENAME =
+        "giraph.textAggregatorWriter.filename";
+    /** The frequency of writing:
+     *  - NEVER: never write, files aren't created at all
+     *  - AT_THE_END: aggregators are written only when the computation is over
+     *  - int: i.e. 1 is every superstep, 2 every two supersteps and so on
+     */
+    public static final String FREQUENCY =
+        "giraph.textAggregatorWriter.frequency";
+    /** Filename used when {@link #FILENAME} is not configured */
+    private static final String DEFAULT_FILENAME = "aggregatorValues";
+    /** Signal for "never write" frequency */
+    public static final int NEVER = 0;
+    /** Signal for "write only the final values" frequency */
+    public static final int AT_THE_END = -1;
+    /** Handle to the outputfile, null when the frequency is NEVER */
+    protected FSDataOutputStream output;
+    /** Configured value of {@link #FREQUENCY}, NEVER by default */
+    private int frequency;
+
+    /**
+     * Reads the frequency and the filename from the configuration and,
+     * unless the frequency is NEVER, creates the output file. The attempt
+     * is appended to the filename.
+     *
+     * @param context Mapper context, used only for its configuration
+     * @param attempt Application attempt, appended to the filename
+     * @throws IOException If the output file cannot be created
+     * @throws RuntimeException If the output file already exists
+     */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public void initialize(Context context, long attempt) throws IOException {
+        Configuration conf = context.getConfiguration();
+        frequency = conf.getInt(FREQUENCY, NEVER);
+        String filename  = conf.get(FILENAME, DEFAULT_FILENAME);
+        if (frequency != NEVER) {
+            Path p = new Path(filename+"_"+attempt);
+            FileSystem fs = FileSystem.get(conf);
+            if (fs.exists(p)) {
+                throw new RuntimeException("aggregatorWriter file already" +
+                    " exists: " + p.getName());
+            }
+            output = fs.create(p);
+        }
+    }
+
+    /**
+     * Writes one text line per aggregator when the configured frequency
+     * asks for output at this superstep, then flushes the stream.
+     *
+     * @param aggregators Map of aggregators to write
+     * @param superstep Current superstep, or LAST_SUPERSTEP
+     * @throws IOException If writing or flushing fails
+     */
+    @Override
+    final public void writeAggregator(
+            Map<String, Aggregator<Writable>> aggregators,
+            long superstep) throws IOException {
+
+        if (shouldWrite(superstep)) {
+            for (Entry<String, Aggregator<Writable>> a:
+                    aggregators.entrySet()) {
+                // Write the raw characters: writeUTF() would put a two-byte
+                // binary length in front of every line, so the file would
+                // no longer be plain text.
+                String line = aggregatorToString(a.getKey(),
+                                                 a.getValue(),
+                                                 superstep);
+                output.write(line.getBytes("UTF-8"));
+            }
+            output.flush();
+        }
+    }
+
+    /**
+     * Implements the way an aggregator is converted into a String.
+     * Override this if you want to implement your own text format.
+     *
+     * @param aggregatorName Name of the aggregator
+     * @param a Aggregator
+     * @param superstep Current superstep
+     * @return The String representation for the aggregator
+     */
+    protected String aggregatorToString(String aggregatorName,
+                                        Aggregator<Writable> a,
+                                        long superstep) {
+
+        return new StringBuilder("superstep=").append(superstep).append("\t")
+            .append(aggregatorName).append("=").append(a.getAggregatedValue())
+            .append("\t").append(a.getClass().getCanonicalName()).append("\n")
+            .toString();
+    }
+
+    /**
+     * Decides whether the aggregators have to be written at this superstep.
+     *
+     * @param superstep Current superstep, or LAST_SUPERSTEP
+     * @return True if the aggregators should be written now
+     */
+    private boolean shouldWrite(long superstep) {
+        if (frequency == AT_THE_END) {
+            // AT_THE_END is -1 and any superstep % -1 is 0, so this case
+            // must not reach the modulo test below or it would write at
+            // every superstep.
+            return superstep == LAST_SUPERSTEP;
+        }
+        return frequency != NEVER && superstep % frequency == 0;
+    }
+
+    /**
+     * Closes the output file, if one was created.
+     *
+     * @throws IOException If closing the stream fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (output != null) {
+            output.close();
+        }
+    }
+}

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java Mon Dec  5 22:02:15 2011
@@ -226,10 +226,22 @@ public class BspCase extends TestCase im
     public static void removeAndSetOutput(GiraphJob job,
                                           Path outputPath)
             throws IOException {
-        FileSystem hdfs = FileSystem.get(job.getConfiguration());
-        hdfs.delete(outputPath, true);
+        remove(job.getConfiguration(), outputPath);
         FileOutputFormat.setOutputPath(job, outputPath);
     }
+    
+    /**
+     * Helper method to recursively delete a path. The result reported by
+     * the filesystem is ignored.
+     *
+     * @param conf Configuration used to look up the filesystem
+     * @param path Path to remove
+     * @throws IOException
+     */
+    public static void remove(Configuration conf, Path path)
+            throws IOException {
+        FileSystem.get(conf).delete(path, true);
+    }
 
     public static String getCallingMethodName() {
         return Thread.currentThread().getStackTrace()[2].getMethodName();

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Mon Dec  5 22:02:15 2011
@@ -20,6 +20,8 @@ package org.apache.giraph;
 
 import junit.framework.Test;
 import junit.framework.TestSuite;
+
+import org.apache.giraph.examples.SimpleAggregatorWriter;
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
 import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
@@ -38,8 +40,11 @@ import org.apache.giraph.graph.GraphStat
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.BasicVertex;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -331,4 +336,66 @@ public class TestBspBasic extends BspCas
             assertTrue(fileStatus.getLen() == fileStatus2.getLen());
         }
     }
+    
+    /**
+     * Run a sample BSP job locally and test PageRank with AggregatorWriter.
+     *
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    public void testBspPageRankWithAggregatorWriter()
+            throws IOException, InterruptedException, ClassNotFoundException {
+        GiraphJob job = new GiraphJob(getCallingMethodName());
+        setupConfiguration(job);
+        job.setVertexClass(SimplePageRankVertex.class);
+        job.setWorkerContextClass(
+            SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+        job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+        job.setAggregatorWriterClass(SimpleAggregatorWriter.class);
+        Path outputPath = new Path("/tmp/" + getCallingMethodName());
+        removeAndSetOutput(job, outputPath);
+        assertTrue(job.run(true));
+        if (getJobTracker() == null) {
+            double maxPageRank = 
+                SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalMax;
+            double minPageRank = 
+                SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalMin;
+            long numVertices = 
+                SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalSum;
+            System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
+                               " minPageRank=" + minPageRank +
+                               " numVertices=" + numVertices);
+            FileSystem fs = FileSystem.get(new Configuration());
+            FSDataInputStream input = 
+                fs.open(new Path(SimpleAggregatorWriter.filename));
+            int i, all;
+            for (i = 0; ; i++) {
+                all = 0;
+                try {
+                    DoubleWritable max = new DoubleWritable();
+                    max.readFields(input);
+                    all++;
+                    DoubleWritable min = new DoubleWritable();
+                    min.readFields(input);
+                    all++;
+                    LongWritable sum = new LongWritable();
+                    sum.readFields(input);
+                    all++;
+                    if (i > 0) {
+                        assertTrue(max.get() == maxPageRank);
+                        assertTrue(min.get() == minPageRank);
+                        assertTrue(sum.get() == numVertices);
+                    }
+                } catch (IOException e) {
+                    break;
+                }
+            }
+            input.close();
+            // contained all supersteps
+            assertTrue(i == SimplePageRankVertex.MAX_SUPERSTEPS+1 && all == 0);
+            remove(new Configuration(), 
+                   new Path(SimpleAggregatorWriter.filename));
+        }
+    }
 }