You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/01/13 21:45:36 UTC

svn commit: r1432733 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/main/java/org/apache/hama/examples/util/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/jav...

Author: surajsmenon
Date: Sun Jan 13 20:45:35 2013
New Revision: 1432733

URL: http://svn.apache.org/viewvc?rev=1432733&view=rev
Log:
[HAMA-700] and fixed the unit tests.

Added:
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sun Jan 13 20:45:35 2013
@@ -4,6 +4,7 @@ Release 0.7 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-700: BSPPartitioner should be configurable to be optional and allow input format conversion (surajsmenon)
    HAMA-524: Add SpMV example (Mikalai Parafeniuk via edwardyoon)
    HAMA-658: Add random symmetric sparse matrix generator (edwardyoon)
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Sun Jan 13 20:45:35 2013
@@ -22,7 +22,6 @@ import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Sun Jan 13 20:45:35 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
@@ -288,7 +289,7 @@ public class BSPJobClient extends Config
    * @throws IOException
    */
   public RunningJob submitJob(BSPJob job) throws FileNotFoundException,
-  IOException {
+      IOException {
     return submitJobInternal(job, jobSubmitClient.getNewJobId());
   }
 
@@ -368,12 +369,14 @@ public class BSPJobClient extends Config
     return launchJob(jobId, job, submitJobFile, fs);
   }
 
-
   protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
-    
-    if(job.get("bsp.partitioning.runner.job") != null){return job;}//Early exit for the partitioner job.
-    
-    InputSplit[] splits = job.getInputFormat().getSplits(job,
+
+    if (job.get("bsp.partitioning.runner.job") != null) {
+      return job;
+    }// Early exit for the partitioner job.
+
+    InputSplit[] splits = job.getInputFormat().getSplits(
+        job,
         (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
             : maxTasks);
 
@@ -386,6 +389,17 @@ public class BSPJobClient extends Config
     if (inputPath != null) {
       int numSplits = splits.length;
       int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(" numTasks = "
+            + numTasks
+            + " numSplits = "
+            + numSplits
+            + " enable = "
+            + (job.getConfiguration().getBoolean(
+                Constants.ENABLE_RUNTIME_PARTITIONING, false)
+                + " class = " + job.getConfiguration().get(
+                Constants.RUNTIME_PARTITIONING_CLASS)));
+      }
 
       if ((numTasks > 0 && numTasks != numSplits)
           || (job.getConfiguration().getBoolean(
@@ -398,14 +412,18 @@ public class BSPJobClient extends Config
           fs.delete(partitionDir, true);
         }
 
-        HamaConfiguration conf = new HamaConfiguration();
+        HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
+
         conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT,
             Integer.parseInt(job.getConfiguration().get("bsp.peers.num")));
         if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
           conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
               .get(Constants.RUNTIME_PARTITIONING_DIR));
         }
-        conf.set(Constants.RUNTIME_PARTITIONING_CLASS, job.get(Constants.RUNTIME_PARTITIONING_CLASS));
+        if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null) {
+          conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
+              job.get(Constants.RUNTIME_PARTITIONING_CLASS));
+        }
         BSPJob partitioningJob = new BSPJob(conf);
         partitioningJob.setInputPath(new Path(job.getConfiguration().get(
             Constants.JOB_INPUT_DIR)));
@@ -413,8 +431,12 @@ public class BSPJobClient extends Config
         partitioningJob.setInputKeyClass(job.getInputKeyClass());
         partitioningJob.setInputValueClass(job.getInputValueClass());
         partitioningJob.setOutputFormat(NullOutputFormat.class);
+        partitioningJob.setOutputKeyClass(NullWritable.class);
+        partitioningJob.setOutputValueClass(NullWritable.class);
         partitioningJob.setBspClass(PartitioningRunner.class);
         partitioningJob.set("bsp.partitioning.runner.job", "true");
+        partitioningJob.getConfiguration().setBoolean(
+            Constants.ENABLE_RUNTIME_PARTITIONING, false);
 
         boolean isPartitioned = false;
         try {
@@ -431,6 +453,7 @@ public class BSPJobClient extends Config
           } else {
             job.setInputPath(new Path(inputDir + "/partitions"));
           }
+          job.setInputFormat(SequenceFileInputFormat.class);
         } else {
           LOG.error("Error partitioning the input path.");
           throw new IOException("Runtime partition failed for the job.");
@@ -440,7 +463,6 @@ public class BSPJobClient extends Config
     return job;
   }
 
-
   protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
     int maxTasks;
     ClusterStatus clusterStatus = getClusterStatus(true);
@@ -628,9 +650,9 @@ public class BSPJobClient extends Config
     if (job.isSuccessful()) {
       LOG.info("The total number of supersteps: " + info.getSuperstepCount());
       info.getStatus()
-      .getCounter()
-      .incrCounter(JobInProgress.JobCounter.SUPERSTEPS,
-          info.getSuperstepCount());
+          .getCounter()
+          .incrCounter(JobInProgress.JobCounter.SUPERSTEPS,
+              info.getSuperstepCount());
       info.getStatus().getCounter().log(LOG);
     } else {
       LOG.info("Job failed.");
@@ -692,7 +714,7 @@ public class BSPJobClient extends Config
   }
 
   public static void runJob(BSPJob job) throws FileNotFoundException,
