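You are viewing a plain-text version of this content. The canonical version is the original message in the commits@hama.apache.org mail archive (the hyperlink was lost in the plain-text conversion).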
Posted to commits@hama.apache.org by ed...@apache.org on 2011/08/23 02:30:58 UTC

svn commit: r1160515 - in /incubator/hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/graph/

Author: edwardyoon
Date: Tue Aug 23 00:30:58 2011
New Revision: 1160515

URL: http://svn.apache.org/viewvc?rev=1160515&view=rev
Log:
Improve and Refactor Partitioning in the Examples

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Aug 23 00:30:58 2011
@@ -10,6 +10,7 @@ Release 0.4 - Unreleased
 
   IMPROVEMENTS
   
+    HAMA-423: Improve and Refactor Partitioning in the Examples (Thomas Jungblut via edwardyoon)
     HAMA-422: Update HttpServer to use QueuedThreadPool (edwardyoon)
     HAMA-414: Move BSPPeer constructor into child process (edwardyoon)
     HAMA-419: Migrating site from forrest to maven (edwardyoon)

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java Tue Aug 23 00:30:58 2011
@@ -18,6 +18,7 @@
 package org.apache.hama.examples.graph;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
@@ -40,11 +41,11 @@ public class PageRank extends PageRankBa
 
   private Configuration conf;
 
-  private HashMap<PageRankVertex, List<PageRankVertex>> adjacencyList;
-  private final HashMap<String, PageRankVertex> lookupMap = new HashMap<String, PageRankVertex>();
-  private final HashMap<PageRankVertex, Double> tentativePagerank = new HashMap<PageRankVertex, Double>();
+  private final HashMap<Vertex, List<Vertex>> adjacencyList = new HashMap<Vertex, List<Vertex>>();
+  private final HashMap<String, Vertex> lookupMap = new HashMap<String, Vertex>();
+  private final HashMap<Vertex, Double> tentativePagerank = new HashMap<Vertex, Double>();
   // backup of the last pagerank to determine the error
-  private final HashMap<PageRankVertex, Double> lastTentativePagerank = new HashMap<PageRankVertex, Double>();
+  private final HashMap<Vertex, Double> lastTentativePagerank = new HashMap<Vertex, Double>();
   private String[] peerNames;
 
   @Override
@@ -52,12 +53,9 @@ public class PageRank extends PageRankBa
       InterruptedException {
     String master = conf.get(MASTER_TASK);
     // setup the datasets
-    adjacencyList = PageRankBase.mapAdjacencyList(getConf(), peer);
-    // init the pageranks to 1/n where n is the number of all vertices
-    for (PageRankVertex vertex : adjacencyList.keySet()) {
-      tentativePagerank.put(vertex, Double.valueOf(1.0 / numOfVertices));
-      lookupMap.put(vertex.getUrl(), vertex);
-    }
+    PageRankBase.mapAdjacencyList(getConf(), peer, adjacencyList,
+        tentativePagerank, lookupMap);
+
     // while the error not converges against epsilon do the pagerank stuff
     double error = 1.0;
     int iteration = 0;
@@ -71,10 +69,10 @@ public class PageRank extends PageRankBa
         // copy the old pagerank to the backup
         copyTentativePageRankToBackup();
         // sum up all incoming messages for a vertex
-        HashMap<PageRankVertex, Double> sumMap = new HashMap<PageRankVertex, Double>();
+        HashMap<Vertex, Double> sumMap = new HashMap<Vertex, Double>();
         DoubleMessage msg = null;
         while ((msg = (DoubleMessage) peer.getCurrentMessage()) != null) {
-          PageRankVertex k = lookupMap.get(msg.getTag());
+          Vertex k = lookupMap.get(msg.getTag());
           if (!sumMap.containsKey(k)) {
             sumMap.put(k, msg.getData());
           } else {
@@ -84,7 +82,7 @@ public class PageRank extends PageRankBa
         // pregel formula:
         // ALPHA = 0.15 / NumVertices()
         // P(i) = ALPHA + 0.85 * sum
-        for (Entry<PageRankVertex, Double> entry : sumMap.entrySet()) {
+        for (Entry<Vertex, Double> entry : sumMap.entrySet()) {
           tentativePagerank.put(entry.getKey(), ALPHA
               + (entry.getValue() * DAMPING_FACTOR));
         }
@@ -95,7 +93,7 @@ public class PageRank extends PageRankBa
       }
       // in every step send the tentative pagerank of a vertex to its
       // adjacent vertices
-      for (PageRankVertex vertex : adjacencyList.keySet()) {
+      for (Vertex vertex : adjacencyList.keySet()) {
         sendMessageToNeighbors(peer, vertex);
       }
 
@@ -106,11 +104,10 @@ public class PageRank extends PageRankBa
     peer.clear();
     // finally save the chunk of pageranks
     PageRankBase.savePageRankMap(peer, conf, lastTentativePagerank);
-    LOG.info("Finished with iteration " + iteration + "!");
   }
 
-  private double broadcastError(BSPPeer peer, String master,
-      double error) throws IOException, KeeperException, InterruptedException {
+  private double broadcastError(BSPPeer peer, String master, double error)
+      throws IOException, KeeperException, InterruptedException {
     peer.send(master, new DoubleMessage("", error));
     peer.sync();
     if (peer.getPeerName().equals(master)) {
@@ -135,7 +132,7 @@ public class PageRank extends PageRankBa
 
   private double determineError() {
     double error = 0.0;
-    for (Entry<PageRankVertex, Double> entry : tentativePagerank.entrySet()) {
+    for (Entry<Vertex, Double> entry : tentativePagerank.entrySet()) {
       error += Math.abs(lastTentativePagerank.get(entry.getKey())
           - entry.getValue());
     }
@@ -143,19 +140,19 @@ public class PageRank extends PageRankBa
   }
 
   private void copyTentativePageRankToBackup() {
-    for (Entry<PageRankVertex, Double> entry : tentativePagerank.entrySet()) {
+    for (Entry<Vertex, Double> entry : tentativePagerank.entrySet()) {
       lastTentativePagerank.put(entry.getKey(), entry.getValue());
     }
   }
 
-  private void sendMessageToNeighbors(BSPPeer peer, PageRankVertex v)
+  private void sendMessageToNeighbors(BSPPeer peer, Vertex v)
       throws IOException {
-    List<PageRankVertex> outgoingEdges = adjacencyList.get(v);
-    for (PageRankVertex adjacent : outgoingEdges) {
+    List<Vertex> outgoingEdges = adjacencyList.get(v);
+    for (Vertex adjacent : outgoingEdges) {
       int mod = Math.abs(adjacent.getId() % peerNames.length);
       // send a message of the tentative pagerank divided by the size of
       // the outgoing edges to all adjacents
-      peer.send(peerNames[mod], new DoubleMessage(adjacent.getUrl(),
+      peer.send(peerNames[mod], new DoubleMessage(adjacent.getName(),
           tentativePagerank.get(v) / outgoingEdges.size()));
     }
   }
@@ -183,7 +180,8 @@ public class PageRank extends PageRankBa
   }
 
   public static void main(String[] args) throws IOException,
-      InterruptedException, ClassNotFoundException {
+      InterruptedException, ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
     if (args.length == 0) {
       printUsage();
       System.exit(-1);
@@ -220,23 +218,19 @@ public class PageRank extends PageRankBa
 
     BSPJobClient jobClient = new BSPJobClient(conf);
     ClusterStatus cluster = jobClient.getClusterStatus(true);
-    StringBuilder sb = new StringBuilder();
-    for (String peerName : cluster.getActiveGroomNames().values()) {
-      conf.set(MASTER_TASK, peerName);
-      sb.append(peerName + ";");
-    }
 
-    // put every peer into the configuration
-    conf.set(ShortestPaths.BSP_PEERS, sb.toString());
     // leave the iterations on default
     conf.set("max.iterations", "0");
 
+    Collection<String> activeGrooms = cluster.getActiveGroomNames().values();
+    String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
+
     if (conf.get("in.path") == null) {
-      conf = PageRankBase
-          .partitionExample(new Path(conf.get("out.path")), conf);
+      conf = PageRankBase.partitionExample(new Path(conf.get("out.path")),
+          conf, grooms);
     } else {
-      conf = PageRankBase
-          .partitionTextFile(new Path(conf.get("in.path")), conf);
+      conf = PageRankBase.partitionTextFile(new Path(conf.get("in.path")),
+          conf, grooms);
     }
 
     BSPJob job = new BSPJob(conf);

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java Tue Aug 23 00:30:58 2011
@@ -17,11 +17,10 @@
  */
 package org.apache.hama.examples.graph;
 
-import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
+import java.io.OutputStreamWriter;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -31,15 +30,21 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.examples.graph.partitioning.PartitionableWritable;
+import org.apache.hama.examples.graph.partitioning.VertexPartitioner;
 
 public abstract class PageRankBase extends BSP {
   public static final Log LOG = LogFactory.getLog(PageRankBase.class);
@@ -51,53 +56,48 @@ public abstract class PageRankBase exten
   protected static double DAMPING_FACTOR = 0.85;
   protected static double EPSILON = 0.001;
 
-  static HashMap<PageRankVertex, List<PageRankVertex>> mapAdjacencyList(
-      Configuration conf, BSPPeer peer) throws FileNotFoundException,
+  private static final VertexPartitioner partitioner = new VertexPartitioner();
+
+  static void mapAdjacencyList(Configuration conf, BSPPeer peer,
+      HashMap<Vertex, List<Vertex>> realAdjacencyList,
+      HashMap<Vertex, Double> tentativePagerank,
+      HashMap<String, Vertex> lookupMap) throws FileNotFoundException,
       IOException {
+
     FileSystem fs = FileSystem.get(conf);
-    HashMap<PageRankVertex, List<PageRankVertex>> adjacencyList = new HashMap<PageRankVertex, List<PageRankVertex>>();
     Path p = new Path(conf.get("in.path." + peer.getPeerName().split(":")[0]));
-    LOG.info(p.toString());
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
-    Text key = new Text();
-    Text value = new Text();
+    ObjectWritable key = new ObjectWritable(Vertex.class);
+    key.setConf(conf);
+    ArrayWritable value = new ArrayWritable(Vertex.class);
     while (reader.next(key, value)) {
-      PageRankVertex k = new PageRankVertex(key.toString());
-      PageRankVertex v = new PageRankVertex(value.toString());
-      if (!adjacencyList.containsKey(k)) {
-        adjacencyList.put(k, new LinkedList<PageRankVertex>());
-        adjacencyList.get(k).add(v);
-      } else {
-        adjacencyList.get(k).add(v);
+      Vertex realKey = (Vertex) key.get();
+      LinkedList<Vertex> list = new LinkedList<Vertex>();
+      realAdjacencyList.put(realKey, list);
+      lookupMap.put(realKey.name, realKey);
+      tentativePagerank.put(realKey, Double.valueOf(1.0 / numOfVertices));
+      for (Writable s : value.get()) {
+        list.add((Vertex) s);
       }
     }
+
     reader.close();
-    return adjacencyList;
   }
 
-  static HamaConfiguration partitionExample(Path out, HamaConfiguration conf)
-      throws IOException {
+  static HamaConfiguration partitionTextFile(Path in, HamaConfiguration conf,
+      String[] groomNames) throws IOException, InstantiationException,
+      IllegalAccessException, InterruptedException {
+
+    // set the partitioning vertex class
+    conf.setClass("hama.partitioning.vertex.class", Vertex.class,
+        PartitionableWritable.class);
 
-    String[] groomNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
-    int sizeOfCluster = groomNames.length;
+    return (HamaConfiguration) partitioner.partition(conf, in, groomNames);
+  }
 
-    // setup the paths where the grooms can find their input
-    List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
-    List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-        sizeOfCluster);
-    FileSystem fs = FileSystem.get(conf);
-    for (String entry : groomNames) {
-      partPaths.add(new Path(out.getParent().toString() + Path.SEPARATOR
-          + ShortestPaths.PARTED + Path.SEPARATOR
-          + entry.split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]));
-    }
-    // create a seq writer for that
-    for (Path p : partPaths) {
-      // LOG.info(p.toString());
-      fs.delete(p, true);
-      writers.add(SequenceFile
-          .createWriter(fs, conf, p, Text.class, Text.class));
-    }
+  static HamaConfiguration partitionExample(Path out, HamaConfiguration conf,
+      String[] groomNames) throws IOException, InstantiationException,
+      IllegalAccessException, InterruptedException {
 
     /**
      * 1:twitter.com <br/>
@@ -109,6 +109,11 @@ public abstract class PageRankBase exten
      * 7:youtube.com
      */
 
+    FileSystem fs = FileSystem.get(conf);
+    Path input = new Path(out, "pagerank-example");
+    FSDataOutputStream stream = fs.create(input);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream));
+
     String[] realNames = new String[] { null, "twitter.com", "google.com",
         "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
         "youtube.com" };
@@ -116,91 +121,28 @@ public abstract class PageRankBase exten
     String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
         "5;4;6", "6;4", "7;2;4" };
 
-    int numLines = 0;
     for (String line : lineArray) {
       String[] arr = line.split(";");
       String vId = arr[0];
       int indexKey = Integer.valueOf(vId);
-      LinkedList<String> list = new LinkedList<String>();
-      for (int i = 0; i < arr.length; i++) {
-        int index = Integer.valueOf(arr[i]);
-        list.add(realNames[index]);
-      }
-
-      int mod = Math.abs(realNames[indexKey].hashCode() % sizeOfCluster);
-      for (String value : list) {
-        writers.get(mod).append(new Text(realNames[indexKey]), new Text(value));
-      }
-      numLines++;
-    }
-
-    for (SequenceFile.Writer w : writers)
-      w.close();
-
-    for (Path p : partPaths) {
-      conf.set("in.path." + p.getName(), p.toString());
-    }
-    conf.set("num.vertices", "" + numLines);
-
-    return conf;
-  }
-
-  static HamaConfiguration partitionTextFile(Path in, HamaConfiguration conf)
-      throws IOException {
-
-    String[] groomNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
-    int sizeOfCluster = groomNames.length;
-
-    // setup the paths where the grooms can find their input
-    List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
-    List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-        sizeOfCluster);
-    FileSystem fs = FileSystem.get(conf);
-    for (String entry : groomNames) {
-      partPaths.add(new Path(in.getParent().toString() + Path.SEPARATOR
-          + ShortestPaths.PARTED + Path.SEPARATOR
-          + entry.split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]));
-    }
-    // create a seq writer for that
-    for (Path p : partPaths) {
-      // LOG.info(p.toString());
-      fs.delete(p, true);
-      writers.add(SequenceFile
-          .createWriter(fs, conf, p, Text.class, Text.class));
-    }
-
-    BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(in)));
-
-    String line = null;
-    long numLines = 0;
-    while ((line = br.readLine()) != null) {
-      String[] arr = line.split("\t");
-      String vId = arr[0];
-      LinkedList<String> list = new LinkedList<String>();
-      for (int i = 0; i < arr.length; i++) {
-        list.add(arr[i]);
-      }
 
-      int mod = Math.abs(vId.hashCode() % sizeOfCluster);
-      for (String value : list) {
-        writers.get(mod).append(new Text(vId), new Text(value));
+      String adjacents = "";
+      for (int i = 1; i < arr.length; i++) {
+        int index = Integer.valueOf(arr[i]);
+        adjacents += realNames[index] + "\t";
       }
-      numLines++;
+      writer.write(realNames[indexKey] + "\t" + adjacents);
+      writer.newLine();
     }
 
-    for (SequenceFile.Writer w : writers)
-      w.close();
-
-    for (Path p : partPaths) {
-      conf.set("in.path." + p.getName(), p.toString());
-    }
-    conf.set("num.vertices", "" + numLines);
+    writer.close();
+    fs.close();
 
-    return conf;
+    return partitionTextFile(input, conf, groomNames);
   }
 
   static void savePageRankMap(BSPPeer peer, Configuration conf,
-      Map<PageRankVertex, Double> tentativePagerank) throws IOException {
+      Map<Vertex, Double> tentativePagerank) throws IOException {
     FileSystem fs = FileSystem.get(conf);
     Path outPath = new Path(conf.get("out.path") + Path.SEPARATOR + "temp"
         + Path.SEPARATOR
@@ -208,13 +150,11 @@ public abstract class PageRankBase exten
     fs.delete(outPath, true);
     final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf,
         outPath, Text.class, DoubleWritable.class);
-    for (Entry<PageRankVertex, Double> row : tentativePagerank.entrySet()) {
-      out.append(new Text(row.getKey().getUrl()), new DoubleWritable(row
-          .getValue()));
+    for (Entry<Vertex, Double> row : tentativePagerank.entrySet()) {
+      out.append(new Text(row.getKey().getName()),
+          new DoubleWritable(row.getValue()));
     }
-    LOG.info("Closing...");
     out.close();
-    LOG.info("Closed!");
   }
 
   static void printOutput(FileSystem fs, Configuration conf) throws IOException {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java Tue Aug 23 00:30:58 2011
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.examples.graph;
-
-public class PageRankVertex {
-  
-  private final int id;
-  private final String url;
-
-  public PageRankVertex(String url) {
-    super();
-    this.id = url.hashCode();
-    this.url = url;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + id;
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    PageRankVertex other = (PageRankVertex) obj;
-    if (id != other.id)
-      return false;
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return url;
-  }
-
-  public int getId() {
-    return id;
-  }
-
-  public String getUrl() {
-    return url;
-  }
-
-}

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java Tue Aug 23 00:30:58 2011
@@ -17,25 +17,26 @@
  */
 package org.apache.hama.examples.graph;
 
-public final class ShortestPathVertex {
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
-  private final int id;
-  private final String name;
-  private final int weight;
+public final class ShortestPathVertex extends Vertex {
+
+  private int weight;
   private Integer cost;
 
+  public ShortestPathVertex() {
+  }
+
   public ShortestPathVertex(int weight, String name) {
-    super();
-    this.id = name.hashCode();
+    super(name);
     this.weight = weight;
-    this.name = name;
   }
 
   public ShortestPathVertex(int weight, String name, Integer cost) {
-    super();
-    this.id = name.hashCode();
+    super(name);
     this.weight = weight;
-    this.name = name;
     this.cost = cost;
   }
 
@@ -65,28 +66,15 @@ public final class ShortestPathVertex {
   }
 
   @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((name == null) ? 0 : name.hashCode());
-    return result;
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    weight = in.readInt();
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    ShortestPathVertex other = (ShortestPathVertex) obj;
-    if (name == null) {
-      if (other.name != null)
-        return false;
-    } else if (!name.equals(other.name))
-      return false;
-    return true;
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(weight);
   }
 
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java Tue Aug 23 00:30:58 2011
@@ -18,6 +18,7 @@
 package org.apache.hama.examples.graph;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -43,17 +44,15 @@ public class ShortestPaths extends Short
   public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
 
   private Configuration conf;
-  private Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
-  private Map<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();
+  private final HashMap<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
+  private final HashMap<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();
   private String[] peerNames;
 
   @Override
   public void bsp(BSPPeer peer) throws IOException, KeeperException,
       InterruptedException {
-    LOG.info("Mapping graph into ram...");
     // map our input into ram
     mapAdjacencyList(conf, peer, adjacencyList, vertexLookupMap);
-    LOG.info("Finished! Starting graph initialization...");
     // parse the configuration to get the peerNames
     parsePeerNames(conf);
     // get our master groom
@@ -67,7 +66,6 @@ public class ShortestPaths extends Short
       sendMessageToNeighbors(peer, v);
     }
 
-    LOG.info("Finished! Starting main loop...");
     boolean updated = true;
     while (updated) {
       int updatesMade = 0;
@@ -91,7 +89,6 @@ public class ShortestPaths extends Short
         sendMessageToNeighbors(peer, vertex);
       }
     }
-    LOG.info("Finished!");
     // finished, finally save our map to DFS.
     saveVertexMap(conf, peer, adjacencyList);
   }
@@ -119,8 +116,8 @@ public class ShortestPaths extends Short
    * @throws KeeperException
    * @throws InterruptedException
    */
-  private boolean broadcastUpdatesMade(BSPPeer peer, String master,
-      int updates) throws IOException, KeeperException, InterruptedException {
+  private boolean broadcastUpdatesMade(BSPPeer peer, String master, int updates)
+      throws IOException, KeeperException, InterruptedException {
     peer.send(master, new IntegerMessage(peer.getPeerName(), updates));
     peer.sync();
     if (peer.getPeerName().equals(master)) {
@@ -128,8 +125,6 @@ public class ShortestPaths extends Short
       IntegerMessage message;
       while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
         count += message.getData();
-        LOG.info("Received " + message.getData() + " updates from "
-            + message.getTag() + " in SuperStep " + peer.getSuperstepCount());
       }
 
       for (String name : peer.getAllPeerNames()) {
@@ -154,14 +149,15 @@ public class ShortestPaths extends Short
    * @param id The vertex to all adjacent vertices the new cost has to be send.
    * @throws IOException
    */
-  private void sendMessageToNeighbors(BSPPeer peer,
-      ShortestPathVertex id) throws IOException {
+  private void sendMessageToNeighbors(BSPPeer peer, ShortestPathVertex id)
+      throws IOException {
     List<ShortestPathVertex> outgoingEdges = adjacencyList.get(id);
     for (ShortestPathVertex adjacent : outgoingEdges) {
       int mod = Math.abs((adjacent.getId() % peer.getAllPeerNames().length));
-      peer.send(peerNames[mod], new IntegerMessage(adjacent.getName(), id
-          .getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
-          + adjacent.getWeight()));
+      peer.send(peerNames[mod],
+          new IntegerMessage(adjacent.getName(),
+              id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
+                  + adjacent.getWeight()));
     }
   }
 
@@ -178,11 +174,12 @@ public class ShortestPaths extends Short
   public static void printUsage() {
     System.out.println("Single Source Shortest Path Example:");
     System.out
-        .println("<Startvertex name> <optional: output path> <optional: path to own adjacency list sequencefile>");
+        .println("<Startvertex name> <optional: output path> <optional: path to own adjacency list textfile!>");
   }
 
   public static void main(String[] args) throws IOException,
-      InterruptedException, ClassNotFoundException {
+      InterruptedException, ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
 
     printUsage();
 
@@ -191,7 +188,6 @@ public class ShortestPaths extends Short
     conf.set(SHORTEST_PATHS_START_VERTEX_ID, "Frankfurt");
     System.out.println("Setting default start vertex to \"Frankfurt\"!");
     conf.set(OUT_PATH, "sssp/output");
-    boolean skipPartitioning = false;
     Path adjacencyListPath = null;
 
     if (args.length > 0) {
@@ -207,10 +203,6 @@ public class ShortestPaths extends Short
         adjacencyListPath = new Path(args[2]);
       }
 
-      if (args.length > 3) {
-        skipPartitioning = Boolean.valueOf(args[3]);
-      }
-
     }
 
     Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = null;
@@ -225,19 +217,16 @@ public class ShortestPaths extends Short
     // Set the task size as a number of GroomServer
     BSPJobClient jobClient = new BSPJobClient(conf);
     ClusterStatus cluster = jobClient.getClusterStatus(true);
-    StringBuilder sb = new StringBuilder();
-    for (String peerName : cluster.getActiveGroomNames().values()) {
-      conf.set(MASTER_TASK, peerName);
-      sb.append(peerName);
-      sb.append(";");
-    }
-    LOG.info("Master is: " + conf.get(MASTER_TASK));
-    conf.set(BSP_PEERS, sb.toString());
+
+    Collection<String> activeGrooms = cluster.getActiveGroomNames().values();
+    String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
+
     LOG.info("Starting data partitioning...");
-    if (adjacencyList == null)
-      conf = partition(adjacencyListPath, conf, skipPartitioning);
-    else
-      conf = partition(adjacencyList, conf);
+    if (adjacencyList == null) {
+      conf = (HamaConfiguration) partition(conf, adjacencyListPath, grooms);
+    } else {
+      conf = (HamaConfiguration) partitionExample(conf, adjacencyList, grooms);
+    }
     LOG.info("Finished!");
 
     bsp.setNumBspTask(cluster.getGroomServers());

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java Tue Aug 23 00:30:58 2011
@@ -17,27 +17,35 @@
  */
 package org.apache.hama.examples.graph;
 
+import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.io.OutputStreamWriter;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hama.HamaConfiguration;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.examples.graph.partitioning.PartitionableWritable;
+import org.apache.hama.examples.graph.partitioning.ShortestPathVertexPartitioner;
 
 public abstract class ShortestPathsBase extends BSP {
-  
+
+  private static final String SSSP_EXAMPLE_FILE_NAME = "sssp-example";
   public static final String BSP_PEERS = "bsp.peers";
   public static final String SHORTEST_PATHS_START_VERTEX_ID = "shortest.paths.start.vertex.id";
   public static final String PARTED = "parted";
@@ -45,7 +53,9 @@ public abstract class ShortestPathsBase 
   public static final String OUT_PATH = "out.path";
   public static final String NAME_VALUE_SEPARATOR = ":";
   public static final String MASTER_TASK = "master.groom";
-  
+
+  private static final ShortestPathVertexPartitioner partitioner = new ShortestPathVertexPartitioner();
+
   /**
    * When finished we just writing a sequencefile of the vertex name and the
    * cost.
@@ -54,7 +64,7 @@ public abstract class ShortestPathsBase 
    * @param adjacencyList
    * @throws IOException
    */
-  static void saveVertexMap(Configuration conf, BSPPeer peer,
+  protected final static void saveVertexMap(Configuration conf, BSPPeer peer,
       Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList)
       throws IOException {
     Path outPath = new Path(conf.get(OUT_PATH) + Path.SEPARATOR
@@ -76,202 +86,95 @@ public abstract class ShortestPathsBase 
    * @param conf
    * @throws IOException
    */
-  static void printOutput(FileSystem fs, Configuration conf) throws IOException {
+  protected final static void printOutput(FileSystem fs, Configuration conf)
+      throws IOException {
     System.out.println("-------------------- RESULTS --------------------");
     FileStatus[] stati = fs.listStatus(new Path(conf.get(OUT_PATH)));
     for (FileStatus status : stati) {
-      Path path = status.getPath();
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-      Text key = new Text();
-      IntWritable value = new IntWritable();
-      while (reader.next(key, value)) {
-        System.out.println(key.toString() + " | " + value.get());
+      if (!status.isDir() && !status.getPath().getName().equals(SSSP_EXAMPLE_FILE_NAME)) {
+        Path path = status.getPath();
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+        Text key = new Text();
+        IntWritable value = new IntWritable();
+        while (reader.next(key, value)) {
+          System.out.println(key.toString() + " | " + value.get());
+        }
+        reader.close();
       }
-      reader.close();
     }
   }
 
-  /**
-   * 
-   * The adjacencylist contains two text fields on each line. The key component
-   * is the name of a vertex, the value is a ":" separated Text field that
-   * contains the name of the adjacent vertex leftmost and the weight on the
-   * rightmost side.
-   * 
-   * <PRE>
-   *    K               V <br/> 
-   * Vertex[Text]    AdjacentVertex : Weight [Text]
-   * </PRE>
-   * 
-   * @param adjacencyList
-   * @param vertexLookupMap
-   */
-  static void mapAdjacencyList(Configuration conf, BSPPeer peer,
+  protected final static void mapAdjacencyList(Configuration conf,
+      BSPPeer peer,
       Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList,
       Map<String, ShortestPathVertex> vertexLookupMap)
       throws FileNotFoundException, IOException {
+
     FileSystem fs = FileSystem.get(conf);
-    Path p = new Path(conf.get(IN_PATH
-        + peer.getPeerName().split(NAME_VALUE_SEPARATOR)[0]));
+    Path p = new Path(conf.get("in.path." + peer.getPeerName().split(":")[0]));
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
-    Text key = new Text(); // name of the vertex
-    Text value = new Text(); // name of the adjacent vertex : weight
+    ObjectWritable key = new ObjectWritable(ShortestPathVertex.class);
+    key.setConf(conf);
+    ArrayWritable value = new ArrayWritable(ShortestPathVertex.class);
     while (reader.next(key, value)) {
-      // a key vertex has weight 0 to itself
-      ShortestPathVertex keyVertex = new ShortestPathVertex(0, key.toString(),
-          Integer.MAX_VALUE);
-      String[] nameWeight = value.toString().split(NAME_VALUE_SEPARATOR);
-      if (!adjacencyList.containsKey(keyVertex)) {
-        LinkedList<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(Integer.valueOf(nameWeight[1]),
-            nameWeight[0], Integer.MAX_VALUE));
-        adjacencyList.put(keyVertex, list);
-        vertexLookupMap.put(keyVertex.getName(), keyVertex);
-      } else {
-        adjacencyList.get(keyVertex).add(
-            new ShortestPathVertex(Integer.valueOf(nameWeight[1]),
-                nameWeight[0], Integer.MAX_VALUE));
+      ShortestPathVertex realKey = (ShortestPathVertex) key.get();
+      realKey.setCost(Integer.MAX_VALUE);
+      LinkedList<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+      adjacencyList.put(realKey, list);
+      vertexLookupMap.put(realKey.name, realKey);
+
+      for (Writable s : value.get()) {
+        final ShortestPathVertex vertex = (ShortestPathVertex) s;
+        vertex.setCost(Integer.MAX_VALUE);
+        list.add(vertex);
       }
     }
+
     reader.close();
   }
 
-  /**
-   * Partitioning for in memory adjacency lists.
-   * 
-   * @param adjacencyList
-   * @param status
-   * @param conf
-   * @return
-   * @throws IOException
-   */
-  static HamaConfiguration partition(
+  protected final static Configuration partitionExample(Configuration conf,
       Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList,
-      HamaConfiguration conf) throws IOException {
-
-    String[] groomNames = conf.get(BSP_PEERS).split(";");
+      String[] groomNames) throws IOException, InstantiationException,
+      IllegalAccessException, InterruptedException {
 
-    int sizeOfCluster = groomNames.length;
-
-    // setup the paths where the grooms can find their input
-    List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
-    List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-        sizeOfCluster);
     FileSystem fs = FileSystem.get(conf);
-    Path fileToPartition = new Path(conf.get(OUT_PATH));
-    for (String entry : groomNames) {
-      partPaths.add(new Path(fileToPartition.getParent().toString()
-          + Path.SEPARATOR + PARTED + Path.SEPARATOR
-          + entry.split(NAME_VALUE_SEPARATOR)[0]));
-    }
-    // create a seq writer for that
-    for (Path p : partPaths) {
-      // System.out.println(p.toString());
-      fs.delete(p, true);
-      writers.add(SequenceFile
-          .createWriter(fs, conf, p, Text.class, Text.class));
-    }
+    Path input = new Path(conf.get(OUT_PATH), SSSP_EXAMPLE_FILE_NAME);
+    FSDataOutputStream stream = fs.create(input);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream));
 
-    for (Entry<ShortestPathVertex, List<ShortestPathVertex>> entry : adjacencyList
-        .entrySet()) {
-      // a key vertex has weight 0 to itself
-      ShortestPathVertex keyVertex = entry.getKey();
-      // just mod the id
-      int mod = Math.abs(keyVertex.getId() % sizeOfCluster);
-      // append it to the right sequenceFile
-      for (ShortestPathVertex value : entry.getValue())
-        writers.get(mod)
-            .append(
-                new Text(keyVertex.getName()),
-                new Text(value.getName() + NAME_VALUE_SEPARATOR
-                    + value.getWeight()));
-    }
+    Set<Entry<ShortestPathVertex, List<ShortestPathVertex>>> set = adjacencyList
+        .entrySet();
 
-    for (SequenceFile.Writer w : writers)
-      w.close();
+    for (Entry<ShortestPathVertex, List<ShortestPathVertex>> entry : set) {
 
-    for (Path p : partPaths) {
-      conf.set(IN_PATH + p.getName(), p.toString());
-    }
-    return conf;
-  }
+      String line = entry.getKey().getName();
 
-  /**
-   * Partitioning for sequencefile partitioned adjacency lists.
-   * 
-   * The adjacencylist contains two text fields on each line. The key component
-   * is the name of a vertex, the value is a ":" separated Text field that
-   * contains the name of the adjacent vertex leftmost and the weight on the
-   * rightmost side.
-   * 
-   * <PRE>
-   *    K               V <br/> 
-   * Vertex[Text]    AdjacentVertex : Weight [Text]
-   * </PRE>
-   * 
-   * @param fileToPartition
-   * @param status
-   * @param conf
-   * @return
-   * @throws IOException
-   */
-  static HamaConfiguration partition(Path fileToPartition,
-      HamaConfiguration conf, boolean skipPartitioning) throws IOException {
+      for (ShortestPathVertex v : entry.getValue()) {
+        line += "\t" + v.getName() + ":" + v.getWeight();
+      }
 
-    String[] groomNames = conf.get(BSP_PEERS).split(";");
-    int sizeOfCluster = groomNames.length;
+      writer.write(line);
+      writer.newLine();
 
-    // setup the paths where the grooms can find their input
-    List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
-    List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-        sizeOfCluster);
-    FileSystem fs = FileSystem.get(conf);
-    for (String entry : groomNames) {
-      partPaths.add(new Path(fileToPartition.getParent().toString()
-          + Path.SEPARATOR + PARTED + Path.SEPARATOR
-          + entry.split(NAME_VALUE_SEPARATOR)[0]));
     }
 
-    if (!skipPartitioning) {
-
-      // create a seq writer for that
-      for (Path p : partPaths) {
-        // System.out.println(p.toString());
-        fs.delete(p, true);
-        writers.add(SequenceFile.createWriter(fs, conf, p, Text.class,
-            Text.class));
-      }
-
-      // parse our file
-      if (!fs.exists(fileToPartition))
-        throw new FileNotFoundException("File " + fileToPartition
-            + " wasn't found!");
-
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, fileToPartition,
-          conf);
-      Text key = new Text(); // name of the vertex
-      Text value = new Text(); // name of the adjacent vertex : weight
-
-      while (reader.next(key, value)) {
-        // a key vertex has weight 0 to itself
-        ShortestPathVertex keyVertex = new ShortestPathVertex(0, key.toString());
-        // just mod the id
-        int mod = Math.abs(keyVertex.getId() % sizeOfCluster);
-        // append it to the right sequenceFile
-        writers.get(mod).append(new Text(keyVertex.getName()), new Text(value));
-      }
+    writer.close();
+    fs.close();
 
-      reader.close();
+    return partition(conf, input, groomNames);
+  }
 
-      for (SequenceFile.Writer w : writers)
-        w.close();
-    }
+  protected final static Configuration partition(Configuration conf,
+      Path fileToPartition, String[] groomNames) throws IOException,
+      InstantiationException, IllegalAccessException, InterruptedException {
+
+    // set the partitioning vertex class
+    conf.setClass("hama.partitioning.vertex.class", ShortestPathVertex.class,
+        PartitionableWritable.class);
 
-    for (Path p : partPaths) {
-      conf.set(IN_PATH + p.getName(), p.toString());
-    }
+    return partitioner.partition(conf, fileToPartition, groomNames);
 
-    return conf;
   }
 
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java Tue Aug 23 00:30:58 2011
@@ -31,7 +31,6 @@ public class ShortestPathsGraphLoader {
         "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
         "Muenchen" };
 
-    int id = 1;
     for (String city : cities) {
       if (city.equals("Frankfurt")) {
         List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
@@ -86,7 +85,6 @@ public class ShortestPathsGraphLoader {
         list.add(new ShortestPathVertex(84, "Augsburg"));
         adjacencyList.put(new ShortestPathVertex(0, city), list);
       }
-      id++;
     }
     return adjacencyList;
   }