You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [5/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java Thu Feb 16 22:12:31 2012
@@ -22,47 +22,50 @@ import org.apache.giraph.graph.EdgeListV
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
 
 import java.util.Iterator;
 
 /**
  * Vertex to allow unit testing of failure detection
  */
-public class SimpleFailVertex extends
-        EdgeListVertex<LongWritable, DoubleWritable,
-        FloatWritable, DoubleWritable> {
+public class SimpleFailVertex extends EdgeListVertex<
+    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(SimpleFailVertex.class);
+  /** TODO: Change this behavior to WorkerContext */
+  private static long SUPERSTEP = 0;
 
-    static long superstep = 0;
-
-    @Override
-    public void compute(Iterator<DoubleWritable> msgIterator) {
-        if (getSuperstep() >= 1) {
-            double sum = 0;
-            while (msgIterator.hasNext()) {
-                sum += msgIterator.next().get();
-            }
-            DoubleWritable vertexValue =
-                new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
-            setVertexValue(vertexValue);
-            if (getSuperstep() < 30) {
-                if (getSuperstep() == 20) {
-                    if (getVertexId().get() == 10L) {
-                        try {
-                            Thread.sleep(2000);
-                        } catch (InterruptedException e) {
-                        }
-                        System.exit(1);
-                    } else if (getSuperstep() - superstep > 10) {
-                        return;
-                    }
-                }
-                long edges = getNumOutEdges();
-                sendMsgToAllEdges(
-                    new DoubleWritable(getVertexValue().get() / edges));
-            } else {
-                voteToHalt();
+  @Override
+  public void compute(Iterator<DoubleWritable> msgIterator) {
+    if (getSuperstep() >= 1) {
+      double sum = 0;
+      while (msgIterator.hasNext()) {
+        sum += msgIterator.next().get();
+      }
+      DoubleWritable vertexValue =
+          new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
+      setVertexValue(vertexValue);
+      if (getSuperstep() < 30) {
+        if (getSuperstep() == 20) {
+          if (getVertexId().get() == 10L) {
+            try {
+              Thread.sleep(2000);
+            } catch (InterruptedException e) {
+              LOG.info("Sleep interrupted ", e);
             }
-            superstep = getSuperstep();
+            System.exit(1);
+          } else if (getSuperstep() - SUPERSTEP > 10) {
+            return;
+          }
         }
+        long edges = getNumOutEdges();
+        sendMsgToAllEdges(
+            new DoubleWritable(getVertexValue().get() / edges));
+      } else {
+        voteToHalt();
+      }
+      SUPERSTEP = getSuperstep();
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java Thu Feb 16 22:12:31 2012
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
 
 import org.apache.giraph.graph.EdgeListVertex;
 
@@ -30,34 +31,35 @@ import org.apache.giraph.graph.EdgeListV
  * Test whether messages can be sent and received by vertices.
  */
 public class SimpleMsgVertex extends
-        EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
-    @Override
-    public void compute(Iterator<IntWritable> msgIterator) {
-        if (getVertexId().equals(new LongWritable(2))) {
-            sendMsg(new LongWritable(1), new IntWritable(101));
-            sendMsg(new LongWritable(1), new IntWritable(102));
-            sendMsg(new LongWritable(1), new IntWritable(103));
-        }
-        if (!getVertexId().equals(new LongWritable(1))) {
-            voteToHalt();
-        }
-        else {
-            /* Check the messages */
-            int sum = 0;
-            while (msgIterator != null && msgIterator.hasNext()) {
-                sum += msgIterator.next().get();
-            }
-            System.out.println("TestMsgVertex: Received a sum of " + sum +
-            " (will stop on 306)");
+    EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(SimpleMsgVertex.class);
+  @Override
+  public void compute(Iterator<IntWritable> msgIterator) {
+    if (getVertexId().equals(new LongWritable(2))) {
+      sendMsg(new LongWritable(1), new IntWritable(101));
+      sendMsg(new LongWritable(1), new IntWritable(102));
+      sendMsg(new LongWritable(1), new IntWritable(103));
+    }
+    if (!getVertexId().equals(new LongWritable(1))) {
+      voteToHalt();
+    } else {
+      /* Check the messages */
+      int sum = 0;
+      while (msgIterator != null && msgIterator.hasNext()) {
+        sum += msgIterator.next().get();
+      }
+      LOG.info("TestMsgVertex: Received a sum of " + sum +
+          " (will stop on 306)");
 
-            if (sum == 306) {
-                voteToHalt();
-            }
-        }
-        if (getSuperstep() > 3) {
-            System.err.println("TestMsgVertex: Vertex 1 failed to receive " +
-                               "messages in time");
-            voteToHalt();
-        }
+      if (sum == 306) {
+        voteToHalt();
+      }
+    }
+    if (getSuperstep() > 3) {
+      System.err.println("TestMsgVertex: Vertex 1 failed to receive " +
+          "messages in time");
+      voteToHalt();
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Thu Feb 16 22:12:31 2012
@@ -34,163 +34,171 @@ import org.apache.giraph.graph.WorkerCon
 /**
  * Vertex to allow unit testing of graph mutations.
  */
-public class SimpleMutateGraphVertex extends
-        EdgeListVertex<LongWritable, DoubleWritable,
-        FloatWritable, DoubleWritable> {
-    /** Maximum number of ranges for vertex ids */
-    private long maxRanges = 100;
-    /** Class logger */
-    private static Logger LOG =
-        Logger.getLogger(SimpleMutateGraphVertex.class);
-
-    /**
-     * Unless we create a ridiculous number of vertices , we should not
-     * collide within a vertex range defined by this method.
-     *
-     * @return Starting vertex id of the range
-     */
-    private long rangeVertexIdStart(int range) {
-        return (Long.MAX_VALUE / maxRanges) * range;
+public class SimpleMutateGraphVertex extends EdgeListVertex<
+    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** Class logger */
+  private static Logger LOG =
+      Logger.getLogger(SimpleMutateGraphVertex.class);
+  /** Maximum number of ranges for vertex ids */
+  private long maxRanges = 100;
+
+
+  /**
+   * Unless we create a ridiculous number of vertices , we should not
+   * collide within a vertex range defined by this method.
+   *
+   * @param range Range index
+   * @return Starting vertex id of the range
+   */
+  private long rangeVertexIdStart(int range) {
+    return (Long.MAX_VALUE / maxRanges) * range;
+  }
+
+  @Override
+  public void compute(Iterator<DoubleWritable> msgIterator)
+    throws IOException {
+    SimpleMutateGraphVertexWorkerContext workerContext =
+        (SimpleMutateGraphVertexWorkerContext) getWorkerContext();
+    if (getSuperstep() == 0) {
+      LOG.debug("Reached superstep " + getSuperstep());
+    } else if (getSuperstep() == 1) {
+      // Send messages to vertices that are sure not to exist
+      // (creating them)
+      LongWritable destVertexId =
+          new LongWritable(rangeVertexIdStart(1) + getVertexId().get());
+      sendMsg(destVertexId, new DoubleWritable(0.0));
+    } else if (getSuperstep() == 2) {
+      LOG.debug("Reached superstep " + getSuperstep());
+    } else if (getSuperstep() == 3) {
+      long vertexCount = workerContext.getVertexCount();
+      if (vertexCount * 2 != getNumVertices()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getNumVertices() +
+            " vertices when should have " + vertexCount * 2 +
+            " on superstep " + getSuperstep());
+      }
+      long edgeCount = workerContext.getEdgeCount();
+      if (edgeCount != getNumEdges()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getNumEdges() +
+            " edges when should have " + edgeCount +
+            " on superstep " + getSuperstep());
+      }
+      // Create vertices that are sure not to exist (doubling vertices)
+      LongWritable vertexIndex =
+          new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
+      BasicVertex<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> vertex =
+        instantiateVertex(vertexIndex, null, null, null);
+      addVertexRequest(vertex);
+      // Add edges to those remote vertices as well
+      addEdgeRequest(vertexIndex,
+          new Edge<LongWritable, FloatWritable>(
+              getVertexId(), new FloatWritable(0.0f)));
+    } else if (getSuperstep() == 4) {
+      LOG.debug("Reached superstep " + getSuperstep());
+    } else if (getSuperstep() == 5) {
+      long vertexCount = workerContext.getVertexCount();
+      if (vertexCount * 2 != getNumVertices()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getNumVertices() +
+            " when should have " + vertexCount * 2 +
+            " on superstep " + getSuperstep());
+      }
+      long edgeCount = workerContext.getEdgeCount();
+      if (edgeCount + vertexCount != getNumEdges()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getNumEdges() +
+            " edges when should have " + edgeCount + vertexCount +
+            " on superstep " + getSuperstep());
+      }
+      // Remove the edges created in superstep 3
+      LongWritable vertexIndex =
+          new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
+      workerContext.increaseEdgesRemoved();
+      removeEdgeRequest(vertexIndex, getVertexId());
+    } else if (getSuperstep() == 6) {
+      // Remove all the vertices created in superstep 3
+      if (getVertexId().compareTo(
+          new LongWritable(rangeVertexIdStart(3))) >= 0) {
+        removeVertexRequest(getVertexId());
+      }
+    } else if (getSuperstep() == 7) {
+      long origEdgeCount = workerContext.getOrigEdgeCount();
+      if (origEdgeCount != getNumEdges()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getNumEdges() +
+            " edges when should have " + origEdgeCount +
+            " on superstep " + getSuperstep());
+      }
+    } else if (getSuperstep() == 8) {
+      long vertexCount = workerContext.getVertexCount();
+      if (vertexCount / 2 != getNumVertices()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getNumVertices() +
+            " vertices when should have " + vertexCount / 2 +
+            " on superstep " + getSuperstep());
+      }
+    } else {
+      voteToHalt();
     }
+  }
+
+  /**
+   * Worker context used with {@link SimpleMutateGraphVertex}.
+   */
+  public static class SimpleMutateGraphVertexWorkerContext
+      extends WorkerContext {
+    /** Cached vertex count */
+    private long vertexCount;
+    /** Cached edge count */
+    private long edgeCount;
+    /** Original number of edges */
+    private long origEdgeCount;
+    /** Number of edges removed during superstep */
+    private int edgesRemoved = 0;
+
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException { }
+
+    @Override
+    public void postApplication() { }
 
     @Override
-    public void compute(Iterator<DoubleWritable> msgIterator)
-            throws IOException {
+    public void preSuperstep() { }
 
-    	SimpleMutateGraphVertexWorkerContext workerContext =
-    		(SimpleMutateGraphVertexWorkerContext) getWorkerContext();
-    	if (getSuperstep() == 0) {
-    	} else if (getSuperstep() == 1) {
-            // Send messages to vertices that are sure not to exist
-            // (creating them)
-            LongWritable destVertexId =
-                new LongWritable(rangeVertexIdStart(1) + getVertexId().get());
-            sendMsg(destVertexId, new DoubleWritable(0.0));
-        } else if (getSuperstep() == 2) {
-        } else if (getSuperstep() == 3) {
-        	long vertexCount = workerContext.getVertexCount();
-            if (vertexCount * 2 != getNumVertices()) {
-                throw new IllegalStateException(
-                    "Impossible to have " + getNumVertices() +
-                    " vertices when should have " + vertexCount * 2 +
-                    " on superstep " + getSuperstep());
-            }
-            long edgeCount = workerContext.getEdgeCount();
-            if (edgeCount != getNumEdges()) {
-                throw new IllegalStateException(
-                    "Impossible to have " + getNumEdges() +
-                    " edges when should have " + edgeCount +
-                    " on superstep " + getSuperstep());
-            }
-            // Create vertices that are sure not to exist (doubling vertices)
-            LongWritable vertexIndex =
-                new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
-            BasicVertex<LongWritable, DoubleWritable,
-                FloatWritable, DoubleWritable> vertex =
-                    instantiateVertex(vertexIndex, null, null, null);
-            addVertexRequest(vertex);
-            // Add edges to those remote vertices as well
-            addEdgeRequest(vertexIndex,
-                           new Edge<LongWritable, FloatWritable>(
-                               getVertexId(), new FloatWritable(0.0f)));
-        } else if (getSuperstep() == 4) {
-        } else if (getSuperstep() == 5) {
-        	long vertexCount = workerContext.getVertexCount();
-            if (vertexCount * 2 != getNumVertices()) {
-                throw new IllegalStateException(
-                    "Impossible to have " + getNumVertices() +
-                    " when should have " + vertexCount * 2 +
-                    " on superstep " + getSuperstep());
-            }
-            long edgeCount = workerContext.getEdgeCount();
-            if (edgeCount + vertexCount != getNumEdges()) {
-                throw new IllegalStateException(
-                    "Impossible to have " + getNumEdges() +
-                    " edges when should have " + edgeCount + vertexCount +
-                    " on superstep " + getSuperstep());
-            }
-            // Remove the edges created in superstep 3
-            LongWritable vertexIndex =
-                new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
-            workerContext.increaseEdgesRemoved();
-            removeEdgeRequest(vertexIndex, getVertexId());
-        } else if (getSuperstep() == 6) {
-            // Remove all the vertices created in superstep 3
-            if (getVertexId().compareTo(
-                    new LongWritable(rangeVertexIdStart(3))) >= 0) {
-                removeVertexRequest(getVertexId());
-            }
-        } else if (getSuperstep() == 7) {
-        	long orig_edge_count = workerContext.getOrigEdgeCount();
-            if (orig_edge_count != getNumEdges()) {
-                throw new IllegalStateException(
-                    "Impossible to have " + getNumEdges() +
-                    " edges when should have " + orig_edge_count +
-                    " on superstep " + getSuperstep());
-            }
-        } else if (getSuperstep() == 8) {
-        	long vertex_count = workerContext.getVertexCount();
-            if (vertex_count / 2 != getNumVertices()) {
-                throw new IllegalStateException(
-                    "Impossible to have " + getNumVertices() +
-                    " vertices when should have " + vertex_count / 2 +
-                    " on superstep " + getSuperstep());
-            }
-        }
-        else {
-            voteToHalt();
-        }
-    }
-
-    public static class SimpleMutateGraphVertexWorkerContext
-            extends WorkerContext {
-        /** Cached vertex count */
-        private long vertexCount;
-        /** Cached edge count */
-        private long edgeCount;
-        /** Original number of edges */
-        private long origEdgeCount;
-        /** Number of edges removed during superstep */
-        private int edgesRemoved = 0;
-
-		@Override
-		public void preApplication()
-				throws InstantiationException, IllegalAccessException { }
-
-		@Override
-		public void postApplication() { }
-
-        @Override
-        public void preSuperstep() { }
-
-		@Override
-		public void postSuperstep() {
-			vertexCount = getNumVertices();
-			edgeCount = getNumEdges();
-			if (getSuperstep() == 1) {
-				origEdgeCount = edgeCount;
-			}
-			LOG.info("Got " + vertexCount + " vertices, " +
-					 edgeCount + " edges on superstep " +
-					 getSuperstep());
-			LOG.info("Removed " + edgesRemoved);
-			edgesRemoved = 0;
-		}
-
-		public long getVertexCount() {
-			return vertexCount;
-		}
-
-		public long getEdgeCount() {
-			return edgeCount;
-		}
-
-		public long getOrigEdgeCount() {
-			return origEdgeCount;
-		}
-
-		public void increaseEdgesRemoved() {
-			this.edgesRemoved++;
-		}
+    @Override
+    public void postSuperstep() {
+      vertexCount = getNumVertices();
+      edgeCount = getNumEdges();
+      if (getSuperstep() == 1) {
+        origEdgeCount = edgeCount;
+      }
+      LOG.info("Got " + vertexCount + " vertices, " +
+          edgeCount + " edges on superstep " +
+          getSuperstep());
+      LOG.info("Removed " + edgesRemoved);
+      edgesRemoved = 0;
+    }
+
+    public long getVertexCount() {
+      return vertexCount;
+    }
+
+    public long getEdgeCount() {
+      return edgeCount;
+    }
+
+    public long getOrigEdgeCount() {
+      return origEdgeCount;
+    }
+
+    /**
+     * Increase the number of edges removed by one.
+     */
+    public void increaseEdgesRemoved() {
+      this.edgesRemoved++;
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Feb 16 22:12:31 2012
@@ -45,205 +45,230 @@ import java.util.Map;
  * Demonstrates the basic Pregel PageRank implementation.
  */
 public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
-    /** Number of supersteps for this test */
-    public static final int MAX_SUPERSTEPS = 30;
-    /** Logger */
-    private static final Logger LOG =
-        Logger.getLogger(SimplePageRankVertex.class);
+  /** Number of supersteps for this test */
+  public static final int MAX_SUPERSTEPS = 30;
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimplePageRankVertex.class);
+
+  @Override
+  public void compute(Iterator<DoubleWritable> msgIterator) {
+    LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
+    MinAggregator minAggreg = (MinAggregator) getAggregator("min");
+    MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
+    if (getSuperstep() >= 1) {
+      double sum = 0;
+      while (msgIterator.hasNext()) {
+        sum += msgIterator.next().get();
+      }
+      DoubleWritable vertexValue =
+          new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
+      setVertexValue(vertexValue);
+      maxAggreg.aggregate(vertexValue);
+      minAggreg.aggregate(vertexValue);
+      sumAggreg.aggregate(1L);
+      LOG.info(getVertexId() + ": PageRank=" + vertexValue +
+          " max=" + maxAggreg.getAggregatedValue() +
+          " min=" + minAggreg.getAggregatedValue());
+    }
+
+    if (getSuperstep() < MAX_SUPERSTEPS) {
+      long edges = getNumOutEdges();
+      sendMsgToAllEdges(
+          new DoubleWritable(getVertexValue().get() / edges));
+    } else {
+      voteToHalt();
+    }
+  }
+
+  /**
+   * Worker context used with {@link SimplePageRankVertex}.
+   */
+  public static class SimplePageRankVertexWorkerContext extends
+      WorkerContext {
+    /** Final max value for verification for local jobs */
+    private static double FINAL_MAX;
+    /** Final min value for verification for local jobs */
+    private static double FINAL_MIN;
+    /** Final sum value for verification for local jobs */
+    private static long FINAL_SUM;
+
+    public static double getFinalMax() {
+      return FINAL_MAX;
+    }
 
-    @Override
-    public void compute(Iterator<DoubleWritable> msgIterator) {
-        LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
-        MinAggregator minAggreg = (MinAggregator) getAggregator("min");
-        MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
-        if (getSuperstep() >= 1) {
-            double sum = 0;
-            while (msgIterator.hasNext()) {
-                sum += msgIterator.next().get();
-            }
-            DoubleWritable vertexValue =
-                new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
-            setVertexValue(vertexValue);
-            maxAggreg.aggregate(vertexValue);
-            minAggreg.aggregate(vertexValue);
-            sumAggreg.aggregate(1L);
-            LOG.info(getVertexId() + ": PageRank=" + vertexValue +
-                     " max=" + maxAggreg.getAggregatedValue() +
-                     " min=" + minAggreg.getAggregatedValue());
-        }
+    public static double getFinalMin() {
+      return FINAL_MIN;
+    }
 
-        if (getSuperstep() < MAX_SUPERSTEPS) {
-            long edges = getNumOutEdges();
-            sendMsgToAllEdges(
-                new DoubleWritable(getVertexValue().get() / edges));
-        } else {
-            voteToHalt();
-        }
+    public static long getFinalSum() {
+      return FINAL_SUM;
     }
 
-	public static class SimplePageRankVertexWorkerContext extends
-    		WorkerContext {
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+      registerAggregator("sum", LongSumAggregator.class);
+      registerAggregator("min", MinAggregator.class);
+      registerAggregator("max", MaxAggregator.class);
+    }
 
-    	public static double finalMax, finalMin;
-    	public static long finalSum;
-    	
-    	@Override
-    	public void preApplication() 
-    	throws InstantiationException, IllegalAccessException {
-    		
-    		registerAggregator("sum", LongSumAggregator.class);
-    		registerAggregator("min", MinAggregator.class);
-    		registerAggregator("max", MaxAggregator.class);			
-    	}
-
-    	@Override
-    	public void postApplication() {
-
-    		LongSumAggregator sumAggreg = 
-    			(LongSumAggregator) getAggregator("sum");
-    		MinAggregator minAggreg = 
-    			(MinAggregator) getAggregator("min");
-    		MaxAggregator maxAggreg = 
-    			(MaxAggregator) getAggregator("max");
-
-    		finalSum = sumAggreg.getAggregatedValue().get();
-    		finalMax = maxAggreg.getAggregatedValue().get();
-    		finalMin = minAggreg.getAggregatedValue().get();
-    		
-            LOG.info("aggregatedNumVertices=" + finalSum);
-            LOG.info("aggregatedMaxPageRank=" + finalMax);
-            LOG.info("aggregatedMinPageRank=" + finalMin);
-    	}
-
-		@Override
-		public void preSuperstep() {
-    		
-	        LongSumAggregator sumAggreg = 
-	        	(LongSumAggregator) getAggregator("sum");
-	        MinAggregator minAggreg = 
-	        	(MinAggregator) getAggregator("min");
-	        MaxAggregator maxAggreg = 
-	        	(MaxAggregator) getAggregator("max");
-	        
-	        if (getSuperstep() >= 3) {
-	            LOG.info("aggregatedNumVertices=" +
-	                    sumAggreg.getAggregatedValue() +
-	                    " NumVertices=" + getNumVertices());
-	            if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
-	                throw new RuntimeException("wrong value of SumAggreg: " +
-	                        sumAggreg.getAggregatedValue() + ", should be: " +
-	                        getNumVertices());
-	            }
-	            DoubleWritable maxPagerank =
-	                    (DoubleWritable) maxAggreg.getAggregatedValue();
-	            LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
-	            DoubleWritable minPagerank =
-	                    (DoubleWritable) minAggreg.getAggregatedValue();
-	            LOG.info("aggregatedMinPageRank=" + minPagerank.get());
-	        }
-	        useAggregator("sum");
-	        useAggregator("min");
-	        useAggregator("max");
-	        sumAggreg.setAggregatedValue(new LongWritable(0L));
-		}
+    @Override
+    public void postApplication() {
 
-		@Override
-		public void postSuperstep() { }
+      LongSumAggregator sumAggreg =
+          (LongSumAggregator) getAggregator("sum");
+      MinAggregator minAggreg =
+          (MinAggregator) getAggregator("min");
+      MaxAggregator maxAggreg =
+          (MaxAggregator) getAggregator("max");
+
+      FINAL_SUM = sumAggreg.getAggregatedValue().get();
+      FINAL_MAX = maxAggreg.getAggregatedValue().get();
+      FINAL_MIN = minAggreg.getAggregatedValue().get();
+
+      LOG.info("aggregatedNumVertices=" + FINAL_SUM);
+      LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
+      LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
     }
-    
-    /**
-     * Simple VertexReader that supports {@link SimplePageRankVertex}
-     */
-    public static class SimplePageRankVertexReader extends
-            GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable,
-                DoubleWritable> {
-        /** Class logger */
-        private static final Logger LOG =
-            Logger.getLogger(SimplePageRankVertexReader.class);
-
-        public SimplePageRankVertexReader() {
-            super();
-        }
 
-        @Override
-        public boolean nextVertex() {
-            return totalRecords > recordsRead;
-        }
+    @Override
+    public void preSuperstep() {
 
-        @Override
-        public BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
-          getCurrentVertex() throws IOException {
-            BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
-                vertex = BspUtils.createVertex(configuration);
-
-            LongWritable vertexId = new LongWritable(
-                (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
-            DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
-            long destVertexId =
-                (vertexId.get() + 1) %
-                (inputSplit.getNumSplits() * totalRecords);
-            float edgeValue = vertexId.get() * 100f;
-            Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
-            edges.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
-            vertex.initialize(vertexId, vertexValue, edges, null);
-            ++recordsRead;
-            if (LOG.isInfoEnabled()) {
-	        LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
-	                 ", vertexValue=" + vertex.getVertexValue() +
-	                 ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
-            }
-            return vertex;
-        }
+      LongSumAggregator sumAggreg =
+          (LongSumAggregator) getAggregator("sum");
+      MinAggregator minAggreg =
+          (MinAggregator) getAggregator("min");
+      MaxAggregator maxAggreg =
+          (MaxAggregator) getAggregator("max");
+
+      if (getSuperstep() >= 3) {
+        LOG.info("aggregatedNumVertices=" +
+            sumAggreg.getAggregatedValue() +
+            " NumVertices=" + getNumVertices());
+        if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
+          throw new RuntimeException("wrong value of SumAggreg: " +
+              sumAggreg.getAggregatedValue() + ", should be: " +
+              getNumVertices());
+        }
+        DoubleWritable maxPagerank =
+            (DoubleWritable) maxAggreg.getAggregatedValue();
+        LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
+        DoubleWritable minPagerank =
+            (DoubleWritable) minAggreg.getAggregatedValue();
+        LOG.info("aggregatedMinPageRank=" + minPagerank.get());
+      }
+      useAggregator("sum");
+      useAggregator("min");
+      useAggregator("max");
+      sumAggreg.setAggregatedValue(new LongWritable(0L));
     }
 
-    /**
-     * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
-     */
-    public static class SimplePageRankVertexInputFormat extends
-            GeneratedVertexInputFormat<LongWritable,
-            DoubleWritable, FloatWritable, DoubleWritable> {
-        @Override
-        public VertexReader<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
-                createVertexReader(InputSplit split,
-                                   TaskAttemptContext context)
-                                   throws IOException {
-            return new SimplePageRankVertexReader();
-        }
-    }
+    @Override
+    public void postSuperstep() { }
+  }
+
+  /**
+   * Simple VertexReader that supports {@link SimplePageRankVertex}
+   */
+  public static class SimplePageRankVertexReader extends
+      GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable,
+      DoubleWritable> {
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(SimplePageRankVertexReader.class);
 
     /**
-     * Simple VertexWriter that supports {@link SimplePageRankVertex}
+     * Constructor.
      */
-    public static class SimplePageRankVertexWriter extends
-            TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
-        public SimplePageRankVertexWriter(
-                RecordWriter<Text, Text> lineRecordWriter) {
-            super(lineRecordWriter);
-        }
+    public SimplePageRankVertexReader() {
+      super();
+    }
 
-        @Override
-        public void writeVertex(
-                BasicVertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
-                throws IOException, InterruptedException {
-            getRecordWriter().write(
-                new Text(vertex.getVertexId().toString()),
-                new Text(vertex.getVertexValue().toString()));
-        }
+    @Override
+    public boolean nextVertex() {
+      return totalRecords > recordsRead;
     }
 
+    @Override
+    public BasicVertex<LongWritable, DoubleWritable,
+    FloatWritable, DoubleWritable>
+    getCurrentVertex() throws IOException {
+      BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+      vertex = BspUtils.createVertex(configuration);
+
+      LongWritable vertexId = new LongWritable(
+          (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+      DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
+      long destVertexId =
+          (vertexId.get() + 1) %
+          (inputSplit.getNumSplits() * totalRecords);
+      float edgeValue = vertexId.get() * 100f;
+      Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
+      edges.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
+      vertex.initialize(vertexId, vertexValue, edges, null);
+      ++recordsRead;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
+            ", vertexValue=" + vertex.getVertexValue() +
+            ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
+      }
+      return vertex;
+    }
+  }
+
+  /**
+   * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
+   */
+  public static class SimplePageRankVertexInputFormat extends
+      GeneratedVertexInputFormat<LongWritable,
+        DoubleWritable, FloatWritable, DoubleWritable> {
+    @Override
+    public VertexReader<LongWritable, DoubleWritable,
+    FloatWritable, DoubleWritable> createVertexReader(InputSplit split,
+      TaskAttemptContext context)
+      throws IOException {
+      return new SimplePageRankVertexReader();
+    }
+  }
+
+  /**
+   * Simple VertexWriter that supports {@link SimplePageRankVertex}
+   */
+  public static class SimplePageRankVertexWriter extends
+      TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
     /**
-     * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
+     * Constructor with line writer.
+     *
+     * @param lineRecordWriter Line writer that will do the writing.
      */
-    public static class SimplePageRankVertexOutputFormat extends
-            TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
+    public SimplePageRankVertexWriter(
+        RecordWriter<Text, Text> lineRecordWriter) {
+      super(lineRecordWriter);
+    }
 
-        @Override
-        public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
-            createVertexWriter(TaskAttemptContext context)
-                throws IOException, InterruptedException {
-            RecordWriter<Text, Text> recordWriter =
-                textOutputFormat.getRecordWriter(context);
-            return new SimplePageRankVertexWriter(recordWriter);
-        }
+    @Override
+    public void writeVertex(
+      BasicVertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+      throws IOException, InterruptedException {
+      getRecordWriter().write(
+          new Text(vertex.getVertexId().toString()),
+          new Text(vertex.getVertexValue().toString()));
+    }
+  }
+
+  /**
+   * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
+   */
+  public static class SimplePageRankVertexOutputFormat extends
+      TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
+    @Override
+    public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
+    createVertexWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      RecordWriter<Text, Text> recordWriter =
+          textOutputFormat.getRecordWriter(context);
+      return new SimplePageRankVertexWriter(recordWriter);
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java Thu Feb 16 22:12:31 2012
@@ -56,215 +56,230 @@ import java.util.Map;
  * Demonstrates the basic Pregel shortest paths implementation.
  */
 public class SimpleShortestPathsVertex extends
-        EdgeListVertex<LongWritable, DoubleWritable,
-        FloatWritable, DoubleWritable> implements Tool {
-    /** Configuration */
-    private Configuration conf;
-    /** Class logger */
-    private static final Logger LOG =
-        Logger.getLogger(SimpleShortestPathsVertex.class);
-    /** The shortest paths id */
-    public static String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
-    /** Default shortest paths id */
-    public static long SOURCE_ID_DEFAULT = 1;
+    EdgeListVertex<LongWritable, DoubleWritable,
+    FloatWritable, DoubleWritable> implements Tool {
+  /** The shortest paths id */
+  public static final String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
+  /** Default shortest paths id */
+  public static final long SOURCE_ID_DEFAULT = 1;
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleShortestPathsVertex.class);
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+   * Is this vertex the source id?
+   *
+   * @return True if the source id
+   */
+  private boolean isSource() {
+    return getVertexId().get() ==
+        getContext().getConfiguration().getLong(SOURCE_ID,
+            SOURCE_ID_DEFAULT);
+  }
+
+  @Override
+  public void compute(Iterator<DoubleWritable> msgIterator) {
+    if (getSuperstep() == 0) {
+      setVertexValue(new DoubleWritable(Double.MAX_VALUE));
+    }
+    double minDist = isSource() ? 0d : Double.MAX_VALUE;
+    while (msgIterator.hasNext()) {
+      minDist = Math.min(minDist, msgIterator.next().get());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
+          " vertex value = " + getVertexValue());
+    }
+    if (minDist < getVertexValue().get()) {
+      setVertexValue(new DoubleWritable(minDist));
+      for (LongWritable targetVertexId : this) {
+        FloatWritable edgeValue = getEdgeValue(targetVertexId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Vertex " + getVertexId() + " sent to " +
+              targetVertexId + " = " +
+              (minDist + edgeValue.get()));
+        }
+        sendMsg(targetVertexId,
+            new DoubleWritable(minDist + edgeValue.get()));
+      }
+    }
+    voteToHalt();
+  }
+
+  /**
+   * VertexInputFormat that supports {@link SimpleShortestPathsVertex}
+   */
+  public static class SimpleShortestPathsVertexInputFormat extends
+      TextVertexInputFormat<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> {
+    @Override
+    public VertexReader<LongWritable, DoubleWritable, FloatWritable,
+    DoubleWritable> createVertexReader(InputSplit split,
+      TaskAttemptContext context)
+      throws IOException {
+      return new SimpleShortestPathsVertexReader(
+          textInputFormat.createRecordReader(split, context));
+    }
+  }
+
+  /**
+   * VertexReader that supports {@link SimpleShortestPathsVertex}.  In this
+   * case, the edge values are not used.  The files should be in the
+   * following JSON format:
+   * JSONArray(<vertex id>, <vertex value>,
+   *           JSONArray(JSONArray(<dest vertex id>, <edge value>), ...))
+   * Here is an example with vertex id 1, vertex value 4.3, and two edges.
+   * First edge has a destination vertex 2, edge value 2.1.
+   * Second edge has a destination vertex 3, edge value 0.7.
+   * [1,4.3,[[2,2.1],[3,0.7]]]
+   */
+  public static class SimpleShortestPathsVertexReader extends
+      TextVertexReader<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> {
 
     /**
-     * Is this vertex the source id?
+     * Constructor with the line record reader.
      *
-     * @return True if the source id
+     * @param lineRecordReader Will read from this line.
      */
-    private boolean isSource() {
-        return (getVertexId().get() ==
-            getContext().getConfiguration().getLong(SOURCE_ID,
-                                                    SOURCE_ID_DEFAULT));
+    public SimpleShortestPathsVertexReader(
+        RecordReader<LongWritable, Text> lineRecordReader) {
+      super(lineRecordReader);
     }
 
     @Override
-    public void compute(Iterator<DoubleWritable> msgIterator) {
-        if (getSuperstep() == 0) {
-            setVertexValue(new DoubleWritable(Double.MAX_VALUE));
-        }
-        double minDist = isSource() ? 0d : Double.MAX_VALUE;
-        while (msgIterator.hasNext()) {
-            minDist = Math.min(minDist, msgIterator.next().get());
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
-                     " vertex value = " + getVertexValue());
-        }
-        if (minDist < getVertexValue().get()) {
-            setVertexValue(new DoubleWritable(minDist));
-            for (LongWritable targetVertexId : this) {
-                FloatWritable edgeValue = getEdgeValue(targetVertexId);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Vertex " + getVertexId() + " sent to " +
-                              targetVertexId + " = " +
-                              (minDist + edgeValue.get()));
-                }
-                sendMsg(targetVertexId,
-                        new DoubleWritable(minDist + edgeValue.get()));
-            }
-        }
-        voteToHalt();
-    }
-
-    /**
-     * VertexInputFormat that supports {@link SimpleShortestPathsVertex}
-     */
-    public static class SimpleShortestPathsVertexInputFormat extends
-            TextVertexInputFormat<LongWritable,
-                                  DoubleWritable,
-                                  FloatWritable,
-                                  DoubleWritable> {
-        @Override
-        public VertexReader<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
-                createVertexReader(InputSplit split,
-                                   TaskAttemptContext context)
-                                   throws IOException {
-            return new SimpleShortestPathsVertexReader(
-                textInputFormat.createRecordReader(split, context));
-        }
-    }
-
-    /**
-     * VertexReader that supports {@link SimpleShortestPathsVertex}.  In this
-     * case, the edge values are not used.  The files should be in the
-     * following JSON format:
-     * JSONArray(<vertex id>, <vertex value>,
-     *           JSONArray(JSONArray(<dest vertex id>, <edge value>), ...))
-     * Here is an example with vertex id 1, vertex value 4.3, and two edges.
-     * First edge has a destination vertex 2, edge value 2.1.
-     * Second edge has a destination vertex 3, edge value 0.7.
-     * [1,4.3,[[2,2.1],[3,0.7]]]
-     */
-    public static class SimpleShortestPathsVertexReader extends
-            TextVertexReader<LongWritable,
-                DoubleWritable, FloatWritable, DoubleWritable> {
-
-        public SimpleShortestPathsVertexReader(
-                RecordReader<LongWritable, Text> lineRecordReader) {
-            super(lineRecordReader);
-        }
-
-        @Override
-        public BasicVertex<LongWritable, DoubleWritable, FloatWritable,
-                           DoubleWritable> getCurrentVertex()
-            throws IOException, InterruptedException {
-          BasicVertex<LongWritable, DoubleWritable, FloatWritable,
-              DoubleWritable> vertex = BspUtils.<LongWritable, DoubleWritable, FloatWritable,
-                  DoubleWritable>createVertex(getContext().getConfiguration());
-
-            Text line = getRecordReader().getCurrentValue();
-            try {
-                JSONArray jsonVertex = new JSONArray(line.toString());
-                LongWritable vertexId = new LongWritable(jsonVertex.getLong(0));
-                DoubleWritable vertexValue = new DoubleWritable(jsonVertex.getDouble(1));
-                Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
-                JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
-                for (int i = 0; i < jsonEdgeArray.length(); ++i) {
-                    JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
-                    edges.put(new LongWritable(jsonEdge.getLong(0)),
-                            new FloatWritable((float) jsonEdge.getDouble(1)));
-                }
-                vertex.initialize(vertexId, vertexValue, edges, null);
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "next: Couldn't get vertex from line " + line, e);
-            }
-            return vertex;
-        }
-
-        @Override
-        public boolean nextVertex() throws IOException, InterruptedException {
-            return getRecordReader().nextKeyValue();
-        }
-    }
-
-    /**
-     * VertexOutputFormat that supports {@link SimpleShortestPathsVertex}
-     */
-    public static class SimpleShortestPathsVertexOutputFormat extends
-            TextVertexOutputFormat<LongWritable, DoubleWritable,
-            FloatWritable> {
-
-        @Override
-        public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
-                createVertexWriter(TaskAttemptContext context)
-                throws IOException, InterruptedException {
-            RecordWriter<Text, Text> recordWriter =
-                textOutputFormat.getRecordWriter(context);
-            return new SimpleShortestPathsVertexWriter(recordWriter);
-        }
-    }
-
-    /**
-     * VertexWriter that supports {@link SimpleShortestPathsVertex}
-     */
-    public static class SimpleShortestPathsVertexWriter extends
-            TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
-        public SimpleShortestPathsVertexWriter(
-                RecordWriter<Text, Text> lineRecordWriter) {
-            super(lineRecordWriter);
-        }
-
-        @Override
-        public void writeVertex(BasicVertex<LongWritable, DoubleWritable,
-                                FloatWritable, ?> vertex)
-                throws IOException, InterruptedException {
-            JSONArray jsonVertex = new JSONArray();
-            try {
-                jsonVertex.put(vertex.getVertexId().get());
-                jsonVertex.put(vertex.getVertexValue().get());
-                JSONArray jsonEdgeArray = new JSONArray();
-                for (LongWritable targetVertexId : vertex) {
-                    JSONArray jsonEdge = new JSONArray();
-                    jsonEdge.put(targetVertexId.get());
-                    jsonEdge.put(vertex.getEdgeValue(targetVertexId).get());
-                    jsonEdgeArray.put(jsonEdge);
-                }
-                jsonVertex.put(jsonEdgeArray);
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "writeVertex: Couldn't write vertex " + vertex);
-            }
-            getRecordWriter().write(new Text(jsonVertex.toString()), null);
-        }
+    public BasicVertex<LongWritable, DoubleWritable, FloatWritable,
+    DoubleWritable> getCurrentVertex()
+      throws IOException, InterruptedException {
+      BasicVertex<LongWritable, DoubleWritable, FloatWritable,
+      DoubleWritable> vertex =
+        BspUtils.<LongWritable, DoubleWritable, FloatWritable,
+          DoubleWritable>createVertex(getContext().getConfiguration());
+
+      Text line = getRecordReader().getCurrentValue();
+      try {
+        JSONArray jsonVertex = new JSONArray(line.toString());
+        LongWritable vertexId = new LongWritable(jsonVertex.getLong(0));
+        DoubleWritable vertexValue =
+            new DoubleWritable(jsonVertex.getDouble(1));
+        Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
+        JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
+        for (int i = 0; i < jsonEdgeArray.length(); ++i) {
+          JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
+          edges.put(new LongWritable(jsonEdge.getLong(0)),
+              new FloatWritable((float) jsonEdge.getDouble(1)));
+        }
+        vertex.initialize(vertexId, vertexValue, edges, null);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+            "next: Couldn't get vertex from line " + line, e);
+      }
+      return vertex;
     }
 
     @Override
-    public Configuration getConf() {
-        return conf;
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
     }
+  }
 
+  /**
+   * VertexOutputFormat that supports {@link SimpleShortestPathsVertex}
+   */
+  public static class SimpleShortestPathsVertexOutputFormat extends
+      TextVertexOutputFormat<LongWritable, DoubleWritable,
+      FloatWritable> {
     @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
+    public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
+    createVertexWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      RecordWriter<Text, Text> recordWriter =
+          textOutputFormat.getRecordWriter(context);
+      return new SimpleShortestPathsVertexWriter(recordWriter);
+    }
+  }
+
+  /**
+   * VertexWriter that supports {@link SimpleShortestPathsVertex}
+   */
+  public static class SimpleShortestPathsVertexWriter extends
+      TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
+    /**
+     * Vertex writer with the internal line writer.
+     *
+     * @param lineRecordWriter Wil actually be written to.
+     */
+    public SimpleShortestPathsVertexWriter(
+        RecordWriter<Text, Text> lineRecordWriter) {
+      super(lineRecordWriter);
     }
 
     @Override
-    public int run(String[] argArray) throws Exception {
-        Preconditions.checkArgument(argArray.length == 4,
-            "run: Must have 4 arguments <input path> <output path> " +
-            "<source vertex id> <# of workers>");
-
-        GiraphJob job = new GiraphJob(getConf(), getClass().getName());
-        job.setVertexClass(getClass());
-        job.setVertexInputFormatClass(
-            SimpleShortestPathsVertexInputFormat.class);
-        job.setVertexOutputFormatClass(
-            SimpleShortestPathsVertexOutputFormat.class);
-        FileInputFormat.addInputPath(job, new Path(argArray[0]));
-        FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
-        job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID,
-                                       Long.parseLong(argArray[2]));
-        job.setWorkerConfiguration(Integer.parseInt(argArray[3]),
-                                   Integer.parseInt(argArray[3]),
-                                   100.0f);
-
-        return job.run(true) ? 0 : -1;
-    }
-
-    public static void main(String[] args) throws Exception {
-        System.exit(ToolRunner.run(new SimpleShortestPathsVertex(), args));
-    }
+    public void writeVertex(BasicVertex<LongWritable, DoubleWritable,
+      FloatWritable, ?> vertex)
+      throws IOException, InterruptedException {
+      JSONArray jsonVertex = new JSONArray();
+      try {
+        jsonVertex.put(vertex.getVertexId().get());
+        jsonVertex.put(vertex.getVertexValue().get());
+        JSONArray jsonEdgeArray = new JSONArray();
+        for (LongWritable targetVertexId : vertex) {
+          JSONArray jsonEdge = new JSONArray();
+          jsonEdge.put(targetVertexId.get());
+          jsonEdge.put(vertex.getEdgeValue(targetVertexId).get());
+          jsonEdgeArray.put(jsonEdge);
+        }
+        jsonVertex.put(jsonEdgeArray);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+            "writeVertex: Couldn't write vertex " + vertex);
+      }
+      getRecordWriter().write(new Text(jsonVertex.toString()), null);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public int run(String[] argArray) throws Exception {
+    Preconditions.checkArgument(argArray.length == 4,
+        "run: Must have 4 arguments <input path> <output path> " +
+        "<source vertex id> <# of workers>");
+
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    job.setVertexClass(getClass());
+    job.setVertexInputFormatClass(
+        SimpleShortestPathsVertexInputFormat.class);
+    job.setVertexOutputFormatClass(
+        SimpleShortestPathsVertexOutputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(argArray[0]));
+    FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
+    job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID,
+        Long.parseLong(argArray[2]));
+    job.setWorkerConfiguration(Integer.parseInt(argArray[3]),
+        Integer.parseInt(argArray[3]),
+        100.0f);
+
+    return job.run(true) ? 0 : -1;
+  }
+
+  /**
+   * Can be used for command line execution.
+   *
+   * @param args Command line arguments.
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new SimpleShortestPathsVertex(), args));
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java Thu Feb 16 22:12:31 2012
@@ -31,18 +31,18 @@ import org.apache.giraph.graph.VertexCom
  * Test whether combiner is called by summing up the messages.
  */
 public class SimpleSumCombiner
-        extends VertexCombiner<LongWritable, IntWritable> {
+    extends VertexCombiner<LongWritable, IntWritable> {
 
-    @Override
-    public Iterable<IntWritable> combine(LongWritable vertexIndex,
-            Iterable<IntWritable> messages) throws IOException {
-        int sum = 0;
-        for (IntWritable msg : messages) {
-            sum += msg.get();
-        }
-        List<IntWritable> value = new ArrayList<IntWritable>();
-        value.add(new IntWritable(sum));
-        
-        return value;
+  @Override
+  public Iterable<IntWritable> combine(LongWritable vertexIndex,
+      Iterable<IntWritable> messages) throws IOException {
+    int sum = 0;
+    for (IntWritable msg : messages) {
+      sum += msg.get();
     }
+    List<IntWritable> value = new ArrayList<IntWritable>();
+    value.add(new IntWritable(sum));
+
+    return value;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Thu Feb 16 22:12:31 2012
@@ -44,115 +44,119 @@ import java.util.Map;
  * finishes.
  */
 public class SimpleSuperstepVertex extends
-        EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
-    @Override
-    public void compute(Iterator<IntWritable> msgIterator) {
-        if (getSuperstep() > 3) {
-            voteToHalt();
-        }
+    EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
+  @Override
+  public void compute(Iterator<IntWritable> msgIterator) {
+    if (getSuperstep() > 3) {
+      voteToHalt();
     }
+  }
 
+  /**
+   * Simple VertexReader that supports {@link SimpleSuperstepVertex}
+   */
+  public static class SimpleSuperstepVertexReader extends
+      GeneratedVertexReader<LongWritable, IntWritable,
+        FloatWritable, IntWritable> {
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(SimpleSuperstepVertexReader.class);
     /**
-     * Simple VertexReader that supports {@link SimpleSuperstepVertex}
+     * Constructor.
      */
-    public static class SimpleSuperstepVertexReader extends
-            GeneratedVertexReader<LongWritable, IntWritable,
-            FloatWritable, IntWritable> {
-        /** Class logger */
-        private static final Logger LOG =
-            Logger.getLogger(SimpleSuperstepVertexReader.class);
-        @Override
-        public boolean nextVertex() throws IOException, InterruptedException {
-            return totalRecords > recordsRead;
-        }
-
-        public SimpleSuperstepVertexReader() {
-            super();
-        }
-
-        @Override
-        public BasicVertex<LongWritable, IntWritable, FloatWritable,
-                IntWritable> getCurrentVertex()
-                throws IOException, InterruptedException {
-            BasicVertex<LongWritable, IntWritable,
-                        FloatWritable, IntWritable> vertex =
-                BspUtils.<LongWritable, IntWritable,
-                          FloatWritable, IntWritable>createVertex(
-                    configuration);
-            long tmpId = reverseIdOrder ?
-                ((inputSplit.getSplitIndex() + 1) * totalRecords) -
-                    recordsRead - 1 :
-                (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
-            LongWritable vertexId = new LongWritable(tmpId);
-            IntWritable vertexValue =
-                new IntWritable((int) (vertexId.get() * 10));
-            Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
-            long destVertexId =
-                (vertexId.get() + 1) %
-                    (inputSplit.getNumSplits() * totalRecords);
-            float edgeValue = vertexId.get() * 100f;
-            edgeMap.put(new LongWritable(destVertexId),
-                        new FloatWritable(edgeValue));
-            vertex.initialize(vertexId, vertexValue, edgeMap, null);
-            ++recordsRead;
-            if (LOG.isInfoEnabled()) {
-                LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
-                         ", vertexValue=" + vertex.getVertexValue() +
-                         ", destinationId=" + destVertexId +
-                         ", edgeValue=" + edgeValue);
-            }
-            return vertex;
-        }
+    public SimpleSuperstepVertexReader() {
+      super();
     }
 
-    /**
-     * Simple VertexInputFormat that supports {@link SimpleSuperstepVertex}
-     */
-    public static class SimpleSuperstepVertexInputFormat extends
-            GeneratedVertexInputFormat<LongWritable,
-            IntWritable, FloatWritable, IntWritable> {
-        @Override
-        public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable>
-                createVertexReader(InputSplit split,
-                                   TaskAttemptContext context)
-                                   throws IOException {
-            return new SimpleSuperstepVertexReader();
-        }
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return totalRecords > recordsRead;
     }
 
-    /**
-     * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
-     */
-    public static class SimpleSuperstepVertexWriter extends
-            TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
-        public SimpleSuperstepVertexWriter(
-                RecordWriter<Text, Text> lineRecordWriter) {
-            super(lineRecordWriter);
-        }
-
-        @Override
-        public void writeVertex(
-                BasicVertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
-                throws IOException, InterruptedException {
-            getRecordWriter().write(
-                new Text(vertex.getVertexId().toString()),
-                new Text(vertex.getVertexValue().toString()));
-        }
+    @Override
+    public BasicVertex<LongWritable, IntWritable, FloatWritable,
+    IntWritable> getCurrentVertex()
+      throws IOException, InterruptedException {
+      BasicVertex<LongWritable, IntWritable,
+      FloatWritable, IntWritable> vertex =
+        BspUtils.<LongWritable, IntWritable, FloatWritable,
+        IntWritable>createVertex(configuration);
+      long tmpId = reverseIdOrder ?
+          ((inputSplit.getSplitIndex() + 1) * totalRecords) -
+          recordsRead - 1 :
+            (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
+      LongWritable vertexId = new LongWritable(tmpId);
+      IntWritable vertexValue =
+          new IntWritable((int) (vertexId.get() * 10));
+      Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
+      long destVertexId =
+          (vertexId.get() + 1) %
+          (inputSplit.getNumSplits() * totalRecords);
+      float edgeValue = vertexId.get() * 100f;
+      edgeMap.put(new LongWritable(destVertexId),
+          new FloatWritable(edgeValue));
+      vertex.initialize(vertexId, vertexValue, edgeMap, null);
+      ++recordsRead;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
+            ", vertexValue=" + vertex.getVertexValue() +
+            ", destinationId=" + destVertexId +
+            ", edgeValue=" + edgeValue);
+      }
+      return vertex;
     }
+  }
 
+  /**
+   * Simple VertexInputFormat that supports {@link SimpleSuperstepVertex}
+   */
+  public static class SimpleSuperstepVertexInputFormat extends
+      GeneratedVertexInputFormat<LongWritable,
+      IntWritable, FloatWritable, IntWritable> {
+    @Override
+    public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable>
+    createVertexReader(InputSplit split, TaskAttemptContext context)
+      throws IOException {
+      return new SimpleSuperstepVertexReader();
+    }
+  }
+
+  /**
+   * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
+   */
+  public static class SimpleSuperstepVertexWriter extends
+      TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
     /**
-     * Simple VertexOutputFormat that supports {@link SimpleSuperstepVertex}
+     * Constructor with the line record writer.
+     *
+     * @param lineRecordWriter Writer to write to.
      */
-    public static class SimpleSuperstepVertexOutputFormat extends
-            TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+    public SimpleSuperstepVertexWriter(
+        RecordWriter<Text, Text> lineRecordWriter) {
+      super(lineRecordWriter);
+    }
 
-        @Override
-        public VertexWriter<LongWritable, IntWritable, FloatWritable>
-            createVertexWriter(TaskAttemptContext context)
-                throws IOException, InterruptedException {
-            RecordWriter<Text, Text> recordWriter =
-                textOutputFormat.getRecordWriter(context);
-            return new SimpleSuperstepVertexWriter(recordWriter);
-        }
+    @Override
+    public void writeVertex(BasicVertex<LongWritable, IntWritable,
+        FloatWritable, ?> vertex) throws IOException, InterruptedException {
+      getRecordWriter().write(
+          new Text(vertex.getVertexId().toString()),
+          new Text(vertex.getVertexValue().toString()));
+    }
+  }
+
+  /**
+   * Simple VertexOutputFormat that supports {@link SimpleSuperstepVertex}
+   */
+  public static class SimpleSuperstepVertexOutputFormat extends
+      TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+    @Override
+    public VertexWriter<LongWritable, IntWritable, FloatWritable>
+    createVertexWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      RecordWriter<Text, Text> recordWriter =
+          textOutputFormat.getRecordWriter(context);
+      return new SimpleSuperstepVertexWriter(recordWriter);
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -33,39 +33,38 @@ import org.apache.giraph.lib.TextVertexO
  * Simple text based vertex output format example.
  */
 public class SimpleTextVertexOutputFormat extends
-         TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+    TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+  /**
+   * Simple text based vertex writer
+   */
+  private static class SimpleTextVertexWriter
+    extends TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
     /**
-     * Simple text based vertex writer
+     * Initialize with the LineRecordWriter.
+     *
+     * @param lineRecordWriter Line record writer from TextOutputFormat
      */
-    private static class SimpleTextVertexWriter
-            extends TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
-
-        /**
-         * Initialize with the LineRecordWriter.
-         *
-         * @param lineRecordWriter Line record writer from TextOutputFormat
-         */
-        public SimpleTextVertexWriter(
-                RecordWriter<Text, Text> lineRecordWriter) {
-            super(lineRecordWriter);
-        }
-
-        @Override
-        public void writeVertex(
-                BasicVertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
-                throws IOException, InterruptedException {
-            getRecordWriter().write(
-                new Text(vertex.getVertexId().toString()),
-                new Text(vertex.getVertexValue().toString()));
-        }
+    public SimpleTextVertexWriter(
+        RecordWriter<Text, Text> lineRecordWriter) {
+      super(lineRecordWriter);
     }
 
     @Override
-    public VertexWriter<LongWritable, IntWritable, FloatWritable>
-        createVertexWriter(TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        RecordWriter<Text, Text> recordWriter =
-            textOutputFormat.getRecordWriter(context);
-        return new SimpleTextVertexWriter(recordWriter);
+    public void writeVertex(
+      BasicVertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
+      throws IOException, InterruptedException {
+      getRecordWriter().write(
+          new Text(vertex.getVertexId().toString()),
+          new Text(vertex.getVertexValue().toString()));
     }
+  }
+
+  @Override
+  public VertexWriter<LongWritable, IntWritable, FloatWritable>
+  createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    RecordWriter<Text, Text> recordWriter =
+        textOutputFormat.getRecordWriter(context);
+    return new SimpleTextVertexWriter(recordWriter);
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java Thu Feb 16 22:12:31 2012
@@ -22,7 +22,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepVertex.
+  SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.WorkerContext;
@@ -43,123 +44,139 @@ import org.apache.hadoop.util.ToolRunner
  * computation.
  */
 public class SimpleVertexWithWorkerContext extends
-        EdgeListVertex<LongWritable, IntWritable, FloatWritable, DoubleWritable>
-        implements Tool {
+    EdgeListVertex<LongWritable, IntWritable, FloatWritable, DoubleWritable>
+    implements Tool {
+  /** Directory name of where to write. */
+  public static final String OUTPUTDIR = "svwwc.outputdir";
+  /** Halting condition for the number of supersteps */
+  private static final int TESTLENGTH = 30;
+
+  @Override
+  public void compute(Iterator<DoubleWritable> msgIterator)
+    throws IOException {
+
+    long superstep = getSuperstep();
+
+    if (superstep < TESTLENGTH) {
+      EmitterWorkerContext emitter =
+          (EmitterWorkerContext) getWorkerContext();
+      emitter.emit("vertexId=" + getVertexId() +
+          " superstep=" + superstep + "\n");
+    } else {
+      voteToHalt();
+    }
+  }
 
-    public static final String OUTPUTDIR = "svwwc.outputdir";
-    private static final int TESTLENGTH = 30;
+  /**
+   * Example worker context to emit data as part of a superstep.
+   */
+  @SuppressWarnings("rawtypes")
+  public static class EmitterWorkerContext extends WorkerContext {
+    /** File name prefix */
+    private static final String FILENAME = "emitter_";
+    /** Output stream to dump the strings. */
+    private DataOutputStream out;
 
     @Override
-    public void compute(Iterator<DoubleWritable> msgIterator)
-            throws IOException {
-
-        long superstep = getSuperstep();
-
-        if (superstep < TESTLENGTH) {
-            EmitterWorkerContext emitter =
-                    (EmitterWorkerContext) getWorkerContext();
-            emitter.emit("vertexId=" + getVertexId() +
-                         " superstep=" + superstep + "\n");
-        } else {
-            voteToHalt();
-        }
+    public void preApplication() {
+      Context context = getContext();
+      FileSystem fs;
+
+      try {
+        fs = FileSystem.get(context.getConfiguration());
+
+        String p = context.getConfiguration()
+            .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
+        if (p == null) {
+          throw new IllegalArgumentException(
+              SimpleVertexWithWorkerContext.OUTPUTDIR +
+              " undefined!");
+        }
+
+        Path path = new Path(p);
+        if (!fs.exists(path)) {
+          throw new IllegalArgumentException(path +
+              " doesn't exist");
+        }
+
+        Path outF = new Path(path, FILENAME +
+            context.getTaskAttemptID());
+        if (fs.exists(outF)) {
+          throw new IllegalArgumentException(outF +
+              " aready exists");
+        }
+
+        out = fs.create(outF);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "can't initialize WorkerContext", e);
+      }
     }
 
-    @SuppressWarnings("rawtypes")
-	public static class EmitterWorkerContext extends WorkerContext {
-
-        private static final String FILENAME = "emitter_";
-        private DataOutputStream out;
-
-        @Override
-        public void preApplication() {
-            Context context = getContext();
-            FileSystem fs;
-
-            try {
-                fs = FileSystem.get(context.getConfiguration());
-
-                String p = context.getConfiguration()
-                    .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
-                if (p == null) {
-                    throw new IllegalArgumentException(
-                        SimpleVertexWithWorkerContext.OUTPUTDIR +
-                        " undefined!");
-                }
-
-                Path path = new Path(p);
-                if (!fs.exists(path)) {
-                    throw new IllegalArgumentException(path +
-                            " doesn't exist");
-                }
-
-                Path outF = new Path(path, FILENAME +
-                        context.getTaskAttemptID());
-                if (fs.exists(outF)) {
-                    throw new IllegalArgumentException(outF +
-                            " aready exists");
-                }
-
-                out = fs.create(outF);
-            } catch (IOException e) {
-                throw new RuntimeException(
-                        "can't initialize WorkerContext", e);
-            }
-        }
-
-        @Override
-        public void postApplication() {
-            if (out != null) {
-                try {
-                    out.flush();
-                    out.close();
-                } catch (IOException e) {
-                    throw new RuntimeException(
-                            "can't finalize WorkerContext", e);
-                }
-                out = null;
-            }
+    @Override
+    public void postApplication() {
+      if (out != null) {
+        try {
+          out.flush();
+          out.close();
+        } catch (IOException e) {
+          throw new RuntimeException(
+              "can't finalize WorkerContext", e);
         }
+        out = null;
+      }
+    }
 
-        @Override
-        public void preSuperstep() { }
+    @Override
+    public void preSuperstep() { }
 
-        @Override
-        public void postSuperstep() { }
+    @Override
+    public void postSuperstep() { }
 
-        public void emit(String s) {
-            try {
-                out.writeUTF(s);
-            } catch (IOException e) {
-                throw new RuntimeException("can't emit", e);
-            }
-        }
+    /**
+     * Write this string to the output stream.
+     *
+     * @param s String to dump.
+     */
+    public void emit(String s) {
+      try {
+        out.writeUTF(s);
+      } catch (IOException e) {
+        throw new RuntimeException("can't emit", e);
+      }
     }
+  }
 
-    @Override
-    public int run(String[] args) throws Exception {
-        if (args.length != 2) {
-            throw new IllegalArgumentException(
-                "run: Must have 2 arguments <output path> <# of workers>");
-        }
-        GiraphJob job = new GiraphJob(getConf(), getClass().getName());
-        job.setVertexClass(getClass());
-        job.setVertexInputFormatClass(
-            SimpleSuperstepVertexInputFormat.class);
-        job.setWorkerContextClass(EmitterWorkerContext.class);
-        Configuration conf = job.getConfiguration();
-        conf.set(SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
-        job.setWorkerConfiguration(Integer.parseInt(args[1]),
-                                   Integer.parseInt(args[1]),
-                                   100.0f);
-        if (job.run(true) == true) {
-            return 0;
-        } else {
-            return -1;
-        }
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      throw new IllegalArgumentException(
+          "run: Must have 2 arguments <output path> <# of workers>");
     }
-
-    public static void main(String[] args) throws Exception {
-        System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    job.setVertexClass(getClass());
+    job.setVertexInputFormatClass(
+        SimpleSuperstepVertexInputFormat.class);
+    job.setWorkerContextClass(EmitterWorkerContext.class);
+    Configuration conf = job.getConfiguration();
+    conf.set(SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
+    job.setWorkerConfiguration(Integer.parseInt(args[1]),
+        Integer.parseInt(args[1]),
+        100.0f);
+    if (job.run(true)) {
+      return 0;
+    } else {
+      return -1;
     }
-}
\ No newline at end of file
+  }
+
+  /**
+   * Executable from the command line.
+   *
+   * @param args Command line arguments.
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
+  }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SumAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SumAggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SumAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SumAggregator.java Thu Feb 16 22:12:31 2012
@@ -24,30 +24,38 @@ import org.apache.giraph.graph.Aggregato
 
 /**
  * Aggregator for summing up values.
- *
  */
 public class SumAggregator implements Aggregator<DoubleWritable> {
-
+  /** Aggregated sum */
   private double sum = 0;
 
+  /**
+   * Aggregate a double.
+   *
+   * @param value Value to aggregate.
+   */
   public void aggregate(double value) {
-      sum += value;
+    sum += value;
   }
 
+  @Override
   public void aggregate(DoubleWritable value) {
-      sum += value.get();
+    sum += value.get();
   }
 
+  @Override
   public void setAggregatedValue(DoubleWritable value) {
-      sum = value.get();
+    sum = value.get();
   }
 
+  @Override
   public DoubleWritable getAggregatedValue() {
-      return new DoubleWritable(sum);
+    return new DoubleWritable(sum);
   }
 
+  @Override
   public DoubleWritable createAggregatedValue() {
-      return new DoubleWritable();
+    return new DoubleWritable();
   }
 
 }