Posted to commits@hama.apache.org by tj...@apache.org on 2012/05/26 18:11:13 UTC

svn commit: r1342922 - in /incubator/hama/trunk: core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/main/java/org/apache/hama/examples/util/ examples/src/test/java/org/apache/hama/examples/ examples/sr...

Author: tjungblut
Date: Sat May 26 16:11:11 2012
New Revision: 1342922

URL: http://svn.apache.org/viewvc?rev=1342922&view=rev
Log:
[HAMA-580]: Improve input of graph module

Added:
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
Removed:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java
Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java

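A note on the new class: VertexInputReader.java is added by this commit but not quoted in this mail. Judging from the subclasses in the examples and the GraphJobRunner changes below, its shape is roughly the following sketch (type-parameter names are assumptions, not the committed source):

    import org.apache.hadoop.io.Writable;

    public abstract class VertexInputReader<KEYIN extends Writable, VALUEIN extends Writable,
        VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE extends Writable> {

      /**
       * Called for every record the input format delivers. Fills the given
       * vertex from the record; returning true tells the runner the vertex is
       * complete, returning false makes it keep parsing further records into
       * the same vertex instance.
       */
      public abstract boolean parseVertex(KEYIN key, VALUEIN value,
          Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE> vertex);
    }
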
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java Sat May 26 16:11:11 2012
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.Text;
 
 public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
 
+  @Override
   public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, BSPJob job)
       throws IOException {
     return new LineRecordReader(job.getConf(), (FileSplit) split);

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Sat May 26 16:11:11 2012
@@ -20,8 +20,6 @@
 package org.apache.hama.examples;
 
 import org.apache.hadoop.util.ProgramDriver;
-import org.apache.hama.examples.util.PagerankTextToSeq;
-import org.apache.hama.examples.util.SSSPTextToSeq;
 
 public class ExampleDriver {
 
@@ -29,15 +27,11 @@ public class ExampleDriver {
     ProgramDriver pgd = new ProgramDriver();
     try {
       pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
-      pgd.addClass("sssp-text2seq", SSSPTextToSeq.class,
-          "Generates SSSP input from textfile");
       pgd.addClass("sssp", SSSP.class, "Single Shortest Path");
       pgd.addClass("mdstsearch", MindistSearch.class,
           "Mindist search / Connected Components");
       pgd.addClass("cmb", CombineExample.class, "Combine");
       pgd.addClass("bench", RandBench.class, "Random Benchmark");
-      pgd.addClass("pagerank-text2seq", PagerankTextToSeq.class,
-          "Generates Pagerank and mindist search input from textfile");
       pgd.addClass("pagerank", PageRank.class, "PageRank");
       pgd.driver(args);
     } catch (Throwable e) {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Sat May 26 16:11:11 2012
@@ -22,16 +22,17 @@ import java.util.Iterator;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.graph.VertexInputReader;
 
 public class InlinkCount extends Vertex<Text, IntWritable, NullWritable> {
 
@@ -48,6 +49,34 @@ public class InlinkCount extends Vertex<
     }
   }
 
+  public static class InlinkCountTextReader extends
+      VertexInputReader<LongWritable, Text, Text, IntWritable, NullWritable> {
+
+    /**
+     * Each line of the text file should look like: <br/>
+     * VERTEX_ID\t(tab-separated VERTEX_IDs)<br/>
+     * e.g.:<br/>
+     * 1\t2\t3\t4<br/>
+     * 2\t3\t1<br/>
+     * etc.
+     */
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, IntWritable, NullWritable> vertex) {
+      String[] split = value.toString().split("\t");
+      for (int i = 0; i < split.length; i++) {
+        if (i == 0) {
+          vertex.setVertexID(new Text(split[i]));
+        } else {
+          vertex
+              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+        }
+      }
+      return true;
+    }
+
+  }
+
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException {
     // Graph job configuration
@@ -64,13 +93,14 @@ public class InlinkCount extends Vertex<
     }
 
     inlinkJob.setVertexClass(InlinkCount.class);
-    inlinkJob.setInputFormat(SequenceFileInputFormat.class);
-    inlinkJob.setInputKeyClass(VertexWritable.class);
-    inlinkJob.setInputValueClass(VertexArrayWritable.class);
+    inlinkJob.setInputFormat(TextInputFormat.class);
+    inlinkJob.setInputKeyClass(LongWritable.class);
+    inlinkJob.setInputValueClass(Text.class);
 
     inlinkJob.setVertexIDClass(Text.class);
     inlinkJob.setVertexValueClass(IntWritable.class);
     inlinkJob.setEdgeValueClass(NullWritable.class);
+    inlinkJob.setVertexInputReaderClass(InlinkCountTextReader.class);
 
     inlinkJob.setPartitioner(HashPartitioner.class);
     inlinkJob.setOutputFormat(SequenceFileOutputFormat.class);

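To see the new input path end to end, a driver along these lines should work for InlinkCount; the argument order is an assumption based on the other examples (<input> <output>), and the paths are placeholders:

    import java.io.FileWriter;

    import org.apache.hama.examples.InlinkCount;

    public class InlinkCountDemo {
      public static void main(String[] args) throws Exception {
        // adjacency list: 1 links to 2 and 3, 2 links back to 1, 3 has no outlinks
        FileWriter w = new FileWriter("/tmp/inlinkcount.txt");
        w.write("1\t2\t3\n2\t1\n3\n");
        w.close();
        // assumed usage: <input> <output>
        InlinkCount.main(new String[] { "/tmp/inlinkcount.txt", "/tmp/inlinkcount-out" });
      }
    }
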
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Sat May 26 16:11:11 2012
@@ -22,16 +22,18 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
 
 /**
  * Finding the mindist vertex in a connected component.
@@ -52,7 +54,7 @@ public class MindistSearch {
         // neighbourhood.
         if (currentComponent == null) {
           setValue(new Text(getVertexID()));
-          for (Edge<Text, NullWritable> e : edges) {
+          for (Edge<Text, NullWritable> e : getEdges()) {
             Text id = getVertexID();
             if (id.compareTo(e.getDestinationVertexID()) > 0) {
               setValue(e.getDestinationVertexID());
@@ -91,6 +93,34 @@ public class MindistSearch {
 
   }
 
+  public static class MindistSearchCountReader extends
+      VertexInputReader<LongWritable, Text, Text, Text, NullWritable> {
+
+    /**
+     * Each line of the text file should look like: <br/>
+     * VERTEX_ID\t(tab-separated VERTEX_IDs)<br/>
+     * e.g.:<br/>
+     * 1\t2\t3\t4<br/>
+     * 2\t3\t1<br/>
+     * etc.
+     */
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, Text, NullWritable> vertex) {
+      String[] split = value.toString().split("\t");
+      for (int i = 0; i < split.length; i++) {
+        if (i == 0) {
+          vertex.setVertexID(new Text(split[i]));
+        } else {
+          vertex
+              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+        }
+      }
+      return true;
+    }
+
+  }
+
   private static void printUsage() {
     System.out
         .println("Usage: <input> <output> [maximum iterations (default 30)] [tasks]");
@@ -103,35 +133,38 @@ public class MindistSearch {
       printUsage();
 
     HamaConfiguration conf = new HamaConfiguration(new Configuration());
-    GraphJob connectedComponentsJob = new GraphJob(conf,
+    GraphJob job = new GraphJob(conf,
         MindistSearchVertex.class);
-    connectedComponentsJob.setJobName("Mindist Search");
+    job.setJobName("Mindist Search");
 
-    connectedComponentsJob.setVertexClass(MindistSearchVertex.class);
-    connectedComponentsJob.setInputPath(new Path(args[0]));
-    connectedComponentsJob.setOutputPath(new Path(args[1]));
+    job.setVertexClass(MindistSearchVertex.class);
+    job.setInputPath(new Path(args[0]));
+    job.setOutputPath(new Path(args[1]));
     // set the min text combiner here
-    connectedComponentsJob.setCombinerClass(MinTextCombiner.class);
+    job.setCombinerClass(MinTextCombiner.class);
 
     // set the defaults
-    connectedComponentsJob.setMaxIteration(30);
+    job.setMaxIteration(30);
     if (args.length == 4)
-      connectedComponentsJob.setNumBspTask(Integer.parseInt(args[3]));
+      job.setNumBspTask(Integer.parseInt(args[3]));
     if (args.length >= 3)
-      connectedComponentsJob.setMaxIteration(Integer.parseInt(args[2]));
+      job.setMaxIteration(Integer.parseInt(args[2]));
 
-    connectedComponentsJob.setVertexIDClass(Text.class);
-    connectedComponentsJob.setVertexValueClass(Text.class);
-    connectedComponentsJob.setEdgeValueClass(NullWritable.class);
-
-    connectedComponentsJob.setInputFormat(SequenceFileInputFormat.class);
-    connectedComponentsJob.setPartitioner(HashPartitioner.class);
-    connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
-    connectedComponentsJob.setOutputKeyClass(Text.class);
-    connectedComponentsJob.setOutputValueClass(Text.class);
+    job.setVertexIDClass(Text.class);
+    job.setVertexValueClass(Text.class);
+    job.setEdgeValueClass(NullWritable.class);
+
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(Text.class);
+    job.setInputFormat(TextInputFormat.class);
+    job.setVertexInputReaderClass(MindistSearchCountReader.class);
+    job.setPartitioner(HashPartitioner.class);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
 
     long startTime = System.currentTimeMillis();
-    if (connectedComponentsJob.waitForCompletion(true)) {
+    if (job.waitForCompletion(true)) {
       System.out.println("Job Finished in "
           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Sat May 26 16:11:11 2012
@@ -23,15 +23,18 @@ import java.util.Iterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.graph.AverageAggregator;
+import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
 
 public class PageRank {
 
@@ -53,7 +56,7 @@ public class PageRank {
       if (val != null) {
         MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
       }
-      numEdges = this.getOutEdges().size();
+      numEdges = this.getEdges().size();
     }
 
     @Override
@@ -86,6 +89,34 @@ public class PageRank {
     }
   }
 
+  public static class PagerankTextReader extends
+      VertexInputReader<LongWritable, Text, Text, DoubleWritable, NullWritable> {
+
+    /**
+     * Each line of the text file should look like: <br/>
+     * VERTEX_ID\t(tab-separated VERTEX_IDs)<br/>
+     * e.g.:<br/>
+     * 1\t2\t3\t4<br/>
+     * 2\t3\t1<br/>
+     * etc.
+     */
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, DoubleWritable, NullWritable> vertex) {
+      String[] split = value.toString().split("\t");
+      for (int i = 0; i < split.length; i++) {
+        if (i == 0) {
+          vertex.setVertexID(new Text(split[i]));
+        } else {
+          vertex
+              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+        }
+      }
+      return true;
+    }
+
+  }
+
   private static void printUsage() {
     System.out
         .println("Usage: <input> <output> [damping factor (default 0.85)] [Epsilon (convergence error, default 0.001)] [Max iterations (default 30)] [tasks]");
@@ -127,7 +158,10 @@ public class PageRank {
     pageJob.setVertexValueClass(DoubleWritable.class);
     pageJob.setEdgeValueClass(NullWritable.class);
 
-    pageJob.setInputFormat(SequenceFileInputFormat.class);
+    pageJob.setInputKeyClass(LongWritable.class);
+    pageJob.setInputValueClass(Text.class);
+    pageJob.setInputFormat(TextInputFormat.class);
+    pageJob.setVertexInputReaderClass(PagerankTextReader.class);
     pageJob.setPartitioner(HashPartitioner.class);
     pageJob.setOutputFormat(SequenceFileOutputFormat.class);
     pageJob.setOutputKeyClass(Text.class);

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Sat May 26 16:11:11 2012
@@ -22,17 +22,17 @@ import java.util.Iterator;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.graph.VertexInputReader;
 
 public class SSSP {
 
@@ -63,7 +63,7 @@ public class SSSP {
 
       if (minDist < this.getValue().get()) {
         this.setValue(new IntWritable(minDist));
-        for (Edge<Text, IntWritable> e : this.getOutEdges()) {
+        for (Edge<Text, IntWritable> e : this.getEdges()) {
           sendMessage(e, new IntWritable(minDist + e.getValue().get()));
         }
       }
@@ -87,6 +87,35 @@ public class SSSP {
     }
   }
 
+  public static class SSSPTextReader extends
+      VertexInputReader<LongWritable, Text, Text, IntWritable, IntWritable> {
+
+    /**
+     * Each line of the text file should look like: <br/>
+     * VERTEX_ID\t(tab-separated VERTEX_ID:VERTEX_VALUE pairs)<br/>
+     * e.g.:<br/>
+     * 1\t2:25\t3:32\t4:21<br/>
+     * 2\t3:222\t1:922<br/>
+     * etc.
+     */
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, IntWritable, IntWritable> vertex) {
+      String[] split = value.toString().split("\t");
+      for (int i = 0; i < split.length; i++) {
+        if (i == 0) {
+          vertex.setVertexID(new Text(split[i]));
+        } else {
+          String[] split2 = split[i].split(":");
+          vertex.addEdge(new Edge<Text, IntWritable>(new Text(split2[0]),
+              new IntWritable(Integer.parseInt(split2[1]))));
+        }
+      }
+      return true;
+    }
+
+  }
+
   private static void printUsage() {
     System.out.println("Usage: <startnode> <input> <output> [tasks]");
     System.exit(-1);
@@ -113,12 +142,13 @@ public class SSSP {
 
     ssspJob.setVertexClass(ShortestPathVertex.class);
     ssspJob.setCombinerClass(MinIntCombiner.class);
-    ssspJob.setInputFormat(SequenceFileInputFormat.class);
-    ssspJob.setInputKeyClass(VertexWritable.class);
-    ssspJob.setInputValueClass(VertexArrayWritable.class);
+    ssspJob.setInputFormat(TextInputFormat.class);
+    ssspJob.setInputKeyClass(LongWritable.class);
+    ssspJob.setInputValueClass(Text.class);
 
     ssspJob.setPartitioner(HashPartitioner.class);
     ssspJob.setOutputFormat(SequenceFileOutputFormat.class);
+    ssspJob.setVertexInputReaderClass(SSSPTextReader.class);
     ssspJob.setOutputKeyClass(Text.class);
     ssspJob.setOutputValueClass(IntWritable.class);
     // Iterate until all the nodes have been reached.

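Together with the test changes below, the weighted format drives SSSP like this; a condensed sketch (paths are placeholders, usage taken from printUsage() above):

    import java.io.FileWriter;

    import org.apache.hama.examples.SSSP;

    public class SSSPDemo {
      public static void main(String[] args) throws Exception {
        // vertices 0 and 1 are connected both ways with edge weight 85
        FileWriter w = new FileWriter("/tmp/sssp.txt");
        w.write("0\t1:85\n1\t0:85\n");
        w.close();
        // Usage: <startnode> <input> <output> [tasks]
        SSSP.main(new String[] { "0", "/tmp/sssp.txt", "/tmp/sssp-out" });
      }
    }
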
Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Sat May 26 16:11:11 2012
@@ -21,10 +21,7 @@ import java.io.BufferedWriter;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
 import junit.framework.TestCase;
 
@@ -32,52 +29,19 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
-import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.examples.MindistSearch.MinTextCombiner;
-import org.apache.hama.examples.MindistSearch.MindistSearchVertex;
-import org.apache.hama.examples.util.PagerankTextToSeq;
-import org.apache.hama.graph.GraphJob;
-import org.apache.hama.graph.GraphJobRunner;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
 
 public class MindistSearchTest extends TestCase {
 
-  private static final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> tmp = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>();
-  // mapping of our index of the vertex to the resulting component id
-  private static final String[] resultList = new String[] { "0", "1", "2", "2",
-      "1", "2", "2", "1", "2", "0" };
-  static {
-    String[] pages = new String[] { "0", "1", "2", "3", "4", "5", "6", "7",
-        "8", "9" };
-    String[] lineArray = new String[] { "0", "1;4;7", "2;3;8", "3;5", "4;1",
-        "5;6", "6", "7", "8;3", "9;0" };
-
-    for (int i = 0; i < lineArray.length; i++) {
-      String[] adjacencyStringArray = lineArray[i].split(";");
-      int vertexId = Integer.parseInt(adjacencyStringArray[0]);
-      String name = pages[vertexId];
-      @SuppressWarnings("unchecked")
-      VertexWritable<Text, IntWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
-      for (int j = 1; j < adjacencyStringArray.length; j++) {
-        arr[j - 1] = new VertexWritable<Text, IntWritable>(new IntWritable(0),
-            new Text(pages[Integer.parseInt(adjacencyStringArray[j])]),
-            Text.class, IntWritable.class);
-      }
-      VertexArrayWritable wr = new VertexArrayWritable();
-      wr.set(arr);
-      tmp.put(
-          new VertexWritable<Text, IntWritable>(new Text(name), Text.class), wr);
-    }
-  }
+  String[] resultList = new String[] { "0", "1", "2", "2", "1", "2", "2", "1",
+      "2", "0" };
+  String[] input = new String[] { "0", "1\t4\t7", "2\t3\t8", "3\t5", "4\t1", "5\t6",
+      "6", "7", "8\t3", "9\t0" };
+
   private static String INPUT = "/tmp/pagerank-tmp.seq";
   private static String TEXT_INPUT = "/tmp/pagerank.txt";
   private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
@@ -92,7 +56,7 @@ public class MindistSearchTest extends T
   }
 
   public void testMindistSearch() throws Exception {
-    generateSeqTestData(tmp);
+    generateTestData();
     try {
       MindistSearch.main(new String[] { INPUT, OUTPUT });
 
@@ -131,88 +95,24 @@ public class MindistSearchTest extends T
     assertEquals(resultList.length, itemsRead);
   }
 
-  private void generateSeqTestData(
-      Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> tmp)
-      throws IOException {
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
-        INPUT), VertexWritable.class, VertexArrayWritable.class);
-    for (Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : tmp
-        .entrySet()) {
-      writer.append(e.getKey(), e.getValue());
-    }
-    writer.close();
-  }
-
-  public void testPageRankUtil() throws Exception {
-    generateTestTextData();
-    // <input path> <output path>
-    PagerankTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
+  private void generateTestData() {
+    BufferedWriter bw = null;
     try {
-      MindistSearch.main(new String[] { TEXT_OUTPUT, OUTPUT });
-
-      verifyResult();
-    } finally {
-      deleteTempDirs();
-    }
-  }
-
-  public void testRepairFunctionality() throws Exception {
-    // make a copy to be safe with parallel test executions
-    final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>(
-        tmp);
-    // removing 7 should resulting in creating it and getting the same result as
-    // usual
-    map.remove(new VertexWritable<Text, IntWritable>("7"));
-    generateSeqTestData(map);
-    try {
-      HamaConfiguration conf = new HamaConfiguration(new Configuration());
-      conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
-      GraphJob connectedComponentsJob = new GraphJob(conf,
-          MindistSearchVertex.class);
-      connectedComponentsJob.setJobName("Mindist Search");
-
-      connectedComponentsJob.setVertexClass(MindistSearchVertex.class);
-      connectedComponentsJob.setInputPath(new Path(INPUT));
-      connectedComponentsJob.setOutputPath(new Path(OUTPUT));
-      // set the min text combiner here
-      connectedComponentsJob.setCombinerClass(MinTextCombiner.class);
-
-      // set the defaults
-      connectedComponentsJob.setMaxIteration(30);
-      connectedComponentsJob.setInputFormat(SequenceFileInputFormat.class);
-      connectedComponentsJob.setPartitioner(HashPartitioner.class);
-      connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
-      connectedComponentsJob.setOutputKeyClass(Text.class);
-      connectedComponentsJob.setOutputValueClass(Text.class);
-
-      connectedComponentsJob.setVertexIDClass(Text.class);
-      connectedComponentsJob.setVertexValueClass(Text.class);
-      connectedComponentsJob.setEdgeValueClass(NullWritable.class);
-
-      if (connectedComponentsJob.waitForCompletion(true)) {
-        verifyResult();
-      } else {
-        fail("Job not completed correctly!");
+      bw = new BufferedWriter(new FileWriter(INPUT));
+      for (String s : input) {
+        bw.write(s + "\n");
       }
+    } catch (IOException e) {
+      e.printStackTrace();
     } finally {
-      deleteTempDirs();
-    }
-  }
-
-  private static void generateTestTextData() throws IOException {
-    BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
-    for (Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : tmp
-        .entrySet()) {
-      writer.write(e.getKey() + "\t");
-      for (int i = 0; i < e.getValue().get().length; i++) {
-        @SuppressWarnings("unchecked")
-        VertexWritable<Text, IntWritable> writable = (VertexWritable<Text, IntWritable>) e
-            .getValue().get()[i];
-        writer.write(writable.getVertexId() + "\t");
+      if (bw != null) {
+        try {
+          bw.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
       }
-      writer.write("\n");
     }
-    writer.close();
   }
 
   private void deleteTempDirs() {

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Sat May 26 16:11:11 2012
@@ -20,9 +20,6 @@ package org.apache.hama.examples;
 import java.io.BufferedWriter;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
 
 import junit.framework.TestCase;
 
@@ -31,21 +28,19 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.examples.PageRank.PageRankVertex;
 import org.apache.hama.graph.AverageAggregator;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.GraphJobRunner;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
 
-@SuppressWarnings("unchecked")
 public class PageRankTest extends TestCase {
   /**
    * The graph looks like this (adjacency list, [] contains outlinks):<br/>
@@ -56,37 +51,16 @@ public class PageRankTest extends TestCa
    * twitter.com [google.com, facebook.com]<br/>
    * nasa.gov [yahoo.com, stackoverflow.com]<br/>
    * youtube.com [google.com, yahoo.com]<br/>
+   * Note that google.com has no adjacency line of its own in the input,
+   * mainly to test the repair functionality.
    */
-  private static final Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp = new HashMap<VertexWritable<Text, DoubleWritable>, VertexArrayWritable>();
-  static {
-    Configuration conf = new HamaConfiguration();
-    VertexWritable.CONFIGURATION = conf;
-    // our first entry is null, because our indices in hama 3.0 pre calculated
-    // example starts at 1.
-    // FIXME This is really ugly.
-    String[] pages = new String[] { null, "twitter.com", "google.com",
-        "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
-        "youtube.com" };
-    String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
-        "5;4;6", "6;4", "7;2;4" };
-
-    for (int i = 0; i < lineArray.length; i++) {
-
-      String[] adjacencyStringArray = lineArray[i].split(";");
-      int vertexId = Integer.parseInt(adjacencyStringArray[0]);
-      String name = pages[vertexId];
-      VertexWritable<Text, DoubleWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
-      for (int j = 1; j < adjacencyStringArray.length; j++) {
-        arr[j - 1] = new VertexWritable<Text, DoubleWritable>(
-            new DoubleWritable(0.0d), new Text(
-                pages[Integer.parseInt(adjacencyStringArray[j])]), Text.class,
-            DoubleWritable.class);
-      }
-      VertexArrayWritable wr = new VertexArrayWritable();
-      wr.set(arr);
-      tmp.put(new VertexWritable<Text, DoubleWritable>(name), wr);
-    }
-  }
+  String[] input = new String[] { "stackoverflow.com\tyahoo.com",
+      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+      "yahoo.com\tnasa.gov\tstackoverflow.com",
+      "twitter.com\tgoogle.com\tfacebook.com",
+      "nasa.gov\tyahoo.com\tstackoverflow.com",
+      "youtube.com\tgoogle.com\tyahoo.com" };
+
   private static String INPUT = "/tmp/pagerank-tmp.seq";
   private static String TEXT_INPUT = "/tmp/pagerank.txt";
   private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
@@ -117,26 +91,8 @@ public class PageRankTest extends TestCa
     assertTrue(sum > 0.99d && sum <= 1.1d);
   }
 
-  private void generateSeqTestData(
-      Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp)
-      throws IOException {
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
-        INPUT), VertexWritable.class, VertexArrayWritable.class);
-    for (Entry<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> e : tmp
-        .entrySet()) {
-      writer.append(e.getKey(), e.getValue());
-    }
-    writer.close();
-  }
-
   public void testRepairFunctionality() throws Exception {
-    // make a copy to be safe with parallel test executions
-    final Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, DoubleWritable>, VertexArrayWritable>(
-        tmp);
-    // removing google should resulting in creating it and getting the same
-    // result as usual
-    map.remove(new VertexWritable<Text, DoubleWritable>("google.com"));
-    generateSeqTestData(map);
+    generateTestData();
     try {
       HamaConfiguration conf = new HamaConfiguration(new Configuration());
       conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
@@ -146,7 +102,7 @@ public class PageRankTest extends TestCa
       pageJob.setVertexClass(PageRankVertex.class);
       pageJob.setInputPath(new Path(INPUT));
       pageJob.setOutputPath(new Path(OUTPUT));
-
+      pageJob.setNumBspTask(2);
       // set the defaults
       pageJob.setMaxIteration(30);
       pageJob.set("hama.pagerank.alpha", "0.85");
@@ -155,13 +111,14 @@ public class PageRankTest extends TestCa
       pageJob.set("hama.graph.self.ref", "true");
 
       pageJob.setAggregatorClass(AverageAggregator.class);
-
-      pageJob.setInputFormat(SequenceFileInputFormat.class);
+      pageJob.setInputKeyClass(LongWritable.class);
+      pageJob.setInputValueClass(Text.class);
+      pageJob.setInputFormat(TextInputFormat.class);
       pageJob.setPartitioner(HashPartitioner.class);
       pageJob.setOutputFormat(SequenceFileOutputFormat.class);
       pageJob.setOutputKeyClass(Text.class);
       pageJob.setOutputValueClass(DoubleWritable.class);
-
+      pageJob.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
       pageJob.setVertexIDClass(Text.class);
       pageJob.setVertexValueClass(DoubleWritable.class);
       pageJob.setEdgeValueClass(NullWritable.class);
@@ -175,20 +132,24 @@ public class PageRankTest extends TestCa
     }
   }
 
-  @SuppressWarnings("unused")
-  private static void generateTestTextData() throws IOException {
-    BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
-    for (Map.Entry<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> e : tmp
-        .entrySet()) {
-      writer.write(e.getKey() + "\t");
-      for (int i = 0; i < e.getValue().get().length; i++) {
-        VertexWritable<Text, DoubleWritable> writable = (VertexWritable<Text, DoubleWritable>) e
-            .getValue().get()[i];
-        writer.write(writable.getVertexId() + "\t");
+  private void generateTestData() {
+    BufferedWriter bw = null;
+    try {
+      bw = new BufferedWriter(new FileWriter(INPUT));
+      for (String s : input) {
+        bw.write(s + "\n");
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    } finally {
+      if (bw != null) {
+        try {
+          bw.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
       }
-      writer.write("\n");
     }
-    writer.close();
   }
 
   private void deleteTempDirs() {

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java Sat May 26 16:11:11 2012
@@ -33,101 +33,19 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
 
 /**
  * Testcase for {@link ShortestPaths}
  */
-
-@SuppressWarnings("unchecked")
 public class SSSPTest extends TestCase {
-
-  private static final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> testData = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>();
-
-  static {
-    Configuration conf = new Configuration();
-    VertexWritable.CONFIGURATION = conf;
-    String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg",
-        "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
-        "Muenchen" };
-
-    for (String city : cities) {
-      if (city.equals("Frankfurt")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
-        textArr[0] = new VertexWritable<Text, IntWritable>(85, "Mannheim");
-        textArr[1] = new VertexWritable<Text, IntWritable>(173, "Kassel");
-        textArr[2] = new VertexWritable<Text, IntWritable>(217, "Wuerzburg");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      } else if (city.equals("Stuttgart")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[1];
-        textArr[0] = new VertexWritable<Text, IntWritable>(183, "Nuernberg");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      } else if (city.equals("Kassel")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
-        textArr[0] = new VertexWritable<Text, IntWritable>(502, "Muenchen");
-        textArr[1] = new VertexWritable<Text, IntWritable>(173, "Frankfurt");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      } else if (city.equals("Erfurt")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[1];
-        textArr[0] = new VertexWritable<Text, IntWritable>(186, "Wuerzburg");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      } else if (city.equals("Wuerzburg")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
-        textArr[0] = new VertexWritable<Text, IntWritable>(217, "Frankfurt");
-        textArr[1] = new VertexWritable<Text, IntWritable>(186, "Erfurt");
-        textArr[2] = new VertexWritable<Text, IntWritable>(103, "Nuernberg");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      } else if (city.equals("Mannheim")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
-        textArr[0] = new VertexWritable<Text, IntWritable>(80, "Karlsruhe");
-        textArr[1] = new VertexWritable<Text, IntWritable>(85, "Frankfurt");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      } else if (city.equals("Karlsruhe")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
-        textArr[0] = new VertexWritable<Text, IntWritable>(250, "Augsburg");
-        textArr[1] = new VertexWritable<Text, IntWritable>(80, "Mannheim");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      } else if (city.equals("Augsburg")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
-        textArr[0] = new VertexWritable<Text, IntWritable>(250, "Karlsruhe");
-        textArr[1] = new VertexWritable<Text, IntWritable>(84, "Muenchen");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      } else if (city.equals("Nuernberg")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
-        textArr[0] = new VertexWritable<Text, IntWritable>(183, "Stuttgart");
-        textArr[1] = new VertexWritable<Text, IntWritable>(167, "Muenchen");
-        textArr[2] = new VertexWritable<Text, IntWritable>(103, "Wuerzburg");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      } else if (city.equals("Muenchen")) {
-        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
-        textArr[0] = new VertexWritable<Text, IntWritable>(167, "Nuernberg");
-        textArr[1] = new VertexWritable<Text, IntWritable>(502, "Kassel");
-        textArr[2] = new VertexWritable<Text, IntWritable>(84, "Augsburg");
-        VertexArrayWritable arr = new VertexArrayWritable();
-        arr.set(textArr);
-        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
-      }
-    }
-  }
+  String[] input = new String[] { "1:85\t2:217\t4:173", "0:85\t5:80",
+      "0:217\t6:186\t7:103", 
+      "7:183",
+      "0:173\t9:502", 
+      "1:80\t8:250",
+      "2:186",
+      "3:183\t9:167\t2:103",
+      "5:250\t9:84", "4:502\t7:167\t8:84" };
 
   private static String INPUT = "/tmp/sssp-tmp.seq";
   private static String TEXT_INPUT = "/tmp/sssp.txt";
@@ -145,10 +63,9 @@ public class SSSPTest extends TestCase {
   public void testShortestPaths() throws IOException, InterruptedException,
       ClassNotFoundException, InstantiationException, IllegalAccessException {
 
-    generateTestSequenceFileData();
+    generateTestData();
     try {
-      SSSP.main(new String[] { "Frankfurt", INPUT, OUTPUT });
-
+      SSSP.main(new String[] { "0", INPUT, OUTPUT, "2" });
       verifyResult();
     } finally {
       deleteTempDirs();
@@ -157,16 +74,16 @@ public class SSSPTest extends TestCase {
 
   private void verifyResult() throws IOException {
     Map<String, Integer> rs = new HashMap<String, Integer>();
-    rs.put("Erfurt", 403);
-    rs.put("Mannheim", 85);
-    rs.put("Stuttgart", 503);
-    rs.put("Kassel", 173);
-    rs.put("Nuernberg", 320);
-    rs.put("Augsburg", 415);
-    rs.put("Frankfurt", 0);
-    rs.put("Muenchen", 487);
-    rs.put("Wuerzburg", 217);
-    rs.put("Karlsruhe", 165);
+    rs.put("6", 403);
+    rs.put("1", 85);
+    rs.put("3", 503);
+    rs.put("4", 173);
+    rs.put("7", 320);
+    rs.put("8", 415);
+    rs.put("0", 0);
+    rs.put("9", 487);
+    rs.put("2", 217);
+    rs.put("5", 165);
 
     FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
     for (FileStatus fts : globStatus) {
@@ -180,33 +97,25 @@ public class SSSPTest extends TestCase {
     }
   }
 
-  private void generateTestSequenceFileData() throws IOException {
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
-        INPUT), VertexWritable.class, VertexArrayWritable.class);
-    for (Map.Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : testData
-        .entrySet()) {
-      writer.append(e.getKey(), e.getValue());
-    }
-    writer.close();
-  }
-
-  @SuppressWarnings("unused")
-  private static void generateTestTextData() throws IOException {
-    BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
-    for (Map.Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : testData
-        .entrySet()) {
-      writer.write(e.getKey().getVertexId() + "\t");
-      for (int i = 0; i < e.getValue().get().length; i++) {
-        writer
-            .write(((VertexWritable<Text, IntWritable>) e.getValue().get()[i])
-                .getVertexId()
-                + ":"
-                + ((VertexWritable<Text, IntWritable>) e.getValue().get()[i])
-                    .getVertexValue() + "\t");
+  private void generateTestData() {
+    BufferedWriter bw = null;
+    try {
+      bw = new BufferedWriter(new FileWriter(INPUT));
+      int index = 0;
+      for (String s : input) {
+        bw.write((index++) + "\t" + s + "\n");
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    } finally {
+      if (bw != null) {
+        try {
+          bw.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
       }
-      writer.write("\n");
     }
-    writer.close();
   }
 
   private void deleteTempDirs() {

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Sat May 26 16:11:11 2012
@@ -26,13 +26,22 @@ import org.apache.hadoop.io.Writable;
 public final class Edge<VERTEX_ID extends Writable, EDGE_VALUE_TYPE extends Writable> {
 
   private final VERTEX_ID destinationVertexID;
-  private final String destinationPeerName;
   private final EDGE_VALUE_TYPE cost;
+  String destinationPeerName;
 
-  public Edge(VERTEX_ID sourceVertexID, String destVertexID,
+  public Edge(VERTEX_ID sourceVertexID, EDGE_VALUE_TYPE cost) {
+    this.destinationVertexID = sourceVertexID;
+    if (cost == null || cost instanceof NullWritable) {
+      this.cost = null;
+    } else {
+      this.cost = cost;
+    }
+  }
+
+  public Edge(VERTEX_ID sourceVertexID, String destinationPeer,
       EDGE_VALUE_TYPE cost) {
     this.destinationVertexID = sourceVertexID;
-    this.destinationPeerName = destVertexID;
+    destinationPeerName = destinationPeer;
     if (cost instanceof NullWritable) {
       this.cost = null;
     } else {

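The new two-argument constructor is what the example readers above rely on; it treats a NullWritable cost the same as null, so unweighted graphs can pass either. A small sketch of the behaviour:

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hama.graph.Edge;

    public class EdgeConstructorDemo {
      public static void main(String[] args) {
        // both build an unweighted edge pointing at vertex "2"
        Edge<Text, NullWritable> a = new Edge<Text, NullWritable>(new Text("2"), null);
        Edge<Text, NullWritable> b = new Edge<Text, NullWritable>(
            new Text("2"), NullWritable.get());
        // the constructor normalizes NullWritable to null, so this prints true
        System.out.println(a.getValue() == null && b.getValue() == null);
      }
    }
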
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Sat May 26 16:11:11 2012
@@ -25,8 +25,11 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
 
+import com.google.common.base.Preconditions;
+
 public class GraphJob extends BSPJob {
 
   public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
@@ -37,6 +40,7 @@ public class GraphJob extends BSPJob {
   public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
   public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
   public final static String VERTEX_GRAPH_RUNTIME_PARTIONING = "hama.graph.runtime.partitioning";
+  public final static String VERTEX_GRAPH_INPUT_READER = "hama.graph.input.reader.class";
 
   /**
    * Creates a new Graph Job with the given configuration and an exampleClass.
@@ -48,12 +52,12 @@ public class GraphJob extends BSPJob {
   public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
       throws IOException {
     super(conf);
-    VertexWritable.CONFIGURATION = conf;
     this.setBspClass(GraphJobRunner.class);
     this.setJarByClass(exampleClass);
     this.setVertexIDClass(Text.class);
     this.setVertexValueClass(IntWritable.class);
     this.setEdgeValueClass(IntWritable.class);
+    this.setPartitioner(HashPartitioner.class);
   }
 
   /**
@@ -109,6 +113,15 @@ public class GraphJob extends BSPJob {
     conf.set(AGGREGATOR_CLASS_ATTR, classNames);
   }
 
+  /**
+   * Sets the input reader that parses the input records into vertices.
+   */
+  public void setVertexInputReaderClass(
+      Class<? extends VertexInputReader<?, ?, ?, ?, ?>> cls) {
+    ensureState(JobState.DEFINE);
+    conf.setClass(VERTEX_GRAPH_INPUT_READER, cls, VertexInputReader.class);
+  }
+
   @SuppressWarnings("unchecked")
   public Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> getVertexClass() {
     return (Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>>) conf
@@ -136,4 +149,25 @@ public class GraphJob extends BSPJob {
     conf.setInt("hama.graph.max.iteration", maxIteration);
   }
 
+  @Override
+  public void submit() throws IOException, InterruptedException {
+    Preconditions.checkArgument(this.getConf().get(VERTEX_CLASS_ATTR) != null,
+        "Please provide a vertex class!");
+    Preconditions.checkArgument(
+        this.getConf().get(VERTEX_ID_CLASS_ATTR) != null,
+        "Please provide a vertex ID class!");
+    Preconditions
+        .checkArgument(
+            this.getConf().get(VERTEX_VALUE_CLASS_ATTR) != null,
+            "Please provide a vertex value class; if you don't need one, use NullWritable!");
+    Preconditions
+        .checkArgument(
+            this.getConf().get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null,
+            "Please provide an edge value class; if you don't need one, use NullWritable!");
+    Preconditions.checkArgument(
+        this.getConf().get(VERTEX_GRAPH_INPUT_READER) != null,
+        "Please provide a vertex input reader!");
+    super.submit();
+  }
+
 }

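With submit() now validating the configuration, a job must set the vertex class, the vertex ID and value classes, the edge value class, and the input reader before it is submitted. A minimal wiring sketch assembled from the example changes above; MyVertex and MyTextReader stand in for user-supplied classes:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.bsp.SequenceFileOutputFormat;
    import org.apache.hama.bsp.TextInputFormat;
    import org.apache.hama.graph.GraphJob;

    public class GraphJobWiring {
      public static void main(String[] args) throws Exception {
        GraphJob job = new GraphJob(new HamaConfiguration(), GraphJobWiring.class);
        job.setJobName("Wiring sketch");

        // the settings submit() checks via Preconditions:
        job.setVertexClass(MyVertex.class);                 // hypothetical Vertex subclass
        job.setVertexIDClass(Text.class);
        job.setVertexValueClass(DoubleWritable.class);
        job.setEdgeValueClass(NullWritable.class);
        job.setVertexInputReaderClass(MyTextReader.class);  // hypothetical reader

        // plain text input, keyed the way TextInputFormat delivers it
        job.setInputFormat(TextInputFormat.class);
        job.setInputKeyClass(LongWritable.class);
        job.setInputValueClass(Text.class);
        job.setInputPath(new Path("/tmp/graph-in"));

        job.setOutputFormat(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setOutputPath(new Path("/tmp/graph-out"));

        // no setPartitioner() call needed: the GraphJob constructor now
        // defaults to HashPartitioner
        job.waitForCompletion(true);
      }
    }
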
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Sat May 26 16:11:11 2012
@@ -94,7 +94,7 @@ public final class GraphJobMessage imple
       } else {
         out.writeBoolean(false);
       }
-      List<?> outEdges = vertex.getOutEdges();
+      List<?> outEdges = vertex.getEdges();
       out.writeInt(outEdges.size());
       for (Object e : outEdges) {
         Edge<?, ?> edge = (Edge<?, ?>) e;
@@ -137,7 +137,7 @@ public final class GraphJobMessage imple
         vertex.setValue(vertexValue);
       }
       int size = in.readInt();
-      vertex.edges = new ArrayList<Edge<Writable, Writable>>(size);
+      vertex.setEdges(new ArrayList<Edge<Writable, Writable>>(size));
       for (int i = 0; i < size; i++) {
         String destination = in.readUTF();
         Writable edgeVertexID = ReflectionUtils.newInstance(VERTEX_ID_CLASS,
@@ -148,8 +148,8 @@ public final class GraphJobMessage imple
           edgeValue = ReflectionUtils.newInstance(EDGE_VALUE_CLASS, null);
           edgeValue.readFields(in);
         }
-        vertex.edges.add(new Edge<Writable, Writable>(edgeVertexID,
-            destination, edgeValue));
+        vertex.getEdges().add(
+            new Edge<Writable, Writable>(edgeVertexID, destination, edgeValue));
       }
       this.vertex = vertex;
     } else {

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sat May 26 16:11:11 2012
@@ -52,8 +52,7 @@ import org.apache.hama.util.KeyValuePair
  * @param <VERTEX_VALUE> the value type of an edge.
  */
 public final class GraphJobRunner<VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE_TYPE extends Writable>
-    extends
-    BSP<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> {
+    extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
 
   static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
 
@@ -102,10 +101,9 @@ public final class GraphJobRunner<VERTEX
   @Override
   @SuppressWarnings("unchecked")
   public final void setup(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
     this.conf = peer.getConfiguration();
-    VertexWritable.CONFIGURATION = conf;
     // Choose one as a master to collect global updates
     this.masterTask = peer.getPeerName(0);
 
@@ -126,7 +124,7 @@ public final class GraphJobRunner<VERTEX
 
     boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
     boolean runtimePartitioning = conf.getBoolean(
-        GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, false);
+        GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
     Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner = (Partitioner<VERTEX_ID, VERTEX_VALUE>) ReflectionUtils
         .newInstance(
             conf.getClass("bsp.input.partitioner.class", HashPartitioner.class),
@@ -168,7 +166,11 @@ public final class GraphJobRunner<VERTEX
       }
     }
 
-    loadVertices(peer, repairNeeded, runtimePartitioning, partitioner);
+    VertexInputReader<Writable, Writable, VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> reader = (VertexInputReader<Writable, Writable, VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
+        .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
+            VertexInputReader.class), conf);
+
+    loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader);
     numberVertices = vertices.size() * peer.getNumPeers();
     // TODO refactor this to a single step
     for (Entry<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> e : vertices
@@ -196,7 +198,7 @@ public final class GraphJobRunner<VERTEX
 
   @Override
   public final void bsp(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
     maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
@@ -296,7 +298,7 @@ public final class GraphJobRunner<VERTEX
   }
 
   private void runAggregators(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer,
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
       int messagesSize) throws IOException {
     // send msgCounts to the master task
     MapWritable updatedCnt = new MapWritable();
@@ -326,7 +328,7 @@ public final class GraphJobRunner<VERTEX
 
   @SuppressWarnings("unchecked")
   private Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> parseMessages(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     GraphJobMessage msg = null;
     final Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> msgMap = new HashMap<VERTEX_ID, LinkedList<VERTEX_VALUE>>();
@@ -375,60 +377,66 @@ public final class GraphJobRunner<VERTEX
 
   @SuppressWarnings("unchecked")
   private void loadVertices(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer,
-      boolean repairNeeded, boolean runtimePartitioning,
-      Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner) throws IOException,
-      SyncException, InterruptedException {
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      boolean repairNeeded,
+      boolean runtimePartitioning,
+      Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner,
+      VertexInputReader<Writable, Writable, VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> reader)
+      throws IOException, SyncException, InterruptedException {
+
     LOG.debug("vertex class: " + vertexClass);
     boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
-    KeyValuePair<? extends VertexWritable<VERTEX_ID, VERTEX_VALUE>, ? extends VertexArrayWritable> next = null;
-    while ((next = peer.readNext()) != null) {
-      Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
-          vertexClass, conf);
-      vertex.setVertexID(next.getKey().getVertexId());
-      vertex.peer = peer;
-      vertex.runner = this;
-
-      VertexWritable<VERTEX_ID, VERTEX_VALUE>[] arr = (VertexWritable[]) next
-          .getValue().toArray();
+    Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
+        vertexClass, conf);
+    vertex.peer = peer;
+    vertex.runner = this;
+    while (true) {
+      KeyValuePair<Writable, Writable> next = peer.readNext();
+      if (next == null) {
+        break;
+      }
+      boolean vertexFinished = reader.parseVertex(next.getKey(),
+          next.getValue(), vertex);
+      if (!vertexFinished) {
+        continue;
+      }
+      if (vertex.getEdges() == null) {
+        vertex.setEdges(new ArrayList<Edge<VERTEX_ID, EDGE_VALUE_TYPE>>(0));
+      }
       if (selfReference) {
-        VertexWritable<VERTEX_ID, VERTEX_VALUE>[] tmp = new VertexWritable[arr.length + 1];
-        System.arraycopy(arr, 0, tmp, 0, arr.length);
-        tmp[arr.length] = new VertexWritable<VERTEX_ID, VERTEX_VALUE>(
-            vertex.getValue(), vertex.getVertexID(), vertexIdClass,
-            vertexValueClass);
-        arr = tmp;
-      }
-      List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> edges = new ArrayList<Edge<VERTEX_ID, EDGE_VALUE_TYPE>>();
-      for (VertexWritable<VERTEX_ID, VERTEX_VALUE> e : arr) {
-        int partition = partitioner.getPartition(e.getVertexId(),
-            e.getVertexValue(), peer.getNumPeers());
-        String target = peer.getPeerName(partition);
-        edges.add(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(e.getVertexId(), target,
-            (EDGE_VALUE_TYPE) e.getVertexValue()));
+        vertex.addEdge(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(vertex
+            .getVertexID(), peer.getPeerName(), null));
       }
-
-      vertex.edges = edges;
       if (runtimePartitioning) {
         int partition = partitioner.getPartition(vertex.getVertexID(),
             vertex.getValue(), peer.getNumPeers());
+        // resolve each edge's destination peer name now
+        for (Edge<VERTEX_ID, EDGE_VALUE_TYPE> edge : vertex.getEdges()) {
+          int edgePartition = partitioner.getPartition(
+              edge.getDestinationVertexID(), (VERTEX_VALUE) edge.getValue(),
+              peer.getNumPeers());
+          edge.destinationPeerName = peer.getPeerName(edgePartition);
+        }
         peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
       } else {
         vertex.setup(conf);
-        vertices.put(next.getKey().getVertexId(), vertex);
+        vertices.put(vertex.getVertexID(), vertex);
       }
+      vertex = newVertexInstance(vertexClass, conf);
+      vertex.peer = peer;
+      vertex.runner = this;
     }
 
     if (runtimePartitioning) {
       peer.sync();
       GraphJobMessage msg = null;
       while ((msg = peer.getCurrentMessage()) != null) {
-        Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) msg
+        Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> messagedVertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) msg
             .getVertex();
-        vertex.peer = peer;
-        vertex.runner = this;
-        vertex.setup(conf);
-        vertices.put(vertex.getVertexID(), vertex);
+        messagedVertex.peer = peer;
+        messagedVertex.runner = this;
+        messagedVertex.setup(conf);
+        vertices.put(messagedVertex.getVertexID(), messagedVertex);
       }
     }
 
@@ -444,7 +452,7 @@ public final class GraphJobRunner<VERTEX
       final Collection<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> entries = vertices
           .values();
       for (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> entry : entries) {
-        List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> outEdges = entry.getOutEdges();
+        List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> outEdges = entry.getEdges();
         for (Edge<VERTEX_ID, EDGE_VALUE_TYPE> e : outEdges) {
           peer.send(e.getDestinationPeerName(),
               new GraphJobMessage(e.getDestinationVertexID()));
@@ -455,23 +463,24 @@ public final class GraphJobRunner<VERTEX
       while ((msg = peer.getCurrentMessage()) != null) {
         VERTEX_ID vertexName = (VERTEX_ID) msg.getVertexId();
         if (!vertices.containsKey(vertexName)) {
-          Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
+          Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> newVertex = newVertexInstance(
               vertexClass, conf);
-          vertex.peer = peer;
-          vertex.setVertexID(vertexName);
-          vertex.runner = this;
+          newVertex.peer = peer;
+          newVertex.setVertexID(vertexName);
+          newVertex.runner = this;
           if (selfReference) {
-            int partition = partitioner.getPartition(vertex.getVertexID(),
-                vertex.getValue(), peer.getNumPeers());
+            int partition = partitioner.getPartition(newVertex.getVertexID(),
+                newVertex.getValue(), peer.getNumPeers());
             String target = peer.getPeerName(partition);
-            vertex.edges = Collections
-                .singletonList(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(vertex
-                    .getVertexID(), target, null));
+            newVertex.setEdges(Collections
+                .singletonList(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(newVertex
+                    .getVertexID(), target, null)));
           } else {
-            vertex.edges = Collections.emptyList();
+            newVertex.setEdges(new ArrayList<Edge<VERTEX_ID, EDGE_VALUE_TYPE>>(
+                0));
           }
-          vertex.setup(conf);
-          vertices.put(vertexName, vertex);
+          newVertex.setup(conf);
+          vertices.put(vertexName, newVertex);
         }
       }
     }
@@ -495,7 +504,7 @@ public final class GraphJobRunner<VERTEX
    */
   @Override
   public final void cleanup(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     for (Entry<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> e : vertices
         .entrySet()) {
@@ -517,7 +526,7 @@ public final class GraphJobRunner<VERTEX
   }
 
   private boolean isMasterTask(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer) {
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
     return peer.getPeerName().equals(masterTask);
   }
 

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Sat May 26 16:11:11 2012
@@ -18,6 +18,7 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -31,8 +32,8 @@ public abstract class Vertex<ID_TYPE ext
   private ID_TYPE vertexID;
   private MSG_TYPE value;
   protected GraphJobRunner<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> runner;
-  protected BSPPeer<VertexWritable<ID_TYPE, MSG_TYPE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer;
-  public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges;
+  protected BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+  private List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges;
 
   public Configuration getConf() {
     return peer.getConfiguration();
@@ -56,7 +57,7 @@ public abstract class Vertex<ID_TYPE ext
 
   @Override
   public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException {
-    final List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> outEdges = this.getOutEdges();
+    final List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> outEdges = this.getEdges();
     for (Edge<ID_TYPE, EDGE_VALUE_TYPE> e : outEdges) {
       sendMessage(e, msg);
     }
@@ -67,8 +68,19 @@ public abstract class Vertex<ID_TYPE ext
     return runner.getNumberIterations();
   }
 
+  public void setEdges(List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> list) {
+    this.edges = list;
+  }
+
+  public void addEdge(Edge<ID_TYPE, EDGE_VALUE_TYPE> edge) {
+    if (edges == null) {
+      this.edges = new ArrayList<Edge<ID_TYPE, EDGE_VALUE_TYPE>>();
+    }
+    this.edges.add(edge);
+  }
+
   @Override
-  public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges() {
+  public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getEdges() {
     return edges;
   }
 

Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1342922&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (added)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Sat May 26 16:11:11 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A reader that reads records from Hama's input and parses them into vertices.
+ */
+public abstract class VertexInputReader<KEY_IN extends Writable, VALUE_IN extends Writable, VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE extends Writable> {
+
+  /**
+   * Parses the given key and value into the given vertex. If this method
+   * returns true, the vertex is considered finished and a fresh instance will
+   * be supplied on the next call; returning false keeps the same instance, so
+   * a single vertex can be assembled from several consecutive input records.
+   */
+  public abstract boolean parseVertex(KEY_IN key, VALUE_IN value,
+      Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE> vertex);
+
+}

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Sat May 26 16:11:11 2012
@@ -50,7 +50,7 @@ public interface VertexInterface<ID_TYPE
   public void compute(Iterator<MSG_TYPE> messages) throws IOException;
 
   /** @return a list of outgoing edges of this vertex in the input graph. */
-  public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges();
+  public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getEdges();
 
   /** Sends a message to another vertex. */
   public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg)

Modified: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Sat May 26 16:11:11 2012
@@ -17,60 +17,34 @@
  */
 package org.apache.hama.graph;
 
+import java.io.BufferedWriter;
+import java.io.FileWriter;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TestBSPMasterGroomServer;
+import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.graph.example.PageRank;
 
 public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
 
-  private static final Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp = new HashMap<VertexWritable<Text, DoubleWritable>, VertexArrayWritable>();
-  static {
-    Configuration conf = new HamaConfiguration();
-    VertexWritable.CONFIGURATION = conf;
-    // our first entry is null, because our indices in hama 3.0 pre calculated
-    // example starts at 1.
-    // FIXME This is really ugly.
-    String[] pages = new String[] { null, "twitter.com", "google.com",
-        "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
-        "youtube.com" };
-    String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
-        "5;4;6", "6;4", "7;2;4" };
-
-    for (int i = 0; i < lineArray.length; i++) {
-
-      String[] adjacencyStringArray = lineArray[i].split(";");
-      int vertexId = Integer.parseInt(adjacencyStringArray[0]);
-      String name = pages[vertexId];
-      @SuppressWarnings("unchecked")
-      VertexWritable<Text, NullWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
-      for (int j = 1; j < adjacencyStringArray.length; j++) {
-        arr[j - 1] = new VertexWritable<Text, NullWritable>(NullWritable.get(),
-            new Text(pages[Integer.parseInt(adjacencyStringArray[j])]),
-            Text.class, NullWritable.class);
-      }
-      VertexArrayWritable wr = new VertexArrayWritable();
-      wr.set(arr);
-      tmp.put(new VertexWritable<Text, DoubleWritable>(name), wr);
-    }
-  }
+  String[] input = new String[] { "stackoverflow.com\tyahoo.com",
+      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]",
+      "yahoo.com\tnasa.gov\tstackoverflow.com]",
+      "twitter.com\tgoogle.com\tfacebook.com]",
+      "nasa.gov\tyahoo.com\tstackoverflow.com]",
+      "youtube.com\tgoogle.com\tyahoo.com]" };
 
   private static String INPUT = "/tmp/pagerank-real-tmp.seq";
   private static String OUTPUT = "/tmp/pagerank-real-out";
@@ -79,7 +53,7 @@ public class TestSubmitGraphJob extends 
   @Override
   public void testSubmitJob() throws Exception {
 
-    generateSeqTestData(tmp);
+    generateTestData();
 
     GraphJob bsp = new GraphJob(configuration, PageRank.class);
     bsp.setInputPath(new Path(INPUT));
@@ -98,13 +72,17 @@ public class TestSubmitGraphJob extends 
     // we need to include a vertex in its adjacency list,
     // otherwise the pagerank result has a constant loss
     bsp.set("hama.graph.self.ref", "true");
+    bsp.set("hama.graph.repair", "true");
     bsp.setAggregatorClass(AverageAggregator.class, SumAggregator.class);
 
     bsp.setVertexIDClass(Text.class);
     bsp.setVertexValueClass(DoubleWritable.class);
     bsp.setEdgeValueClass(NullWritable.class);
 
-    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
+    bsp.setInputFormat(TextInputFormat.class);
+    bsp.setInputKeyClass(LongWritable.class);
+    bsp.setInputValueClass(Text.class);
     bsp.setPartitioner(HashPartitioner.class);
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(Text.class);
@@ -137,15 +115,23 @@ public class TestSubmitGraphJob extends 
     assertTrue(sum > 0.99d && sum <= 1.1d);
   }
 
-  private void generateSeqTestData(
-      Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp)
-      throws IOException {
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
-        INPUT), VertexWritable.class, VertexArrayWritable.class);
-    for (Entry<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> e : tmp
-        .entrySet()) {
-      writer.append(e.getKey(), e.getValue());
+  private void generateTestData() {
+    BufferedWriter bw = null;
+    try {
+      bw = new BufferedWriter(new FileWriter(INPUT));
+      for (String s : input) {
+        bw.write(s + "\n");
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    } finally {
+      if (bw != null) {
+        try {
+          bw.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
     }
-    writer.close();
   }
 }

Modified: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Sat May 26 16:11:11 2012
@@ -22,9 +22,12 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
 
 public class PageRank {
   public static class PageRankVertex extends
@@ -45,7 +48,7 @@ public class PageRank {
       if (val != null) {
         MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
       }
-      numEdges = this.getOutEdges().size();
+      numEdges = this.getEdges().size();
     }
 
     @Override
@@ -65,8 +68,11 @@ public class PageRank {
         double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
         this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
         if (this.getSuperstepCount() > 1) {
-          if(this.getLastAggregatedValue(1).get() < 0.99 || this.getLastAggregatedValue(1).get() > 1.0){
-            throw new RuntimeException("Sum aggregator hasn't summed correctly! " + this.getLastAggregatedValue(1).get());
+          if (this.getLastAggregatedValue(1).get() < 0.99
+              || this.getLastAggregatedValue(1).get() > 1.1) {
+            throw new RuntimeException(
+                "Sum aggregator hasn't summed correctly! "
+                    + this.getLastAggregatedValue(1).get());
           }
         }
       }
@@ -82,4 +88,33 @@ public class PageRank {
           / numEdges));
     }
   }
+
+  public static class PagerankTextReader extends
+      VertexInputReader<LongWritable, Text, Text, DoubleWritable, NullWritable> {
+
+    /**
+     * The text file should look like:<br/>
+     * VERTEX_ID\t(tab-separated destination VERTEX_IDs)<br/>
+     * e.g.:<br/>
+     * 1\t2\t3\t4<br/>
+     * 2\t3\t1<br/>
+     * etc.
+     */
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, DoubleWritable, NullWritable> vertex) {
+      String[] split = value.toString().split("\t");
+      for (int i = 0; i < split.length; i++) {
+        if (i == 0) {
+          vertex.setVertexID(new Text(split[i]));
+        } else {
+          vertex
+              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+        }
+      }
+      return true;
+    }
+
+  }
+
 }