You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/01/14 13:24:46 UTC

svn commit: r1231491 - in /incubator/hama/trunk: examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/

Author: tjungblut
Date: Sat Jan 14 12:24:45 2012
New Revision: 1231491

URL: http://svn.apache.org/viewvc?rev=1231491&view=rev
Log:
Fixing pagerank and adding toString to vertices as well as a testcase

Added:
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java   (with props)
Modified:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1231491&r1=1231490&r2=1231491&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Sat Jan 14 12:24:45 2012
@@ -43,7 +43,7 @@ import org.apache.hama.graph.VertexArray
 import org.apache.hama.graph.VertexWritable;
 import org.apache.hama.util.KeyValuePair;
 
-public class PageRank extends
+public final class PageRank extends
     BSP<VertexWritable, VertexArrayWritable, Text, DoubleWritable> {
 
   public static final Log LOG = LogFactory.getLog(PageRank.class);
@@ -55,47 +55,54 @@ public class PageRank extends
   private final HashMap<VertexWritable, Double> lastTentativePagerank = new HashMap<VertexWritable, Double>();
 
   protected static int MAX_ITERATIONS = 30;
-  protected static String masterTaskName;
+  protected static String MASTER_TASK_NAME;
   protected static double ALPHA;
-  protected static int numOfVertices;
+  protected static long numOfVertices;
   protected static double DAMPING_FACTOR = 0.85;
   protected static double EPSILON = 0.001;
 
   @Override
-  public void setup(
+  public final void setup(
       BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer)
       throws IOException {
 
     DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
     EPSILON = Double.parseDouble(conf.get("epsilon.error"));
     MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
-    masterTaskName = peer.getPeerName(0);
+    MASTER_TASK_NAME = peer.getPeerName(0);
 
-    // map our stuff into ram
+    // put our graph into a map
     KeyValuePair<VertexWritable, VertexArrayWritable> next = null;
     while ((next = peer.readNext()) != null) {
-      adjacencyList.put(next.getKey(), (VertexWritable[]) next.getValue()
-          .toArray());
+      // put the origin vertex into the outlinks to make sure dangling nodes are
+      // sending their pagerank as well
+      VertexWritable[] outlinks = (VertexWritable[]) next.getValue().toArray();
+      VertexWritable[] outlinksWithOrigin = new VertexWritable[outlinks.length + 1];
+      System.arraycopy(outlinks, 0, outlinksWithOrigin, 0, outlinks.length);
+      outlinksWithOrigin[outlinks.length] = next.getKey();
+
+      adjacencyList.put(next.getKey(), outlinksWithOrigin);
       vertexLookupMap.put(next.getKey().getName(), next.getKey());
     }
 
+    // the global number of vertices is not known at this point, so we use a
+    // naive approximation, computed in long so the product cannot overflow
+    long approximateNumberOfVertices = (long) adjacencyList.size()
+        * peer.getNumPeers();
+
     // normally this should be the global number of vertices
-    numOfVertices = vertexLookupMap.size();
-    ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
+    numOfVertices = approximateNumberOfVertices;
+    ALPHA = (1 - DAMPING_FACTOR) / (double) approximateNumberOfVertices;
 
-    // reread the input to save ram
-    peer.reopenInput();
-    VertexWritable key = new VertexWritable();
-    VertexArrayWritable value = new VertexArrayWritable();
-    while (peer.readNext(key, value)) {
-      VertexWritable vertexWritable = vertexLookupMap.get(key.getName());
-      tentativePagerank
-          .put(vertexWritable, Double.valueOf(1.0 / numOfVertices));
+    // put a tentative pagerank for each vertex into the map
+    double initialPagerank = 1.0 / (double) approximateNumberOfVertices;
+    for (VertexWritable vertexWritable : adjacencyList.keySet()) {
+      tentativePagerank.put(vertexWritable, initialPagerank);
     }
   }
 
   @Override
-  public void bsp(
+  public final void bsp(
       BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer)
       throws IOException, SyncException, InterruptedException {
 
@@ -126,8 +133,8 @@ public class PageRank extends
           }
         }
         // pregel formula:
-        // ALPHA = 0.15 / NumVertices()
-        // P(i) = ALPHA + 0.85 * sum
+        // ALPHA = (1-DAMPING_FACTOR) / NumVertices()
+        // P(i) = ALPHA + DAMPING_FACTOR * sum
         for (Entry<VertexWritable, Double> entry : sumMap.entrySet()) {
           tentativePagerank.put(entry.getKey(), ALPHA
               + (entry.getValue() * DAMPING_FACTOR));
@@ -151,7 +158,7 @@ public class PageRank extends
   }
 
   @Override
-  public void cleanup(
+  public final void cleanup(
       BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer) {
     try {
       for (Entry<VertexWritable, Double> row : tentativePagerank.entrySet()) {
@@ -163,12 +170,12 @@ public class PageRank extends
     }
   }
 
-  private double broadcastError(
+  private final double broadcastError(
       BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer,
       double error) throws IOException, SyncException, InterruptedException {
-    peer.send(masterTaskName, new DoubleMessage("", error));
+    peer.send(MASTER_TASK_NAME, new DoubleMessage("", error));
     peer.sync();
-    if (peer.getPeerName().equals(masterTaskName)) {
+    if (peer.getPeerName().equals(MASTER_TASK_NAME)) {
       double errorSum = 0.0;
       int count = 0;
       DoubleMessage message;
@@ -188,7 +195,7 @@ public class PageRank extends
     return message.getData();
   }
 
-  private double determineError() {
+  private final double determineError() {
     double error = 0.0;
     for (Entry<VertexWritable, Double> entry : tentativePagerank.entrySet()) {
       error += Math.abs(lastTentativePagerank.get(entry.getKey())
@@ -197,13 +204,13 @@ public class PageRank extends
     return error;
   }
 
-  private void copyTentativePageRankToBackup() {
+  private final void copyTentativePageRankToBackup() {
     for (Entry<VertexWritable, Double> entry : tentativePagerank.entrySet()) {
       lastTentativePagerank.put(entry.getKey(), entry.getValue());
     }
   }
 
-  private void sendMessageToNeighbors(
+  private final void sendMessageToNeighbors(
       BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer,
       VertexWritable v) throws IOException {
     VertexWritable[] outgoingEdges = adjacencyList.get(v);
@@ -216,7 +223,8 @@ public class PageRank extends
     }
   }
 
-  static void printOutput(FileSystem fs, Configuration conf) throws IOException {
+  static final void printOutput(FileSystem fs, Configuration conf)
+      throws IOException {
     LOG.info("-------------------- RESULTS --------------------");
     FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
     for (FileStatus status : stati) {
@@ -237,14 +245,14 @@ public class PageRank extends
     }
   }
 
-  public static void printUsage() {
+  public final static void printUsage() {
     System.out.println("PageRank Example:");
     System.out
         .println("<input path> <output path> [damping factor] [epsilon error] [tasks]");
 
   }
 
-  public static void main(String[] args) throws IOException,
+  public final static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException, InstantiationException,
       IllegalAccessException {
     if (args.length == 0) {
@@ -258,7 +266,7 @@ public class PageRank extends
 
     job.setInputPath(new Path(args[0]));
     job.setOutputPath(new Path(args[1]));
-    
+
     conf.set("damping.factor", (args.length > 2) ? args[2] : "0.85");
     conf.set("epsilon.error", (args.length > 3) ? args[3] : "0.000001");
     if (args.length == 5) {

Added: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1231491&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (added)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Sat Jan 14 12:24:45 2012
@@ -0,0 +1,158 @@
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+
+/**
+ * Runs the {@link PageRank} example on a small seven page graph and compares
+ * the ranks it writes against pre-calculated values.
+ */
+public class PageRankTest extends TestCase {
+
+  private static final String INPUT = "/tmp/pagerank-tmp.seq";
+  private static final String OUTPUT = "/tmp/pagerank-out";
+  // pageranks are floating point values, so they are compared with a small
+  // tolerance instead of exact equality
+  private static final double DELTA = 1e-9;
+  private Configuration conf;
+  private FileSystem fs;
+
+  /**
+   * Writes the test graph, runs the PageRank job on it and verifies the output.
+   * The temporary input and output paths are removed afterwards, even if one
+   * of these steps fails.
+   */
+  public void testPageRank() throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+    conf = new HamaConfiguration();
+    fs = FileSystem.get(conf);
+
+    try {
+      generateTestData();
+      PageRank.main(new String[] { INPUT, OUTPUT, "0.85", "0.000001" });
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+  /**
+   * Reads part-00000 of the job output and checks the rank of every vertex in
+   * it against the expected value, and that the ranks sum up to 1.0.
+   */
+  private void verifyResult() throws IOException {
+    Map<String, Double> rs = new HashMap<String, Double>();
+    // our desired results
+    rs.put("stackoverflow.com", 0.20495476070571675);
+    rs.put("google.com", 0.339831187357033);
+    rs.put("facebook.com", 0.042503114866791786);
+    rs.put("yahoo.com", 0.2134265215074906);
+    rs.put("twitter.com", 0.042503114866791786);
+    rs.put("nasa.gov", 0.12688096846583075);
+    rs.put("youtube.com", 0.029900332230345304);
+
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
+        + "/part-00000"), conf);
+    double sum = 0.0;
+    try {
+      Text key = new Text();
+      DoubleWritable value = new DoubleWritable();
+      while (reader.next(key, value)) {
+        Double expected = rs.get(key.toString());
+        // fail with a readable message instead of a NullPointerException on
+        // unboxing when the output contains a vertex we do not expect
+        assertNotNull("Unexpected vertex in output: " + key, expected);
+        assertEquals("Wrong pagerank for " + key, expected.doubleValue(),
+            value.get(), DELTA);
+        sum += value.get();
+      }
+    } finally {
+      // release the file handle even if an assertion fails
+      reader.close();
+    }
+    System.out.println("Sum is: " + sum);
+    assertEquals(1.0d, sum, DELTA);
+  }
+
+  /**
+   * Writes the test graph as a sequence file to INPUT. Every entry of
+   * lineArray has the form "vertexId;outlinkId;outlinkId;...", where the ids
+   * are indices into the pages array.<br/>
+   * The graph looks like this (adjacency list, [] contains outlinks):<br/>
+   * stackoverflow.com [yahoo.com] <br/>
+   * google.com []<br/>
+   * facebook.com [twitter.com, google.com, nasa.gov]<br/>
+   * yahoo.com [nasa.gov, stackoverflow.com]<br/>
+   * twitter.com [google.com, facebook.com]<br/>
+   * nasa.gov [yahoo.com, stackoverflow.com]<br/>
+   * youtube.com [google.com, yahoo.com]<br/>
+   */
+  private void generateTestData() throws IOException {
+    Map<VertexWritable, VertexArrayWritable> tmp = new HashMap<VertexWritable, VertexArrayWritable>();
+
+    // our first entry is null, because the indices in the pre-calculated hama
+    // 3.0 example start at 1.
+    // FIXME This is really ugly.
+    String[] pages = new String[] { null, "twitter.com", "google.com",
+        "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
+        "youtube.com" };
+    String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
+        "5;4;6", "6;4", "7;2;4" };
+
+    for (int i = 0; i < lineArray.length; i++) {
+
+      String[] adjacencyStringArray = lineArray[i].split(";");
+      int vertexId = Integer.parseInt(adjacencyStringArray[0]);
+      String name = pages[vertexId];
+      VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length - 1];
+      for (int j = 1; j < adjacencyStringArray.length; j++) {
+        arr[j - 1] = new VertexWritable(
+            pages[Integer.parseInt(adjacencyStringArray[j])]);
+      }
+      VertexArrayWritable wr = new VertexArrayWritable();
+      wr.set(arr);
+      tmp.put(new VertexWritable(name), wr);
+    }
+
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
+        INPUT), VertexWritable.class, VertexArrayWritable.class);
+    try {
+      for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+        writer.append(e.getKey(), e.getValue());
+      }
+    } finally {
+      // close the writer even if an append fails
+      writer.close();
+    }
+  }
+
+  /**
+   * Best-effort removal of the temporary input and output paths. An
+   * IOException is only printed, so that a failing cleanup in the finally
+   * block cannot replace the actual test failure.
+   */
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(INPUT)))
+        fs.delete(new Path(INPUT), true);
+      if (fs.exists(new Path(OUTPUT)))
+        fs.delete(new Path(OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java?rev=1231491&r1=1231490&r2=1231491&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java Sat Jan 14 12:24:45 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.graph;
 
+import java.util.Arrays;
+
 import org.apache.hadoop.io.ArrayWritable;
 
 public class VertexArrayWritable extends ArrayWritable {
@@ -25,4 +27,9 @@ public class VertexArrayWritable extends
     super(VertexWritable.class);
   }
 
+  @Override
+  public String toString() {
+    return Arrays.toString(get());
+  }
+
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java?rev=1231491&r1=1231490&r2=1231491&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java Sat Jan 14 12:24:45 2012
@@ -48,7 +48,10 @@ public class VertexWritable implements W
 
   @Override
   public int hashCode() {
-    return name.hashCode();
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    return result;
   }
 
   @Override
@@ -60,7 +63,10 @@ public class VertexWritable implements W
     if (getClass() != obj.getClass())
       return false;
     VertexWritable other = (VertexWritable) obj;
-    if (!name.equals(other.name))
+    if (name == null) {
+      if (other.name != null)
+        return false;
+    } else if (!name.equals(other.name))
       return false;
     return true;
   }
@@ -68,5 +74,10 @@ public class VertexWritable implements W
   public String getName() {
     return name;
   }
+  
+  @Override
+  public String toString() {
+    return getName();
+  }
 
 }