-  IOException {
+      IOException {
     BSPJobClient jc = new BSPJobClient(job.getConfiguration());
 
     if (job.getNumBspTask() == 0

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Sun Jan 13 20:45:35 2013
@@ -36,29 +36,87 @@ import org.apache.hama.util.KeyValuePair
 
 public class PartitioningRunner extends
     BSP<Writable, Writable, Writable, Writable, NullWritable> {
+
   private Configuration conf;
   private int desiredNum;
   private FileSystem fs = null;
   private Path partitionDir;
+  private RecordConverter converter;
   private Map<Integer, Map<Writable, Writable>> values = new HashMap<Integer, Map<Writable, Writable>>();
 
   @Override
   public final void setup(
       BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
       throws IOException, SyncException, InterruptedException {
+
     this.conf = peer.getConfiguration();
-    this.desiredNum = conf.getInt("desired.num.of.tasks", 1);
+    this.desiredNum = conf.getInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, 1);
+
     this.fs = FileSystem.get(conf);
 
-    Path inputDir = new Path(conf.get("bsp.input.dir"));
+    Path inputDir = new Path(conf.get(Constants.JOB_INPUT_DIR));
     if (fs.isFile(inputDir)) {
       inputDir = inputDir.getParent();
     }
 
-    if(conf.get("bsp.partitioning.dir") != null) {
-      this.partitionDir = new Path(conf.get("bsp.partitioning.dir"));
-    } else {
+    converter = ReflectionUtils.newInstance(conf.getClass(
+        Constants.RUNTIME_PARTITION_RECORDCONVERTER,
+        DefaultRecordConverter.class, RecordConverter.class), conf);
+
+    if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
       this.partitionDir = new Path(inputDir + "/partitions");
+    } else {
+      this.partitionDir = new Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR));
+    }
+
+  }
+
+  /**
+   * This record converter could be used to convert the records from the input
+   * format type to the sequential record types the BSP Job uses for
+   * computation.
+   * 
+   */
+  public static interface RecordConverter {
+
+    /**
+     * Should return the Key-Value pair constructed from the input format.
+     * 
+     * @param inputRecord The input key-value pair.
+     * @param conf Configuration of the job.
+     * @return the Key-Value pair instance of the expected sequential format.
+     *         Should return null if the conversion was not successful.
+     */
+    public KeyValuePair<Writable, Writable> convertRecord(
+        KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
+
+    public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
+        @SuppressWarnings("rawtypes")
+        Partitioner partitioner, Configuration conf,
+        @SuppressWarnings("rawtypes")
+        BSPPeer peer, int numTasks);
+  }
+
+  /**
+   * The default converter does no conversion.
+   */
+  public static class DefaultRecordConverter implements RecordConverter {
+
+    @Override
+    public KeyValuePair<Writable, Writable> convertRecord(
+        KeyValuePair<Writable, Writable> inputRecord, Configuration conf) {
+      return inputRecord;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
+        @SuppressWarnings("rawtypes")
+        Partitioner partitioner, Configuration conf,
+        @SuppressWarnings("rawtypes")
+        BSPPeer peer, int numTasks) {
+      return Math.abs(partitioner.getPartition(outputRecord.getKey(),
+          outputRecord.getValue(), numTasks));
     }
   }
 
@@ -69,22 +127,36 @@ public class PartitioningRunner extends
       throws IOException, SyncException, InterruptedException {
     Partitioner partitioner = getPartitioner();
     KeyValuePair<Writable, Writable> pair = null;
+    KeyValuePair<Writable, Writable> outputPair = null;
 
     Class keyClass = null;
     Class valueClass = null;
+    Class outputKeyClass = null;
+    Class outputValueClass = null;
     while ((pair = peer.readNext()) != null) {
       if (keyClass == null && valueClass == null) {
         keyClass = pair.getKey().getClass();
         valueClass = pair.getValue().getClass();
       }
 
-      int index = Math.abs(partitioner.getPartition(pair.getKey(),
-          pair.getValue(), desiredNum));
+      outputPair = converter.convertRecord(pair, conf);
+
+      if (outputPair == null) {
+        continue;
+      }
+
+      if (outputKeyClass == null && outputValueClass == null) {
+        outputKeyClass = outputPair.getKey().getClass();
+        outputValueClass = outputPair.getValue().getClass();
+      }
+
+      int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
+          desiredNum);
 
       if (!values.containsKey(index)) {
         values.put(index, new HashMap<Writable, Writable>());
       }
-      values.get(index).put(pair.getKey(), pair.getValue());
+      values.get(index).put(outputPair.getKey(), outputPair.getValue());
     }
 
     // The reason of use of Memory is to reduce file opens
@@ -92,7 +164,8 @@ public class PartitioningRunner extends
       Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
           + peer.getPeerIndex());
       SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-          destFile, keyClass, valueClass, CompressionType.NONE);
+          destFile, outputKeyClass, outputValueClass, CompressionType.NONE);
+
       for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
         writer.append(v.getKey(), v.getValue());
       }
@@ -102,28 +175,31 @@ public class PartitioningRunner extends
     peer.sync();
 
     // merge files into one.
-    // TODO if we use header info, we might able to merge files without full scan.
+    // TODO if we use header info, we might able to merge files without full
+    // scan.
     FileStatus[] status = fs.listStatus(partitionDir);
     for (int j = 0; j < status.length; j++) {
       int idx = Integer.parseInt(status[j].getPath().getName().split("[-]")[1]);
       int assignedID = idx / (desiredNum / peer.getNumPeers());
       if (assignedID == peer.getNumPeers())
         assignedID = assignedID - 1;
-      
-      // TODO set replica factor to 1. 
+
+      // TODO set replica factor to 1.
       // TODO and check whether we can write to specific DataNode.
       if (assignedID == peer.getPeerIndex()) {
         FileStatus[] files = fs.listStatus(status[j].getPath());
         SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-            new Path(partitionDir + "/" + getPartitionName(j)), keyClass,
-            valueClass, CompressionType.NONE);
+            new Path(partitionDir + "/" + getPartitionName(j)), outputKeyClass,
+            outputValueClass, CompressionType.NONE);
 
         for (int i = 0; i < files.length; i++) {
           SequenceFile.Reader reader = new SequenceFile.Reader(fs,
               files[i].getPath(), conf);
 
-          Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
-          Writable value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+          Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
+              conf);
+          Writable value = (Writable) ReflectionUtils.newInstance(
+              outputValueClass, conf);
 
           while (reader.next(key, value)) {
             writer.append(key, value);
@@ -139,9 +215,9 @@ public class PartitioningRunner extends
 
   @SuppressWarnings("rawtypes")
   public Partitioner getPartitioner() {
-    return ReflectionUtils.newInstance(conf
-        .getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
-            Partitioner.class), conf);
+    return ReflectionUtils.newInstance(conf.getClass(
+        Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
+        Partitioner.class), conf);
   }
 
   private static String getPartitionName(int i) {

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.examples;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -159,6 +161,41 @@ public final class BipartiteMatching {
       return !getValue().getFirst().equals(UNMATCHED);
     }
 
+    @Override
+    public void readState(DataInput in) throws IOException {
+      if (in.readBoolean()) {
+        reusableMessage = new TextPair();
+        reusableMessage.readFields(in);
+      }
+
+    }
+
+    @Override
+    public void writeState(DataOutput out) throws IOException {
+      if (reusableMessage == null) {
+        out.writeBoolean(false);
+      } else {
+        out.writeBoolean(true);
+        reusableMessage.write(out);
+      }
+
+    }
+
+    @Override
+    public Text createVertexIDObject() {
+      return new Text();
+    }
+
+    @Override
+    public NullWritable createEdgeCostObject() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public TextPair createVertexValue() {
+      return new TextPair();
+    }
+
   }
 
   /**
@@ -199,16 +236,9 @@ public final class BipartiteMatching {
     System.exit(-1);
   }
 
-  public static void main(String... args) throws IOException,
-      InterruptedException, ClassNotFoundException {
-
-    if (args.length < 2) {
-      printUsage();
-    }
-
-    HamaConfiguration conf = new HamaConfiguration(new Configuration());
+  public static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException{
     GraphJob job = new GraphJob(conf, BipartiteMatching.class);
-
+    
     // set the defaults
     job.setMaxIteration(30);
     job.setNumBspTask(2);
@@ -230,14 +260,26 @@ public final class BipartiteMatching {
     job.setVertexValueClass(TextPair.class);
     job.setEdgeValueClass(NullWritable.class);
 
-    job.setInputKeyClass(LongWritable.class);
-    job.setInputValueClass(Text.class);
     job.setInputFormat(TextInputFormat.class);
     job.setVertexInputReaderClass(BipartiteMatchingVertexReader.class);
     job.setPartitioner(HashPartitioner.class);
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(TextPair.class);
+    return job;
+  }
+  
+  
+  public static void main(String... args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    if (args.length < 2) {
+      printUsage();
+    }
+
+    HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    
+    GraphJob job = createJob(args, conf);
 
     long startTime = System.currentTimeMillis();
     if (job.waitForCompletion(true)) {

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.examples;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -94,4 +96,27 @@ public class InlinkCount extends Vertex<
           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
+
+  @Override
+  public void readState(DataInput in) throws IOException {}
+
+  @Override
+  public void writeState(DataOutput out) throws IOException {}
+
+  @Override
+  public Text createVertexIDObject() {
+    return new Text();
+  }
+
+  @Override
+  public NullWritable createEdgeCostObject() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public IntWritable createVertexValue() {
+    return new IntWritable();
+  }
+
+  
 }

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.examples;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -79,6 +81,29 @@ public class MindistSearch {
         }
       }
     }
+
+    @Override
+    public void readState(DataInput in) throws IOException {}
+
+    @Override
+    public void writeState(DataOutput out) throws IOException {}
+
+    @Override
+    public Text createVertexIDObject() {
+      return new Text();
+    }
+
+    @Override
+    public NullWritable createEdgeCostObject() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public Text createVertexValue() {
+      return new Text();
+    }
+
+    
   }
 
   public static class MinTextCombiner extends Combiner<Text> {

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Sun Jan 13 20:45:35 2013
@@ -17,29 +17,61 @@
  */
 package org.apache.hama.examples;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.SequenceFileInputFormat;
