You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/05/22 07:25:50 UTC

svn commit: r1341306 - in /incubator/hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/ examples/src/main/java/org/apache/hama/examples/ examples/src/main/java/org/apache/hama/examples/util/ examples/src/test/java/o...

Author: tjungblut
Date: Tue May 22 05:25:49 2012
New Revision: 1341306

URL: http://svn.apache.org/viewvc?rev=1341306&view=rev
Log:
Move partitioner to job for graph jobs

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Tue May 22 05:25:49 2012
@@ -90,9 +90,9 @@ public class BSPJob extends BSPJobContex
   // /////////////////////////////////////
   // Setter for Job Submission
   // /////////////////////////////////////
-  public void setWorkingDirectory(Path dir) throws IOException {
+  public void setWorkingDirectory(Path pDir) throws IOException {
     ensureState(JobState.DEFINE);
-    dir = new Path(getWorkingDirectory(), dir);
+    Path dir = new Path(getWorkingDirectory(), pDir);
     conf.set(WORKING_DIR, dir.toString());
   }
 
@@ -253,7 +253,7 @@ public class BSPJob extends BSPJobContex
 
   @SuppressWarnings({ "rawtypes" })
   public InputFormat getInputFormat() {
-    return (InputFormat) ReflectionUtils.newInstance(conf.getClass(
+    return ReflectionUtils.newInstance(conf.getClass(
         "bsp.input.format.class", TextInputFormat.class, InputFormat.class),
         conf);
   }
@@ -380,14 +380,14 @@ public class BSPJob extends BSPJobContex
 
   @SuppressWarnings("rawtypes")
   public Partitioner getPartitioner() {
-    return (Partitioner) ReflectionUtils.newInstance(conf
+    return ReflectionUtils.newInstance(conf
         .getClass("bsp.input.partitioner.class", HashPartitioner.class,
             Partitioner.class), conf);
   }
 
   @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {
-    return (OutputFormat) ReflectionUtils.newInstance(conf.getClass(
+    return ReflectionUtils.newInstance(conf.getClass(
         "bsp.output.format.class", TextOutputFormat.class, OutputFormat.class),
         conf);
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue May 22 05:25:49 2012
@@ -53,6 +53,7 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.ipc.JobSubmissionProtocol;
 
 /**
@@ -140,6 +141,7 @@ public class BSPJobClient extends Config
     /**
      * Returns immediately whether the whole job is done yet or not.
      */
+    @Override
     public synchronized boolean isComplete() throws IOException {
       updateStatus();
       return (status.getRunState() == JobStatus.SUCCEEDED
@@ -149,11 +151,13 @@ public class BSPJobClient extends Config
     /**
      * True if job completed successfully.
      */
+    @Override
     public synchronized boolean isSuccessful() throws IOException {
       updateStatus();
       return status.getRunState() == JobStatus.SUCCEEDED;
     }
 
+    @Override
     public synchronized long getSuperstepCount() throws IOException {
       ensureFreshStatus();
       return status.getSuperstepCount();
@@ -162,6 +166,7 @@ public class BSPJobClient extends Config
     /**
      * Blocks until the job is finished
      */
+    @Override
     public void waitForCompletion() throws IOException {
       while (!isComplete()) {
         try {
@@ -174,6 +179,7 @@ public class BSPJobClient extends Config
     /**
      * Tells the service to get the state of the current job.
      */
+    @Override
     public synchronized int getJobState() throws IOException {
       updateStatus();
       return status.getRunState();
@@ -187,6 +193,7 @@ public class BSPJobClient extends Config
     /**
      * Tells the service to terminate the current job.
      */
+    @Override
     public synchronized void killJob() throws IOException {
       jobSubmitClient.killJob(getID());
     }
@@ -210,7 +217,7 @@ public class BSPJobClient extends Config
     String masterAdress = conf.get("bsp.master.address");
     if (masterAdress != null && !masterAdress.equals("local")) {
       this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
-          JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
+          JobSubmissionProtocol.class, HamaRPCProtocolVersion.versionID,
           BSPMaster.getAddress(conf), conf,
           NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
     } else {
@@ -277,8 +284,9 @@ public class BSPJobClient extends Config
 
   static Random r = new Random();
 
-  public RunningJob submitJobInternal(BSPJob job, BSPJobID jobId)
+  public RunningJob submitJobInternal(BSPJob pJob, BSPJobID jobId)
       throws IOException {
+    BSPJob job = pJob;
     job.setJobID(jobId);
 
     Path submitJobDir = new Path(getSystemDir(), "submit_"
@@ -308,7 +316,9 @@ public class BSPJobClient extends Config
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-      if (job.getConf().get("bsp.input.partitioner.class") != null) {
+      if (job.getConf().get("bsp.input.partitioner.class") != null
+          && !job.getConf()
+              .getBoolean("hama.graph.runtime.partitioning", false)) {
         job = partition(job, maxTasks);
         maxTasks = job.getInt("hama.partition.count", maxTasks);
       }
@@ -450,11 +460,11 @@ public class BSPJobClient extends Config
     return job;
   }
 
-  private boolean isProperSize(int numBspTask, int maxTasks) {
+  private static boolean isProperSize(int numBspTask, int maxTasks) {
     return (numBspTask > 1 && numBspTask < maxTasks);
   }
 
-  private String getPartitionName(int i) {
+  private static String getPartitionName(int i) {
     return "part-" + String.valueOf(100000 + i).substring(1, 6);
   }
 
@@ -499,7 +509,7 @@ public class BSPJobClient extends Config
     return codecClass;
   }
 
-  private int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
+  private static int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
       throws IOException {
     InputSplit[] splits = job.getInputFormat().getSplits(
         job,
@@ -529,7 +539,7 @@ public class BSPJobClient extends Config
   private static final int CURRENT_SPLIT_FILE_VERSION = 0;
   private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
 
-  private DataOutputStream writeSplitsFileHeader(Configuration conf,
+  private static DataOutputStream writeSplitsFileHeader(Configuration conf,
       Path filename, int length) throws IOException {
     // write the splits to a file for the bsp master
     FileSystem fs = filename.getFileSystem(conf);
@@ -817,7 +827,7 @@ public class BSPJobClient extends Config
   /**
    * Display usage of the command-line tool and terminate execution
    */
-  private void displayUsage(String cmd) {
+  private static void displayUsage(String cmd) {
     String prefix = "Usage: hama job ";
     String taskStates = "running, completed";
     if ("-submit".equals(cmd)) {
@@ -986,6 +996,7 @@ public class BSPJobClient extends Config
       return locations;
     }
 
+    @Override
     public void readFields(DataInput in) throws IOException {
       splitClass = Text.readString(in);
       dataLength = in.readLong();
@@ -997,6 +1008,7 @@ public class BSPJobClient extends Config
       }
     }
 
+    @Override
     public void write(DataOutput out) throws IOException {
       Text.writeString(out, splitClass);
       out.writeLong(dataLength);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue May 22 05:25:49 2012
@@ -76,10 +76,14 @@ public interface BSPPeer<K1, V1, K2, V2,
   public String getPeerName();
 
   /**
-   * @param index
    * @return the name of n-th peer from sorted array by name.
    */
   public String getPeerName(int index);
+  
+  /**
+   * @return the index of this peer from sorted array by name.
+   */
+  public int getPeerIndex();
 
   /**
    * @return the names of all the peers executing tasks from the same job

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue May 22 05:25:49 2012
@@ -19,6 +19,7 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
@@ -196,6 +197,7 @@ public final class BSPPeerImpl<K1, V1, K
     final RecordWriter<K2, V2> finalOut = outWriter;
 
     collector = new OutputCollector<K2, V2>() {
+      @Override
       public void collect(K2 key, V2 value) throws IOException {
         finalOut.write(key, value);
       }
@@ -400,6 +402,7 @@ public final class BSPPeerImpl<K1, V1, K
   /**
    * @return the string as host:port of this Peer
    */
+  @Override
   public final String getPeerName() {
     return peerAddress.getHostName() + ":" + peerAddress.getPort();
   }
@@ -417,6 +420,13 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   @Override
+  public int getPeerIndex() {
+    initPeerNames();
+    return Arrays
+        .binarySearch(getAllPeerNames(), getPeerName());
+  }
+
+  @Override
   public final int getNumPeers() {
     initPeerNames();
     return allPeers.length;
@@ -448,6 +458,7 @@ public final class BSPPeerImpl<K1, V1, K
   /**
    * @return the count of current super-step
    */
+  @Override
   public final long getSuperstepCount() {
     return currentTaskStatus.getSuperstepCount();
   }
@@ -457,6 +468,7 @@ public final class BSPPeerImpl<K1, V1, K
    * 
    * @return the conf
    */
+  @Override
   public final Configuration getConfiguration() {
     return conf;
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Tue May 22 05:25:49 2012
@@ -49,6 +49,7 @@ public abstract class FileInputFormat<K,
 
   private long minSplitSize = 1;
   private static final PathFilter hiddenFileFilter = new PathFilter() {
+    @Override
     public boolean accept(Path p) {
       String name = p.getName();
       return !name.startsWith("_") && !name.startsWith(".");
@@ -71,6 +72,7 @@ public abstract class FileInputFormat<K,
       this.filters = filters;
     }
 
+    @Override
     public boolean accept(Path path) {
       for (PathFilter filter : filters) {
         if (!filter.accept(path)) {
@@ -90,6 +92,7 @@ public abstract class FileInputFormat<K,
     return true;
   }
 
+  @Override
   public abstract RecordReader<K, V> getRecordReader(InputSplit split,
       BSPJob job) throws IOException;
 
@@ -174,6 +177,7 @@ public abstract class FileInputFormat<K,
   /**
    * Splits files returned by {@link #listStatus(JobConf)} when they're too big.
    */
+  @Override
   public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
     FileStatus[] files = listStatus(job);
 
@@ -332,8 +336,8 @@ public abstract class FileInputFormat<K,
    * @param path {@link Path} to be added to the list of inputs for the
    *          map-reduce job.
    */
-  public static void addInputPath(BSPJob conf, Path path) {
-    path = new Path(conf.getWorkingDirectory(), path);
+  public static void addInputPath(BSPJob conf, Path p) {
+    Path path = new Path(conf.getWorkingDirectory(), p);
     String dirStr = StringUtils.escapeString(path.toString());
     String dirs = conf.get("bsp.input.dir");
     conf.set("bsp.input.dir", dirs == null ? dirStr : dirs
@@ -395,8 +399,9 @@ public abstract class FileInputFormat<K,
     return result;
   }
 
-  private void sortInDescendingOrder(List<NodeInfo> mylist) {
+  private static void sortInDescendingOrder(List<NodeInfo> mylist) {
     Collections.sort(mylist, new Comparator<NodeInfo>() {
+      @Override
       public int compare(NodeInfo obj1, NodeInfo obj2) {
 
         if (obj1 == null || obj2 == null)
@@ -424,8 +429,8 @@ public abstract class FileInputFormat<K,
    * @throws IOException
    */
   protected String[] getSplitHosts(BlockLocation[] blkLocations, long offset,
-      long splitSize, NetworkTopology clusterMap) throws IOException {
-
+      long pSplitSize, NetworkTopology clusterMap) throws IOException {
+    long splitSize = pSplitSize;
     int startIndex = getBlockIndex(blkLocations, offset);
 
     long bytesInThisBlock = blkLocations[startIndex].getOffset()
@@ -519,7 +524,7 @@ public abstract class FileInputFormat<K,
     return identifyHosts(allTopos.length, racksMap);
   }
 
-  private String[] identifyHosts(int replicationFactor,
+  private static String[] identifyHosts(int replicationFactor,
       Map<Node, NodeInfo> racksMap) {
 
     String[] retVal = new String[replicationFactor];
@@ -562,7 +567,7 @@ public abstract class FileInputFormat<K,
     return retVal;
   }
 
-  private String[] fakeRacks(BlockLocation[] blkLocations, int index)
+  private static String[] fakeRacks(BlockLocation[] blkLocations, int index)
       throws IOException {
     String[] allHosts = blkLocations[index].getHosts();
     String[] allTopos = new String[allHosts.length];

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java Tue May 22 05:25:49 2012
@@ -71,10 +71,12 @@ public class FileSplit implements InputS
   }
 
   /** The number of bytes in the file to process. */
+  @Override
   public long getLength() {
     return length;
   }
 
+  @Override
   public String toString() {
     return file + ":" + start + "+" + length;
   }
@@ -83,12 +85,14 @@ public class FileSplit implements InputS
   // Writable methods
   // //////////////////////////////////////////
 
+  @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, file.toString());
     out.writeLong(start);
     out.writeLong(length);
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     file = new Path(Text.readString(in));
     start = in.readLong();
@@ -96,6 +100,7 @@ public class FileSplit implements InputS
     hosts = null;
   }
 
+  @Override
   public String[] getLocations() throws IOException {
     if (this.hosts == null) {
       return new String[] {};

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Tue May 22 05:25:49 2012
@@ -250,7 +250,7 @@ class TaskInProgress {
   private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
 
   public boolean shouldCloseForClosedJob(TaskAttemptID taskid) {
-    TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
+    TaskStatus ts = taskStatuses.get(taskid);
     if ((ts != null) && (!tasksReportedClosed.contains(taskid))
         && (job.getStatus().getRunState() != JobStatus.RUNNING)) {
       tasksReportedClosed.add(taskid);
@@ -263,7 +263,7 @@ class TaskInProgress {
   public void completed(TaskAttemptID taskid) {
     LOG.debug("Task '" + taskid.getTaskID().toString() + "' has completed.");
 
-    TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+    TaskStatus status = taskStatuses.get(taskid);
     status.setRunState(TaskStatus.State.SUCCEEDED);
     activeTasks.remove(taskid);
 
@@ -282,7 +282,7 @@ class TaskInProgress {
   public void terminated(TaskAttemptID taskid) {
     LOG.info("Task '" + taskid.getTaskID().toString() + "' has failed.");
 
-    TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+    TaskStatus status = taskStatuses.get(taskid);
     status.setRunState(TaskStatus.State.FAILED);
     activeTasks.remove(taskid);
   }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Tue May 22 05:25:49 2012
@@ -54,6 +54,8 @@ public abstract class HamaClusterTestCas
     this.zooKeeperCluster = new MiniZooKeeperCluster();
     int clientPort = this.zooKeeperCluster.startup(testDir);
     conf.set("hama.zookeeper.property.clientPort", Integer.toString(clientPort));
+    conf.set(Constants.GROOM_RPC_HOST, "localhost");
+    assertEquals(conf.get(Constants.GROOM_RPC_HOST), "localhost");
     bspCluster = new MiniBSPCluster(this.conf, numOfGroom); 
     bspCluster.startBSPCluster();
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java Tue May 22 05:25:49 2012
@@ -118,8 +118,7 @@ public class CombineExample {
     if (bsp.waitForCompletion(true)) {
       printOutput(conf);
       System.out.println("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0
-          + " seconds");
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
 
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Tue May 22 05:25:49 2012
@@ -67,7 +67,7 @@ public class InlinkCount extends Vertex<
     inlinkJob.setInputFormat(SequenceFileInputFormat.class);
     inlinkJob.setInputKeyClass(VertexWritable.class);
     inlinkJob.setInputValueClass(VertexArrayWritable.class);
-    
+
     inlinkJob.setVertexIDClass(Text.class);
     inlinkJob.setVertexValueClass(IntWritable.class);
     inlinkJob.setEdgeValueClass(NullWritable.class);
@@ -80,8 +80,7 @@ public class InlinkCount extends Vertex<
     long startTime = System.currentTimeMillis();
     if (inlinkJob.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0
-          + " seconds");
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Tue May 22 05:25:49 2012
@@ -133,8 +133,7 @@ public class MindistSearch {
     long startTime = System.currentTimeMillis();
     if (connectedComponentsJob.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0
-          + " seconds");
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
 

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Tue May 22 05:25:49 2012
@@ -122,7 +122,7 @@ public class PageRank {
       pageJob.set("hama.pagerank.alpha", args[2]);
 
     pageJob.setAggregatorClass(AverageAggregator.class);
-    
+
     pageJob.setVertexIDClass(Text.class);
     pageJob.setVertexValueClass(DoubleWritable.class);
     pageJob.setEdgeValueClass(NullWritable.class);
@@ -136,8 +136,7 @@ public class PageRank {
     long startTime = System.currentTimeMillis();
     if (pageJob.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0
-          + " seconds");
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Tue May 22 05:25:49 2012
@@ -139,8 +139,7 @@ public class PiEstimator {
     if (bsp.waitForCompletion(true)) {
       printOutput(conf);
       System.out.println("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0
-          + " seconds");
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Tue May 22 05:25:49 2012
@@ -112,8 +112,7 @@ public class RandBench {
     long startTime = System.currentTimeMillis();
     if (bsp.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0
-          + " seconds");
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Tue May 22 05:25:49 2012
@@ -123,17 +123,15 @@ public class SSSP {
     ssspJob.setOutputValueClass(IntWritable.class);
     // Iterate until all the nodes have been reached.
     ssspJob.setMaxIteration(Integer.MAX_VALUE);
-    
+
     ssspJob.setVertexIDClass(Text.class);
     ssspJob.setVertexValueClass(IntWritable.class);
     ssspJob.setEdgeValueClass(IntWritable.class);
-    
 
     long startTime = System.currentTimeMillis();
     if (ssspJob.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0
-          + " seconds");
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java Tue May 22 05:25:49 2012
@@ -159,8 +159,7 @@ public class SuperstepPiEstimator {
     if (bsp.waitForCompletion(true)) {
       printOutput(conf);
       System.out.println("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0
-          + " seconds");
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
 

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java Tue May 22 05:25:49 2012
@@ -20,7 +20,7 @@ package org.apache.hama.examples.util;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.graph.VertexArrayWritable;
@@ -58,8 +58,8 @@ public class PagerankTextToSeq extends T
     VertexWritable key = new VertexWritable(split[0]);
     VertexWritable[] v = new VertexWritable[split.length - 1];
     for (int i = 1; i < split.length; i++) {
-      v[i - 1] = new VertexWritable(new DoubleWritable(0.0),
-          new Text(split[i]), Text.class, DoubleWritable.class);
+      v[i - 1] = new VertexWritable(NullWritable.get(), new Text(split[i]),
+          Text.class, NullWritable.class);
     }
     VertexArrayWritable value = new VertexArrayWritable();
     value.set(v);

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Tue May 22 05:25:49 2012
@@ -158,7 +158,8 @@ public class MindistSearchTest extends T
 
   public void testRepairFunctionality() throws Exception {
     // make a copy to be safe with parallel test executions
-    final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>(tmp);
+    final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>(
+        tmp);
     // removing 7 should resulting in creating it and getting the same result as
     // usual
     map.remove(new VertexWritable<Text, IntWritable>("7"));
@@ -183,7 +184,7 @@ public class MindistSearchTest extends T
       connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
       connectedComponentsJob.setOutputKeyClass(Text.class);
       connectedComponentsJob.setOutputValueClass(Text.class);
-      
+
       connectedComponentsJob.setVertexIDClass(Text.class);
       connectedComponentsJob.setVertexValueClass(Text.class);
       connectedComponentsJob.setEdgeValueClass(NullWritable.class);

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Tue May 22 05:25:49 2012
@@ -23,12 +23,11 @@ import org.apache.hadoop.io.Writable;
 /**
  * The edge class
  */
-public class Edge<VERTEX_ID, EDGE_VALUE_TYPE extends Writable> {
+public final class Edge<VERTEX_ID extends Writable, EDGE_VALUE_TYPE extends Writable> {
 
-  private VERTEX_ID destinationVertexID;
-  // actually the destination peer address
-  private String destinationPeerName;
-  private EDGE_VALUE_TYPE cost;
+  private final VERTEX_ID destinationVertexID;
+  private final String destinationPeerName;
+  private final EDGE_VALUE_TYPE cost;
 
   public Edge(VERTEX_ID sourceVertexID, String destVertexID,
       EDGE_VALUE_TYPE cost) {

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Tue May 22 05:25:49 2012
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.Partitioner;
 
 public class GraphJob extends BSPJob {
 
@@ -35,6 +36,7 @@ public class GraphJob extends BSPJob {
 
   public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
   public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
+  public final static String VERTEX_GRAPH_RUNTIME_PARTIONING = "hama.graph.runtime.partitioning";
 
   /**
    * Creates a new Graph Job with the given configuration and an exampleClass.
@@ -102,6 +104,13 @@ public class GraphJob extends BSPJob {
   }
 
   @Override
+  public void setPartitioner(@SuppressWarnings("rawtypes")
+  Class<? extends Partitioner> theClass) {
+    super.setPartitioner(theClass);
+    conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true);
+  }
+
+  @Override
   public void setCombinerClass(Class<? extends Combiner<? extends Writable>> cls) {
     ensureState(JobState.DEFINE);
     conf.setClass(VERTEX_MESSAGE_COMBINER_CLASS_ATTR, cls, Combiner.class);
@@ -114,4 +123,5 @@ public class GraphJob extends BSPJob {
   public void setMaxIteration(int maxIteration) {
     conf.setInt("hama.graph.max.iteration", maxIteration);
   }
+
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Tue May 22 05:25:49 2012
@@ -20,6 +20,8 @@ package org.apache.hama.graph;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
@@ -35,16 +37,20 @@ public final class GraphJobMessage imple
   public static final int MAP_FLAG = 0x01;
   public static final int VERTEX_FLAG = 0x02;
   public static final int REPAIR_FLAG = 0x04;
+  public static final int PARTITION_FLAG = 0x08;
 
   // staticly defined because it is process-wide information, therefore in caps
   // considered as a constant
+  public static Class<?> VERTEX_CLASS;
   public static Class<? extends Writable> VERTEX_ID_CLASS;
   public static Class<? extends Writable> VERTEX_VALUE_CLASS;
+  public static Class<? extends Writable> EDGE_VALUE_CLASS;
 
   private int flag = MAP_FLAG;
   private MapWritable map;
   private Writable vertexId;
   private Writable vertexValue;
+  private Vertex<?, ?, ?> vertex;
 
   public GraphJobMessage() {
   }
@@ -65,6 +71,11 @@ public final class GraphJobMessage imple
     this.vertexValue = vertexValue;
   }
 
+  public GraphJobMessage(Vertex<?, ?, ?> vertex) {
+    this.flag = PARTITION_FLAG;
+    this.vertex = vertex;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeByte(this.flag);
@@ -75,6 +86,27 @@ public final class GraphJobMessage imple
       vertexValue.write(out);
     } else if (isMapMessage()) {
       map.write(out);
+    } else if (isPartitioningMessage()) {
+      vertex.getVertexID().write(out);
+      if (vertex.getValue() != null) {
+        out.writeBoolean(true);
+        vertex.getValue().write(out);
+      } else {
+        out.writeBoolean(false);
+      }
+      List<?> outEdges = vertex.getOutEdges();
+      out.writeInt(outEdges.size());
+      for (Object e : outEdges) {
+        Edge<?, ?> edge = (Edge<?, ?>) e;
+        out.writeUTF(edge.getDestinationPeerName());
+        edge.getDestinationVertexID().write(out);
+        if (edge.getValue() != null) {
+          out.writeBoolean(true);
+          edge.getValue().write(out);
+        } else {
+          out.writeBoolean(false);
+        }
+      }
     } else {
       vertexId.write(out);
     }
@@ -92,11 +124,38 @@ public final class GraphJobMessage imple
     } else if (isMapMessage()) {
       map = new MapWritable();
       map.readFields(in);
+    } else if (isPartitioningMessage()) {
+      Vertex<Writable, Writable, Writable> vertex = GraphJobRunner
+          .newVertexInstance(VERTEX_CLASS, null);
+      Writable vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
+      vertexId.readFields(in);
+      vertex.setVertexID(vertexId);
+      if (in.readBoolean()) {
+        Writable vertexValue = ReflectionUtils.newInstance(VERTEX_VALUE_CLASS,
+            null);
+        vertexValue.readFields(in);
+        vertex.setValue(vertexValue);
+      }
+      int size = in.readInt();
+      vertex.edges = new ArrayList<Edge<Writable, Writable>>(size);
+      for (int i = 0; i < size; i++) {
+        String destination = in.readUTF();
+        Writable edgeVertexID = ReflectionUtils.newInstance(VERTEX_ID_CLASS,
+            null);
+        edgeVertexID.readFields(in);
+        Writable edgeValue = null;
+        if (in.readBoolean()) {
+          edgeValue = ReflectionUtils.newInstance(EDGE_VALUE_CLASS, null);
+          edgeValue.readFields(in);
+        }
+        vertex.edges.add(new Edge<Writable, Writable>(edgeVertexID,
+            destination, edgeValue));
+      }
+      this.vertex = vertex;
     } else {
       vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
       vertexId.readFields(in);
     }
-
   }
 
   public MapWritable getMap() {
@@ -111,6 +170,10 @@ public final class GraphJobMessage imple
     return vertexValue;
   }
 
+  public Vertex<?, ?, ?> getVertex() {
+    return vertex;
+  }
+
   public boolean isMapMessage() {
     return flag == MAP_FLAG;
   }
@@ -123,10 +186,15 @@ public final class GraphJobMessage imple
     return flag == REPAIR_FLAG;
   }
 
+  public boolean isPartitioningMessage() {
+    return flag == PARTITION_FLAG;
+  }
+
   @Override
   public String toString() {
     return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
-        + vertexId + ", vertexValue=" + vertexValue + "]";
+        + vertexId + ", vertexValue=" + vertexValue + ", vertex=" + vertex
+        + "]";
   }
 
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue May 22 05:25:49 2012
@@ -39,6 +39,8 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
 
@@ -53,7 +55,7 @@ public final class GraphJobRunner<VERTEX
     extends
     BSP<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> {
 
-  private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
+  static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
 
   // make sure that these values don't collide with the vertex names
   private static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
@@ -96,6 +98,7 @@ public final class GraphJobRunner<VERTEX
   Class<VERTEX_ID> vertexIdClass;
   Class<VERTEX_VALUE> vertexValueClass;
   Class<EDGE_VALUE_TYPE> edgeValueClass;
+  Class<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> vertexClass;
 
   @Override
   @SuppressWarnings("unchecked")
@@ -114,11 +117,21 @@ public final class GraphJobRunner<VERTEX
     edgeValueClass = (Class<EDGE_VALUE_TYPE>) conf.getClass(
         GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class,
         Writable.class);
+    vertexClass = (Class<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>) conf
+        .getClass("hama.graph.vertex.class", Vertex.class);
 
     GraphJobMessage.VERTEX_ID_CLASS = vertexIdClass;
     GraphJobMessage.VERTEX_VALUE_CLASS = vertexValueClass;
+    GraphJobMessage.VERTEX_CLASS = vertexClass;
+    GraphJobMessage.EDGE_VALUE_CLASS = edgeValueClass;
 
     boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
+    boolean runtimePartitioning = conf.getBoolean(
+        GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, false);
+    Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner = (Partitioner<VERTEX_ID, VERTEX_VALUE>) ReflectionUtils
+        .newInstance(
+            conf.getClass("bsp.input.partitioner.class", HashPartitioner.class),
+            conf);
 
     if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
         Combiner.class)) {
@@ -142,7 +155,7 @@ public final class GraphJobRunner<VERTEX
       }
     }
 
-    loadVertices(peer, repairNeeded);
+    loadVertices(peer, repairNeeded, runtimePartitioning, partitioner);
     numberVertices = vertices.size() * peer.getNumPeers();
     // TODO refactor this to a single step
     for (Entry<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> e : vertices
@@ -178,7 +191,8 @@ public final class GraphJobRunner<VERTEX
 
       // Map <vertexID, messages>
       final Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> messages = parseMessages(peer);
-      // use iterations here, since repair can skew the number of supersteps
+      // use iterations here, since repair can skew the number of
+      // supersteps
       if (isMasterTask(peer) && iteration > 1) {
         MapWritable updatedCnt = new MapWritable();
         // exit if there's no update made
@@ -194,7 +208,8 @@ public final class GraphJobRunner<VERTEX
               if (intern.finalizeAggregation() != null) {
                 lastAggregatedValue = finalizeAggregation;
               }
-              // this count is usually the times of active vertices in the graph
+              // this count is usually the times of active
+              // vertices in the graph
               updatedCnt.put(FLAG_AGGREGATOR_INCREMENT,
                   intern.getTimesAggregated());
             }
@@ -277,7 +292,8 @@ public final class GraphJobRunner<VERTEX
     GraphJobMessage msg = null;
     final Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> msgMap = new HashMap<VERTEX_ID, LinkedList<VERTEX_VALUE>>();
     while ((msg = peer.getCurrentMessage()) != null) {
-      // either this is a vertex message or a directive that must be read as map
+      // either this is a vertex message or a directive that must be read
+      // as map
       if (msg.isVertexMessage()) {
         final VERTEX_ID vertexID = (VERTEX_ID) msg.getVertexId();
         final VERTEX_VALUE value = (VERTEX_VALUE) msg.getVertexValue();
@@ -318,15 +334,15 @@ public final class GraphJobRunner<VERTEX
   @SuppressWarnings("unchecked")
   private void loadVertices(
       BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer,
-      boolean repairNeeded) throws IOException {
-    LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
+      boolean repairNeeded, boolean runtimePartitioning,
+      Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner) throws IOException,
+      SyncException, InterruptedException {
+    LOG.debug("vertex class: " + vertexClass);
     boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
     KeyValuePair<? extends VertexWritable<VERTEX_ID, VERTEX_VALUE>, ? extends VertexArrayWritable> next = null;
     while ((next = peer.readNext()) != null) {
-      Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
-          .newInstance(conf.getClass("hama.graph.vertex.class", Vertex.class),
-              conf);
-
+      Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
+          vertexClass, conf);
       vertex.setVertexID(next.getKey().getVertexId());
       vertex.peer = peer;
       vertex.runner = this;
@@ -343,15 +359,35 @@ public final class GraphJobRunner<VERTEX
       }
       List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> edges = new ArrayList<Edge<VERTEX_ID, EDGE_VALUE_TYPE>>();
       for (VertexWritable<VERTEX_ID, VERTEX_VALUE> e : arr) {
-        String target = peer.getPeerName(Math.abs((e.hashCode() % peer
-            .getAllPeerNames().length)));
+        int partition = partitioner.getPartition(e.getVertexId(),
+            e.getVertexValue(), peer.getNumPeers());
+        String target = peer.getPeerName(partition);
         edges.add(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(e.getVertexId(), target,
             (EDGE_VALUE_TYPE) e.getVertexValue()));
       }
 
       vertex.edges = edges;
-      vertex.setup(conf);
-      vertices.put(next.getKey().getVertexId(), vertex);
+      if (runtimePartitioning) {
+        int partition = partitioner.getPartition(vertex.getVertexID(),
+            vertex.getValue(), peer.getNumPeers());
+        peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
+      } else {
+        vertex.setup(conf);
+        vertices.put(next.getKey().getVertexId(), vertex);
+      }
+    }
+
+    if (runtimePartitioning) {
+      peer.sync();
+      GraphJobMessage msg = null;
+      while ((msg = peer.getCurrentMessage()) != null) {
+        Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) msg
+            .getVertex();
+        vertex.peer = peer;
+        vertex.runner = this;
+        vertex.setup(conf);
+        vertices.put(vertex.getVertexID(), vertex);
+      }
     }
 
     /*
@@ -372,25 +408,20 @@ public final class GraphJobRunner<VERTEX
               new GraphJobMessage(e.getDestinationVertexID()));
         }
       }
-      try {
-        peer.sync();
-      } catch (Exception e) {
-        // we can't really recover from that, so fail this task
-        throw new RuntimeException(e);
-      }
+      peer.sync();
       GraphJobMessage msg = null;
       while ((msg = peer.getCurrentMessage()) != null) {
         VERTEX_ID vertexName = (VERTEX_ID) msg.getVertexId();
         if (!vertices.containsKey(vertexName)) {
-          Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
-              .newInstance(
-                  conf.getClass("hama.graph.vertex.class", Vertex.class), conf);
+          Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
+              vertexClass, conf);
           vertex.peer = peer;
           vertex.setVertexID(vertexName);
           vertex.runner = this;
           if (selfReference) {
-            String target = peer.getPeerName(Math.abs((vertex.hashCode() % peer
-                .getAllPeerNames().length)));
+            int partition = partitioner.getPartition(vertex.getVertexID(),
+                vertex.getValue(), peer.getNumPeers());
+            String target = peer.getPeerName(partition);
             vertex.edges = Collections
                 .singletonList(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(vertex
                     .getVertexID(), target, null));
@@ -406,7 +437,19 @@ public final class GraphJobRunner<VERTEX
   }
 
   /**
-   * Just write <ID as Writable, Value as Writable> pair as a result
+   * @return a new vertex instance
+   */
+  public static <VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE_TYPE extends Writable> Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> newVertexInstance(
+      Class<?> vertexClass, Configuration conf) {
+    @SuppressWarnings("unchecked")
+    Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
+        .newInstance(vertexClass, conf);
+    return vertex;
+  }
+
+  /**
+   * Just write <ID as Writable, Value as Writable> pair as a result. Note that
+   * this will also be executed when failure happened.
    */
   @Override
   public final void cleanup(

Modified: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Tue May 22 05:25:49 2012
@@ -60,12 +60,11 @@ public class TestSubmitGraphJob extends 
       int vertexId = Integer.parseInt(adjacencyStringArray[0]);
       String name = pages[vertexId];
       @SuppressWarnings("unchecked")
-      VertexWritable<Text, DoubleWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
+      VertexWritable<Text, NullWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
       for (int j = 1; j < adjacencyStringArray.length; j++) {
-        arr[j - 1] = new VertexWritable<Text, DoubleWritable>(
-            new DoubleWritable(0.0d), new Text(
-                pages[Integer.parseInt(adjacencyStringArray[j])]), Text.class,
-            DoubleWritable.class);
+        arr[j - 1] = new VertexWritable<Text, NullWritable>(NullWritable.get(),
+            new Text(pages[Integer.parseInt(adjacencyStringArray[j])]),
+            Text.class, NullWritable.class);
       }
       VertexArrayWritable wr = new VertexArrayWritable();
       wr.set(arr);
@@ -113,8 +112,8 @@ public class TestSubmitGraphJob extends 
     long startTime = System.currentTimeMillis();
     if (bsp.waitForCompletion(true)) {
       verifyResult();
-      LOG.info("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+      LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime)
+          / 1000.0 + " seconds");
     } else {
       fail();
     }