-import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.graph.AbstractAggregator;
 import org.apache.hama.graph.AverageAggregator;
+import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
 
 /**
  * Real pagerank with dangling node contribution.
  */
 public class PageRank {
 
+  public static class PagerankTextReader extends
+      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+
+    /**
+     * The text file essentially should look like: <br/>
+     * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
+     * E.G:<br/>
+     * 1\t2\t3\t4<br/>
+     * 2\t3\t1<br/>
+     * etc.
+     */
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+      String[] split = value.toString().split("\t");
+      for (int i = 0; i < split.length; i++) {
+        if (i == 0) {
+          vertex.setVertexID(new Text(split[i]));
+        } else {
+          vertex
+              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+        }
+      }
+      return true;
+    }
+
+  }
+
   public static class PageRankVertex extends
       Vertex<Text, NullWritable, DoubleWritable> {
 
@@ -95,6 +127,30 @@ public class PageRank {
       sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
           / numEdges));
     }
+
+    @Override
+    public void readState(DataInput in) throws IOException {
+    }
+
+    @Override
+    public void writeState(DataOutput out) throws IOException {
+    }
+
+    @Override
+    public Text createVertexIDObject() {
+      return new Text();
+    }
+
+    @Override
+    public NullWritable createEdgeCostObject() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public DoubleWritable createVertexValue() {
+      return new DoubleWritable();
+    }
+
   }
 
   private static void printUsage() {
@@ -109,6 +165,7 @@ public class PageRank {
 
     HamaConfiguration conf = new HamaConfiguration(new Configuration());
     GraphJob pageJob = createJob(args, conf);
+    pageJob.setVertexInputReaderClass(PagerankTextReader.class);
 
     long startTime = System.currentTimeMillis();
     if (pageJob.waitForCompletion(true)) {
@@ -145,8 +202,6 @@ public class PageRank {
     pageJob.setEdgeValueClass(NullWritable.class);
 
     pageJob.setInputFormat(SequenceFileInputFormat.class);
-    pageJob.setInputKeyClass(Text.class);
-    pageJob.setInputValueClass(TextArrayWritable.class);
 
     pageJob.setPartitioner(HashPartitioner.class);
     pageJob.setOutputFormat(TextOutputFormat.class);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.examples;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -70,6 +72,29 @@ public class SSSP {
         voteToHalt();
       }
     }
+
+    @Override
+    public void readState(DataInput in) throws IOException {}
+
+    @Override
+    public void writeState(DataOutput out) throws IOException {}
+
+    @Override
+    public Text createVertexIDObject() {
+      return new Text();
+    }
+
+    @Override
+    public IntWritable createEdgeCostObject() {
+      return new IntWritable();
+    }
+
+    @Override
+    public IntWritable createVertexValue() {
+      return new IntWritable();
+    }
+
+    
   }
 
   public static class MinIntCombiner extends Combiner<IntWritable> {

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java Sun Jan 13 20:45:35 2013
@@ -151,6 +151,8 @@ public class SpMV {
      * Output is pairs of integer and double
      */
     bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setInputKeyClass(IntWritable.class);
+    bsp.setInputValueClass(SparseVectorWritable.class); 
     bsp.setOutputKeyClass(IntWritable.class);
     bsp.setOutputValueClass(DoubleWritable.class);
     bsp.setOutputFormat(SequenceFileOutputFormat.class);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java Sun Jan 13 20:45:35 2013
@@ -28,36 +28,34 @@ import com.google.common.base.Objects;
 
 /**
  * TextPair class for use in BipartiteMatching algorithm.
- *
+ * 
  */
-public final class TextPair implements Writable{
-  
+public final class TextPair implements Writable {
+
   Text first;
   Text second;
-  
+
   String nameFirst = "First";
   String nameSecond = "Second";
-  
-  public TextPair(){
-    first  = new Text();
-    second = new Text(); 
-  }
-  
-  public TextPair(Text first, Text second){
-    this.first  = first;
+
+  public TextPair() {
+    first = new Text();
+    second = new Text();
+  }
+
+  public TextPair(Text first, Text second) {
+    this.first = first;
     this.second = second;
   }
-  
+
   /**
-   * Sets the names of the attributes 
+   * Sets the names of the attributes
    */
-  public TextPair setNames(String nameFirst, String nameSecond){
+  public TextPair setNames(String nameFirst, String nameSecond) {
     this.nameFirst = nameFirst;
     this.nameSecond = nameSecond;
     return this;
   }
-  
-  
 
   public Text getFirst() {
     return first;
@@ -77,23 +75,29 @@ public final class TextPair implements W
 
   @Override
   public void write(DataOutput out) throws IOException {
+    (new Text(nameFirst)).write(out);
+    (new Text(nameSecond)).write(out);
     first.write(out);
     second.write(out);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
+
+    Text t1 = new Text();
+    Text t2 = new Text();
+    t1.readFields(in);
+    t2.readFields(in);
+    nameFirst = t1.toString();
+    nameSecond = t2.toString();
     first.readFields(in);
     second.readFields(in);
   }
-  
+
   @Override
-  public String toString(){
-    return Objects.toStringHelper(this)
-        .add(nameFirst, getFirst())
-        .add(nameSecond, getSecond())
-        .toString();
+  public String toString() {
+    return Objects.toStringHelper(this).add(nameFirst, getFirst())
+        .add(nameSecond, getSecond()).toString();
   }
 
-
 }

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java?rev=1432733&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java Sun Jan 13 20:45:35 2013
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.FileOutputFormat;
+import org.apache.hama.bsp.NullInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.examples.CombineExample;
+import org.apache.hama.examples.PageRank.PageRankVertex;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.util.ReflectionUtils;
+
+public class VertexInputGen {
+
+  public static final String SIZE_OF_MATRIX = "size.of.matrix";
+  public static final String DENSITY = "density.of.matrix";
+
+  public static interface VertexCreator {
+    @SuppressWarnings("rawtypes")
+    Vertex createVertex(Text id, Text[] edges, Text value);
+  }
+
+  public static class PageRankVertexCreatorImpl implements VertexCreator {
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public Vertex createVertex(Text id, Text[] edges, Text value) {
+      Vertex v = new PageRankVertex();
+      v.setVertexID(id);
+      for (Text t : edges) {
+        v.addEdge(new Edge<Text, NullWritable>(t, null));
+      }
+      return v;
+    }
+
+  }
+
+  public static int getVertexCaseId(Class<? extends Vertex> classObj) {
+    if (classObj.getCanonicalName().equals(
+        PageRankVertexCreatorImpl.class.getCanonicalName())) {
+      return 1;
+    }
+
+    return -1;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public static class VertexInputGenBSP extends
+      BSP<NullWritable, NullWritable, Vertex, NullWritable, Text> {
+
+    private Configuration conf;
+    private int sizeN;
+    private int density;
+    private Map<Integer, HashSet<Integer>> list = new HashMap<Integer, HashSet<Integer>>();
+    private VertexCreator vertexCreator;
+
+    @Override
+    public void setup(
+        BSPPeer<NullWritable, NullWritable, Vertex, NullWritable, Text> peer) {
+      this.conf = peer.getConfiguration();
+      sizeN = conf.getInt(SIZE_OF_MATRIX, 10);
+      density = conf.getInt(DENSITY, 1);
+
+      int vertexCase = conf.getInt("hama.test.vertexcreatorid", -1);
+      if (vertexCase == 1) {
+        vertexCreator = new PageRankVertexCreatorImpl();
+      } else {
+        throw new RuntimeException("No vertex creator specified");
+      }
+
+    }
+
+    @Override
+    public void bsp(
+        BSPPeer<NullWritable, NullWritable, Vertex, NullWritable, Text> peer)
+        throws IOException, SyncException, InterruptedException {
+      int interval = sizeN / peer.getNumPeers();
+      int startID = peer.getPeerIndex() * interval;
+      int endID;
+      if (peer.getPeerIndex() == peer.getNumPeers() - 1)
+        endID = sizeN;
+      else
+        endID = startID + interval;
+
+      // Generate N*(N+1) elements for lower triangular
+      for (int i = startID; i < endID; i++) {
+        HashSet<Integer> edges = new HashSet<Integer>();
+        for (int j = 0; j <= i; j++) {
+          boolean nonZero = new Random().nextInt(density) == 0;
+          if (nonZero && !edges.contains(j) && i != j) {
+            edges.add(j);
+
+            // TODO please refactor this.
+            int peerIndex = j / interval;
+            if (peerIndex == peer.getNumPeers())
+              peerIndex = peerIndex - 1;
+
+            peer.send(peer.getPeerName(j / interval), new Text(j + "," + i));
+          }
+        }
+
+        list.put(i, edges);
+      }
+
+      // Synchronize the upper and lower
+      peer.sync();
+      Text received;
+      while ((received = peer.getCurrentMessage()) != null) {
+        String[] kv = received.toString().split(",");
+        HashSet<Integer> nList = list.get(Integer.parseInt(kv[0]));
+        nList.add(Integer.parseInt(kv[1]));
+        list.put(Integer.parseInt(kv[0]), nList);
+      }
+    }
+
+    @Override
+    public void cleanup(
+        BSPPeer<NullWritable, NullWritable, Vertex, NullWritable, Text> peer)
+        throws IOException {
+      for (Map.Entry<Integer, HashSet<Integer>> e : list.entrySet()) {
+        Text[] values = new Text[e.getValue().size()];
+        if (values.length > 0) {
+          int i = 0;
+          for (Integer v : e.getValue()) {
+            values[i] = new Text(String.valueOf(v));
+            i++;
+          }
+          peer.write(
+              (Vertex)this.vertexCreator.createVertex(
+                  new Text(String.valueOf(e.getKey())), values, new Text()),
+              NullWritable.get());
+        }
+      }
+    }
+  }
+
+  public static void runJob(HamaConfiguration conf, int numTasks, String output, Class<? extends Vertex> cls)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    BSPJob bsp = new BSPJob(conf, VertexInputGen.class);
+    // Set the job name
+    bsp.setJobName("Random Vertex Input Generator");
+    bsp.setBspClass(VertexInputGenBSP.class);
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputKeyClass(cls);
+    bsp.setOutputValueClass(NullWritable.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(bsp, new Path(output));
+    bsp.setNumBspTask(numTasks);
+
+    long startTime = System.currentTimeMillis();
+    if (bsp.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+
+  public static void main(String[] args) throws InterruptedException,
+      IOException, ClassNotFoundException {
+    if (args.length < 4) {
+      System.out
+          .println("Usage: <size n> <1/x density> <output path> <number of tasks>");
+      System.exit(1);
+    }
+
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+
+    conf.setInt(SIZE_OF_MATRIX, Integer.parseInt(args[0]));
+    conf.setInt(DENSITY, Integer.parseInt(args[1]));
+    runJob(conf, Integer.parseInt(args[3]), args[2], Vertex.class);
+
+  }
+
+}

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Sun Jan 13 20:45:35 2013
@@ -26,6 +26,8 @@ import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.Map;
 
+import junit.framework.TestCase;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -33,30 +35,36 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.examples.util.TextPair;
+import org.apache.hama.graph.GraphJob;
 import org.junit.Test;
 
-import junit.framework.TestCase;
+public class BipartiteMatchingTest extends TestCase {
 
-public class BipartiteMatchingTest extends TestCase{
-
-  private String[] input = {
-      "A L:B D",
-      "B R:A C",
-      "C L:B D",
-      "D R:A C"
-  };
+  private String[] input = { "A L:B D", "B R:A C", "C L:B D", "D R:A C" };
 
   private final static String DELIMETER = "\t";
 
   @SuppressWarnings("serial")
-  private Map<String, String> output1 = new HashMap<String, String>()
-  {{
-    put("C", "TextPair{MatchVertex=D, Component=L}");
-    put("A", "TextPair{MatchVertex=B, Component=L}");
-    put("D", "TextPair{MatchVertex=C, Component=R}");
-    put("B", "TextPair{MatchVertex=A, Component=R}");
-  }};
+  private Map<String, String> output1 = new HashMap<String, String>() {
+    {
+      put("C", "TextPair{MatchVertex=D, Component=L}");
+      put("A", "TextPair{MatchVertex=B, Component=L}");
+      put("D", "TextPair{MatchVertex=C, Component=R}");
+      put("B", "TextPair{MatchVertex=A, Component=R}");
+    }
+  };
 
+  public static class CustomTextPartitioner implements
+      Partitioner<Text, TextPair> {
+
+    @Override
+    public int getPartition(Text key, TextPair value, int numTasks) {
+      return Character.getNumericValue(key.toString().charAt(0)) % numTasks;
+    }
+
+  }
 
   private static String INPUT = "/tmp/graph.txt";
   private static String OUTPUT = "/tmp/graph-bipartite";
@@ -70,57 +78,63 @@ public class BipartiteMatchingTest exten
     fs = FileSystem.get(conf);
   }
 
-  private void generateTestData(){
+  private void generateTestData() {
     FileWriter fout = null;
     BufferedWriter bout = null;
     PrintWriter pout = null;
-    try{
+    try {
       fout = new FileWriter(INPUT);
       bout = new BufferedWriter(fout);
       pout = new PrintWriter(bout);
-      for(String line:input){
+      for (String line : input) {
         pout.println(line);
       }
-    }
-    catch(IOException e){
+    } catch (IOException e) {
       e.printStackTrace();
-    }
-    finally{      
+    } finally {
       try {
-        if(pout!=null){pout.close();}
-        if(bout!=null){bout.close();}
-        if(fout!=null){fout.close();}
+        if (pout != null) {
+          pout.close();
+        }
+        if (bout != null) {
+          bout.close();
+        }
+        if (fout != null) {
+          fout.close();
+        }
       } catch (IOException e) {
         e.printStackTrace();
-      }      
+      }
     }
   }
 
-
-  private void verifyResult()throws IOException{
+  private void verifyResult() throws IOException {
     FileStatus[] files = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    assertTrue(files.length == 2);
+
     Text key = new Text();
     Text value = new Text();
-    for(FileStatus file:files){
-      if(file.getLen() > 0){
-        FSDataInputStream in = fs.open(file.getPath());        
-        BufferedReader bin = new BufferedReader(
-            new InputStreamReader(in));
+
+    for (FileStatus file : files) {
+      if (file.getLen() > 0) {
+        FSDataInputStream in = fs.open(file.getPath());
+        BufferedReader bin = new BufferedReader(new InputStreamReader(in));
 
         String s = bin.readLine();
-        while(s!=null){
+        while (s != null) {
           next(key, value, s);
           String expValue = output1.get(key.toString());
+          System.out.println(key + " " + value + " expvalue = " + expValue);
           assertEquals(expValue, value.toString());
-          System.out.println(key + " "+value);
+
           s = bin.readLine();
-        }        
+        }
         in.close();
       }
     }
   }
 
-  private static void next(Text key, Text value, String line){
+  private static void next(Text key, Text value, String line) {
     String[] lineA = line.split(DELIMETER);
     key.set(lineA[0]);
     value.set(lineA[1]);
@@ -139,17 +153,24 @@ public class BipartiteMatchingTest exten
 
   @Test
   public void testBipartiteMatching() throws IOException, InterruptedException,
-  ClassNotFoundException{    
+      ClassNotFoundException {
     generateTestData();
     try {
       String seed = "2";
-      BipartiteMatching.main(new String[] { INPUT, OUTPUT, "30", "2",
-          seed});
+      HamaConfiguration conf = new HamaConfiguration();
+      GraphJob job = BipartiteMatching.createJob(new String[] { INPUT, OUTPUT,
+          "30", "2", seed }, conf);
+      job.setPartitioner(CustomTextPartitioner.class);
+
+      long startTime = System.currentTimeMillis();
+      if (job.waitForCompletion(true)) {
+        System.out.println("Job Finished in "
+            + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+      }
+
       verifyResult();
     } finally {
       deleteTempDirs();
     }
   }
-
-
 }

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Sun Jan 13 20:45:35 2013
@@ -29,12 +29,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.examples.MindistSearch.MinTextCombiner;
+import org.apache.hama.examples.MindistSearch.MindistSearchVertex;
+import org.apache.hama.graph.Edge;
 
 public class MindistSearchTest extends TestCase {
 
@@ -98,18 +99,18 @@ public class MindistSearchTest extends T
   private void generateTestData() {
     try {
       SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-          new Path(INPUT), Text.class, TextArrayWritable.class);
+          new Path(INPUT), MindistSearchVertex.class, NullWritable.class);
 
       for (int i = 0; i < input.length; i++) {
         String[] x = input[i].split("\t");
         Text key = new Text(x[0]);
-        Writable[] values = new Writable[x.length - 1];
+        MindistSearchVertex vertex = new MindistSearchVertex();
+        vertex.setVertexID(key);
         for (int j = 1; j < x.length; j++) {
-          values[j - 1] = new Text(x[j]);
+          vertex.addEdge(new Edge<Text, NullWritable>(new Text(x[j]),
+              NullWritable.get()));
         }
-        TextArrayWritable value = new TextArrayWritable();
-        value.set(values);
-        writer.append(key, value);
+        writer.append(vertex, NullWritable.get());
       }
 
       writer.close();

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Sun Jan 13 20:45:35 2013
@@ -28,14 +28,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.examples.util.SymmetricMatrixGen;
+import org.apache.hama.examples.PageRank.PageRankVertex;
+import org.apache.hama.examples.util.VertexInputGen;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.GraphJobRunner;
 
 public class PageRankTest extends TestCase {
+
   private static String INPUT = "/tmp/pagerank/pagerank-tmp.seq";
   private static String TEXT_INPUT = "/tmp/pagerank/pagerank.txt";
   private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
+
   private static String OUTPUT = "/tmp/pagerank/pagerank-out";
   private Configuration conf = new HamaConfiguration();
   private FileSystem fs;
@@ -70,8 +73,8 @@ public class PageRankTest extends TestCa
       conf.set("bsp.local.tasks.maximum", "10");
       conf.set("bsp.peers.num", "7");
       conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
-      GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT, "7" },
-          conf);
+      GraphJob pageJob = PageRank.createJob(
+          new String[] { INPUT, OUTPUT, "7" }, conf);
 
       if (!pageJob.waitForCompletion(true)) {
         fail("Job did not complete normally!");
@@ -84,7 +87,11 @@ public class PageRankTest extends TestCa
 
   private void generateTestData() throws ClassNotFoundException,
       InterruptedException, IOException {
-    SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "3" });
+    HamaConfiguration conf = new HamaConfiguration();
+    conf.setInt(VertexInputGen.SIZE_OF_MATRIX, 40);
+    conf.setInt(VertexInputGen.DENSITY, 10);
+    conf.setInt("hama.test.vertexcreatorid", 1);
+    VertexInputGen.runJob(conf, 3, INPUT, PageRankVertex.class);
   }
 
   private void deleteTempDirs() {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Sun Jan 13 20:45:35 2013
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.Writable;
  * The edge class
  */
 public final class Edge<VERTEX_ID extends Writable, EDGE_VALUE_TYPE extends Writable> {
-
   private final VERTEX_ID destinationVertexID;
   private final EDGE_VALUE_TYPE cost;
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Sun Jan 13 20:45:35 2013
@@ -20,13 +20,16 @@ package org.apache.hama.graph;
 import java.io.IOException;
 
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
 
 import com.google.common.base.Preconditions;
 
@@ -39,9 +42,6 @@ public class GraphJob extends BSPJob {
 
   public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
   public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
-  public final static String VERTEX_GRAPH_RUNTIME_PARTIONING = "hama.graph.runtime.partitioning";
-  public final static String VERTEX_GRAPH_INPUT_READER = "hama.graph.input.reader.class";
-
   /**
    * Creates a new Graph Job with the given configuration and an exampleClass.
    * The exampleClass is used to determine the user's jar to distribute in the
@@ -67,6 +67,8 @@ public class GraphJob extends BSPJob {
       Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> cls)
       throws IllegalStateException {
     conf.setClass(VERTEX_CLASS_ATTR, cls, Vertex.class);
+    setInputKeyClass(cls);
+    setInputValueClass(NullWritable.class);
   }
 
   /**
@@ -119,7 +121,9 @@ public class GraphJob extends BSPJob {
   public void setVertexInputReaderClass(
       Class<? extends VertexInputReader<?, ?, ?, ?, ?>> cls) {
     ensureState(JobState.DEFINE);
-    conf.setClass(VERTEX_GRAPH_INPUT_READER, cls, VertexInputReader.class);
+    conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
+        RecordConverter.class);
+    conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
   }
 
   @SuppressWarnings("unchecked")
@@ -132,7 +136,7 @@ public class GraphJob extends BSPJob {
   public void setPartitioner(@SuppressWarnings("rawtypes")
   Class<? extends Partitioner> theClass) {
     super.setPartitioner(theClass);
-    conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true);
+    conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
   }
 
   @Override

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sun Jan 13 20:45:35 2013
@@ -28,7 +28,6 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -66,13 +65,13 @@ public final class GraphJobRunner<V exte
 
   public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
   public static final String GRAPH_REPAIR = "hama.graph.repair";
+  public static final String VERTEX_CLASS = "hama.graph.vertex.class";
 
   private Configuration conf;
   private Combiner<M> combiner;
   private Partitioner<V, M> partitioner;
 
-  private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>();
-
+  private VerticesInfo<V, E, M> vertices;
   private boolean updated = true;
   private int globalUpdateCounts = 0;
 
@@ -264,10 +263,12 @@ public final class GraphJobRunner<V exte
 
     aggregationRunner = new AggregationRunner<V, E, M>();
     aggregationRunner.setupAggregators(peer);
+
+    vertices = new VerticesInfo<V, E, M>();
   }
 
   /**
-   * Loads vertices into memory of each peer. TODO this needs to be simplified.
+   * Loads vertices into memory of each peer.
    */
   @SuppressWarnings("unchecked")
   private void loadVertices(
@@ -277,41 +278,22 @@ public final class GraphJobRunner<V exte
 
     final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
 
-    LOG.debug("vertex class: " + vertexClass);
-    Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
-    vertex.runner = this;
+    if (LOG.isDebugEnabled())
+      LOG.debug("Vertex class: " + vertexClass);
 
     KeyValuePair<Writable, Writable> next = null;
     while ((next = peer.readNext()) != null) {
-      V key = (V) next.getKey();
-      Writable[] edges = ((ArrayWritable) next.getValue()).get();
-      vertex.setVertexID(key);
-      List<Edge<V, E>> edgeList = new ArrayList<Edge<V, E>>();
-      for (Writable edge : edges) {
-        edgeList.add(new Edge<V, E>((V) edge, null));
-      }
-      vertex.setEdges(edgeList);
-
-      if (vertex.getEdges() == null) {
-        if (selfReference) {
-          vertex.setEdges(Collections.singletonList(new Edge<V, E>(vertex
-              .getVertexID(), null)));
-        } else {
-          vertex.setEdges(Collections.EMPTY_LIST);
-        }
-      }
-
+      Vertex<V, E, M> vertex = (Vertex<V, E, M>) next.getKey();
+      vertex.runner = this;
+      vertex.setup(conf);
+      vertices.addVertex(vertex);
       if (selfReference) {
         vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
       }
-
-      vertex.setup(conf);
-      vertices.add(vertex);
-      vertex = newVertexInstance(vertexClass, conf);
-      vertex.runner = this;
     }
 
-    LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
+    if (LOG.isDebugEnabled())
+      LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
 
     /*
      * If the user want to repair the graph, it should traverse through that
@@ -321,18 +303,20 @@ public final class GraphJobRunner<V exte
      * procedure is to prevent NullPointerExceptions from happening.
      */
     if (repairNeeded) {
-      LOG.debug("Starting repair of this graph!");
+      if (LOG.isDebugEnabled())
+        LOG.debug("Starting repair of this graph!");
       repair(peer, selfReference);
     }
 
-    LOG.debug("Starting Vertex processing!");
+    if (LOG.isDebugEnabled())
+      LOG.debug("Starting Vertex processing!");
   }
 
   @SuppressWarnings("unchecked")
   private void repair(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      boolean selfReference) throws IOException,
-      SyncException, InterruptedException {
+      boolean selfReference) throws IOException, SyncException,
+      InterruptedException {
 
     Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
 
@@ -368,7 +352,9 @@ public final class GraphJobRunner<V exte
       }
     }
 
-    vertices.addAll(tmp.values());
+    for (Vertex<V, E, M> v : tmp.values()) {
+      vertices.addVertex(v);
+    }
     tmp.clear();
   }
 
@@ -533,4 +519,3 @@ public final class GraphJobRunner<V exte
   }
 
 }
-

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Sun Jan 13 20:45:35 2013
@@ -17,8 +17,10 @@
  */
 package org.apache.hama.graph;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,8 +29,24 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Partitioner;
 
+/**
+ * Vertex is a abstract definition of Google Pregel Vertex. For implementing a
+ * graph application, one must implement a sub-class of Vertex and define, the
+ * message passing and message processing for each vertex.
+ * 
+ * Every vertex should be assigned an ID. This ID object should obey the
+ * equals-hashcode contract and would be used for partitioning.
+ * 
+ * The edges for a vertex could be accessed and modified using the
+ * {@link Vertex#getEdges()} call. The self value of the vertex could be changed
+ * by {@link Vertex#setValue(Writable)}.
+ * 
+ * @param <V> Vertex ID object type
+ * @param <E> Edge cost object type
+ * @param <M> Vertex value object type
+ */
 public abstract class Vertex<V extends Writable, E extends Writable, M extends Writable>
-    implements VertexInterface<V, E, M> {
+    implements VertexInterface<V, E, M>, Writable {
 
   GraphJobRunner<?, ?, ?> runner;
 
@@ -74,7 +92,7 @@ public abstract class Vertex<V extends W
         getPartitioner().getPartition(vertexId, value,
             runner.getPeer().getNumPeers()));
   }
-
+  
   @Override
   public void sendMessageToNeighbors(M msg) throws IOException {
     final List<Edge<V, E>> outEdges = this.getEdges();
@@ -103,7 +121,7 @@ public abstract class Vertex<V extends W
 
   public void addEdge(Edge<V, E> edge) {
     if (edges == null) {
-      this.edges = new ArrayList<Edge<V, E>>(1);
+      this.edges = new LinkedList<Edge<V, E>>();
     }
     this.edges.add(edge);
   }
@@ -195,10 +213,7 @@ public abstract class Vertex<V extends W
 
   @Override
   public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((vertexID == null) ? 0 : vertexID.hashCode());
-    return result;
+    return ((vertexID == null) ? 0 : vertexID.hashCode());
   }
 
   @Override
@@ -224,4 +239,120 @@ public abstract class Vertex<V extends W
         + " // " + edges;
   }
 
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (in.readBoolean()) {
+      if (vertexID == null) {
+        vertexID = createVertexIDObject();
+      }
+      vertexID.readFields(in);
+    }
+    if (in.readBoolean()) {
+      if (this.value == null) {
+        value = createVertexValue();
+      }
+      value.readFields(in);
+    }
+    this.edges = new LinkedList<Edge<V, E>>();
+    if (in.readBoolean()) {
+      int num = in.readInt();
+      if (num > 0) {
+        for (int i = 0; i < num; ++i) {
+          V vertex = createVertexIDObject();
+          vertex.readFields(in);
+          E edgeCost = null;
+          if (in.readBoolean()) {
+            edgeCost = this.createEdgeCostObject();
+            edgeCost.readFields(in);
+          }
+          Edge<V, E> edge = new Edge<V, E>(vertex, edgeCost);
+          this.edges.add(edge);
+        }
+
+      }
+    }
+    votedToHalt = in.readBoolean();
+    readState(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (vertexID == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      vertexID.write(out);
+    }
+    if (value == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      value.write(out);
+    }
+    if (this.edges == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeInt(this.edges.size());
+      for (Edge<V, E> edge : this.edges) {
+        edge.getDestinationVertexID().write(out);
+        if (edge.getValue() == null) {
+          out.writeBoolean(false);
+        } else {
+          out.writeBoolean(true);
+          edge.getValue().write(out);
+        }
+      }
+    }
+    out.writeBoolean(votedToHalt);
+    writeState(out);
+
+  }
+
+  /**
+   * Create the vertex id object. This function is used by the framework to
+   * construct the vertex id object.
+   * 
+   * @return instance of V
+   */
+  public abstract V createVertexIDObject();
+
+  /**
+   * Create the Edge cost object. This function is used by the framework to
+   * construct the edge cost object
+   * 
+   * @return instance of E
+   */
+  public abstract E createEdgeCostObject();
+
+  /**
+   * Create the vertex value object. This function is used by the framework to
+   * construct the vertex value object.
+   * 
+   * @return
+   */
+  public abstract M createVertexValue();
+
+  /**
+   * Read the state of the vertex from the input stream. The framework would
+   * have already constructed and loaded the vertex-id, edges and voteToHalt
+   * state. This function is essential if there is any more properties of vertex
+   * to be read from.
+   * 
+   * @param in
+   * @throws IOException
+   */
+  public abstract void readState(DataInput in) throws IOException;
+
+  /**
+   * Writes the state of vertex to the output stream. The framework writes the
+   * vertex and edge information to the output stream. This function could be
+   * used to save the state variable of the vertex added in the implementation
+   * of object.
+   * 
+   * @param out
+   * @throws IOException
+   */
+  public abstract void writeState(DataOutput out) throws IOException;
+
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Sun Jan 13 20:45:35 2013
@@ -17,12 +17,25 @@
  */
 package org.apache.hama.graph;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
+import org.apache.hama.util.KeyValuePair;
 
 /**
  * A reader to read Hama's input files and parses a vertex out of it.
  */
-public abstract class VertexInputReader<KEYIN extends Writable, VALUEIN extends Writable, V extends Writable, E extends Writable, M extends Writable> {
+public abstract class VertexInputReader<KEYIN extends Writable, VALUEIN extends Writable, V extends Writable, E extends Writable, M extends Writable>
+    implements RecordConverter {
+
+  private static final Log LOG = LogFactory.getLog(VertexInputReader.class);
+
+  private KeyValuePair<Writable, Writable> outputRecord = new KeyValuePair<Writable, Writable>();
 
   /**
    * Parses a given key and value into the given vertex. If returned true, the
@@ -32,4 +45,40 @@ public abstract class VertexInputReader<
   public abstract boolean parseVertex(KEYIN key, VALUEIN value,
       Vertex<V, E, M> vertex) throws Exception;
 
+  @SuppressWarnings("unchecked")
+  @Override
+  public KeyValuePair<Writable, Writable> convertRecord(
+      KeyValuePair<Writable, Writable> inputRecord, Configuration conf) {
+    Class<Vertex<V, E, M>> vertexClass = (Class<Vertex<V, E, M>>) conf
+        .getClass(GraphJob.VERTEX_CLASS_ATTR, Vertex.class);
+    boolean vertexCreation = true;
+    Vertex<V, E, M> vertex = GraphJobRunner
+        .newVertexInstance(vertexClass, conf);
+    try {
+      vertexCreation = parseVertex((KEYIN) inputRecord.getKey(),
+          (VALUEIN) inputRecord.getValue(), vertex);
+    } catch (Exception e) {
+      LOG.error("Error parsing vertex.", e);
+      vertexCreation = false;
+    }
+    if (!vertexCreation) {
+      return null;
+    }
+    outputRecord.setKey(vertex);
+    outputRecord.setValue(NullWritable.get());
+    return outputRecord;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
+      @SuppressWarnings("rawtypes")
+      Partitioner partitioner, Configuration conf,
+      @SuppressWarnings("rawtypes")
+      BSPPeer peer, int numTasks) {
+    Vertex<V, E, M> vertex = (Vertex<V, E, M>) outputRecord.getKey();
+    return Math.abs(partitioner.getPartition(vertex.getVertexID(),
+        vertex.getValue(), numTasks));
+  }
+
 }

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1432733&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Sun Jan 13 20:45:35 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VerticesInfo encapsulates the storage of vertices in a BSP Task.
+ * 
+ * @param <V> Vertex ID object type
+ * @param <E> Edge cost object type
+ * @param <M> Vertex value object type
+ */
+public class VerticesInfo<V extends Writable, E extends Writable, M extends Writable>
+    implements Iterable<Vertex<V, E, M>> {
+
+  private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>(100);
+
+  public void addVertex(Vertex<V, E, M> vertex) {
+    int i = 0;
+    for (Vertex<V, E, M> check : this) {
+      if (check.getVertexID().equals(vertex.getVertexID())) {
+        this.vertices.set(i, vertex);
+        return;
+      }
+      ++i;
+    }
+    vertices.add(vertex);
+  }
+
+  public Vertex<V, E, M> getVertex(V vertexId) {
+    for (Vertex<V, E, M> vertex : this) {
+      if (vertex.getVertexID().equals(vertexId)) {
+        return vertex;
+      }
+    }
+    return null;
+  }
+
+  public boolean containsVertex(V vertexId) {
+    for (Vertex<V, E, M> vertex : this) {
+      if (vertex.getVertexID().equals(vertexId)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void clear() {
+    vertices.clear();
+  }
+
+  public int size() {
+    return this.vertices.size();
+  }
+
+  @Override
+  public Iterator<Vertex<V, E, M>> iterator() {
+    return vertices.iterator();
+  }
+
+  public void recoverState(DataInput in) {
+
+  }
+
+  public void saveState(DataOutput out) {
+
+  }
+}

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Sun Jan 13 20:45:35 2013
@@ -25,7 +25,6 @@ import org.apache.hadoop.io.DoubleWritab
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.ClusterStatus;
@@ -33,8 +32,8 @@ import org.apache.hama.bsp.HashPartition
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TestBSPMasterGroomServer;
-import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.graph.example.PageRank;
+import org.apache.hama.graph.example.PageRank.PageRankVertex;
 
 public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
 
@@ -57,8 +56,10 @@ public class TestSubmitGraphJob extends 
     // Set multi-step partitioning interval to 30 bytes
     configuration.setInt("hama.graph.multi.step.partitioning.interval", 30);
 
+    configuration.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false);
+
     GraphJob bsp = new GraphJob(configuration, PageRank.class);
-    bsp.setInputPath(new Path("/tmp/pagerank"));
+    bsp.setInputPath(new Path("/tmp/pagerank/real-tmp.seq"));
     bsp.setOutputPath(new Path(OUTPUT));
     BSPJobClient jobClient = new BSPJobClient(configuration);
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
@@ -75,10 +76,7 @@ public class TestSubmitGraphJob extends 
     bsp.setAggregatorClass(AverageAggregator.class,
         PageRank.DanglingNodeAggregator.class);
 
-    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
     bsp.setInputFormat(SequenceFileInputFormat.class);
-    bsp.setInputKeyClass(Text.class);
-    bsp.setInputValueClass(TextArrayWritable.class);
     
     bsp.setVertexIDClass(Text.class);
     bsp.setVertexValueClass(DoubleWritable.class);
@@ -124,18 +122,18 @@ public class TestSubmitGraphJob extends 
   private void generateTestData() {
     try {
       SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
-          new Path(INPUT), Text.class, TextArrayWritable.class);
+          new Path(INPUT), PageRankVertex.class, NullWritable.class);
 
       for (int i = 0; i < input.length; i++) {
         String[] x = input[i].split("\t");
-        Text key = new Text(x[0]);
-        Writable[] values = new Writable[x.length - 1];
+
+        PageRankVertex vertex = new PageRankVertex();
+        vertex.setVertexID(new Text(x[0]));
         for (int j = 1; j < x.length; j++) {
-          values[j - 1] = new Text(x[j]);
+          vertex.addEdge(new Edge<Text, NullWritable>(new Text(x[j]),
+              NullWritable.get()));
         }
-        TextArrayWritable value = new TextArrayWritable();
-        value.set(values);
-        writer.append(key, value);
+        writer.append(vertex, NullWritable.get());
       }
 
       writer.close();

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.graph.example;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -25,6 +27,7 @@ import org.apache.hadoop.io.DoubleWritab
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
 import org.apache.hama.graph.AbstractAggregator;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.Vertex;
@@ -87,10 +90,33 @@ public class PageRank {
       sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
           / numEdges));
     }
+
+    @Override
+    public Text createVertexIDObject() {
+      return new Text();
+    }
+
+    @Override
+    public NullWritable createEdgeCostObject() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public DoubleWritable createVertexValue() {
+      return new DoubleWritable();
+    }
+
+    @Override
+    public void readState(DataInput in) throws IOException {}
+
+    @Override
+    public void writeState(DataOutput out) throws IOException {}
+
   }
 
   public static class PagerankTextReader extends
-      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable>
+      implements RecordConverter {
 
     /**
      * The text file essentially should look like: <br/>