You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/01/14 13:24:46 UTC
svn commit: r1231491 - in /incubator/hama/trunk:
examples/src/main/java/org/apache/hama/examples/
examples/src/test/java/org/apache/hama/examples/
graph/src/main/java/org/apache/hama/graph/
Author: tjungblut
Date: Sat Jan 14 12:24:45 2012
New Revision: 1231491
URL: http://svn.apache.org/viewvc?rev=1231491&view=rev
Log:
Fix PageRank, add toString() to VertexWritable and VertexArrayWritable, and add a PageRank test case.
Added:
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (with props)
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1231491&r1=1231490&r2=1231491&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Sat Jan 14 12:24:45 2012
@@ -43,7 +43,7 @@ import org.apache.hama.graph.VertexArray
import org.apache.hama.graph.VertexWritable;
import org.apache.hama.util.KeyValuePair;
-public class PageRank extends
+public final class PageRank extends
BSP<VertexWritable, VertexArrayWritable, Text, DoubleWritable> {
public static final Log LOG = LogFactory.getLog(PageRank.class);
@@ -55,47 +55,54 @@ public class PageRank extends
private final HashMap<VertexWritable, Double> lastTentativePagerank = new HashMap<VertexWritable, Double>();
protected static int MAX_ITERATIONS = 30;
- protected static String masterTaskName;
+ protected static String MASTER_TASK_NAME;
protected static double ALPHA;
- protected static int numOfVertices;
+ protected static long numOfVertices;
protected static double DAMPING_FACTOR = 0.85;
protected static double EPSILON = 0.001;
@Override
- public void setup(
+ public final void setup(
BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer)
throws IOException {
DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
EPSILON = Double.parseDouble(conf.get("epsilon.error"));
MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
- masterTaskName = peer.getPeerName(0);
+ MASTER_TASK_NAME = peer.getPeerName(0);
- // map our stuff into ram
+ // put our graph into a map
KeyValuePair<VertexWritable, VertexArrayWritable> next = null;
while ((next = peer.readNext()) != null) {
- adjacencyList.put(next.getKey(), (VertexWritable[]) next.getValue()
- .toArray());
+ // put the origin vertex into the outlinks to make sure dangling nodes are
+ // sending their pagerank as well
+ VertexWritable[] outlinks = (VertexWritable[]) next.getValue().toArray();
+ VertexWritable[] outlinksWithOrigin = new VertexWritable[outlinks.length + 1];
+ System.arraycopy(outlinks, 0, outlinksWithOrigin, 0, outlinks.length);
+ outlinksWithOrigin[outlinks.length] = next.getKey();
+
+ adjacencyList.put(next.getKey(), outlinksWithOrigin);
vertexLookupMap.put(next.getKey().getName(), next.getKey());
}
+ // we do not have a global number of vertices present at this point
+ // so we use a naive approximation
+ long approximateNumberOfVertices = adjacencyList.size()
+ * peer.getNumPeers();
+
// normally this should be the global number of vertices
- numOfVertices = vertexLookupMap.size();
- ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
+ numOfVertices = approximateNumberOfVertices;
+ ALPHA = (1 - DAMPING_FACTOR) / (double) approximateNumberOfVertices;
- // reread the input to save ram
- peer.reopenInput();
- VertexWritable key = new VertexWritable();
- VertexArrayWritable value = new VertexArrayWritable();
- while (peer.readNext(key, value)) {
- VertexWritable vertexWritable = vertexLookupMap.get(key.getName());
- tentativePagerank
- .put(vertexWritable, Double.valueOf(1.0 / numOfVertices));
+ // put a tentative pagerank for each vertex into the map
+ double initialPagerank = 1.0 / (double) approximateNumberOfVertices;
+ for (VertexWritable vertexWritable : adjacencyList.keySet()) {
+ tentativePagerank.put(vertexWritable, initialPagerank);
}
}
@Override
- public void bsp(
+ public final void bsp(
BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer)
throws IOException, SyncException, InterruptedException {
@@ -126,8 +133,8 @@ public class PageRank extends
}
}
// pregel formula:
- // ALPHA = 0.15 / NumVertices()
- // P(i) = ALPHA + 0.85 * sum
+ // ALPHA = (1-DAMPING_FACTOR) / NumVertices()
+ // P(i) = ALPHA + DAMPING_FACTOR * sum
for (Entry<VertexWritable, Double> entry : sumMap.entrySet()) {
tentativePagerank.put(entry.getKey(), ALPHA
+ (entry.getValue() * DAMPING_FACTOR));
@@ -151,7 +158,7 @@ public class PageRank extends
}
@Override
- public void cleanup(
+ public final void cleanup(
BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer) {
try {
for (Entry<VertexWritable, Double> row : tentativePagerank.entrySet()) {
@@ -163,12 +170,12 @@ public class PageRank extends
}
}
- private double broadcastError(
+ private final double broadcastError(
BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer,
double error) throws IOException, SyncException, InterruptedException {
- peer.send(masterTaskName, new DoubleMessage("", error));
+ peer.send(MASTER_TASK_NAME, new DoubleMessage("", error));
peer.sync();
- if (peer.getPeerName().equals(masterTaskName)) {
+ if (peer.getPeerName().equals(MASTER_TASK_NAME)) {
double errorSum = 0.0;
int count = 0;
DoubleMessage message;
@@ -188,7 +195,7 @@ public class PageRank extends
return message.getData();
}
- private double determineError() {
+ private final double determineError() {
double error = 0.0;
for (Entry<VertexWritable, Double> entry : tentativePagerank.entrySet()) {
error += Math.abs(lastTentativePagerank.get(entry.getKey())
@@ -197,13 +204,13 @@ public class PageRank extends
return error;
}
- private void copyTentativePageRankToBackup() {
+ private final void copyTentativePageRankToBackup() {
for (Entry<VertexWritable, Double> entry : tentativePagerank.entrySet()) {
lastTentativePagerank.put(entry.getKey(), entry.getValue());
}
}
- private void sendMessageToNeighbors(
+ private final void sendMessageToNeighbors(
BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer,
VertexWritable v) throws IOException {
VertexWritable[] outgoingEdges = adjacencyList.get(v);
@@ -216,7 +223,8 @@ public class PageRank extends
}
}
- static void printOutput(FileSystem fs, Configuration conf) throws IOException {
+ static final void printOutput(FileSystem fs, Configuration conf)
+ throws IOException {
LOG.info("-------------------- RESULTS --------------------");
FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
for (FileStatus status : stati) {
@@ -237,14 +245,14 @@ public class PageRank extends
}
}
- public static void printUsage() {
+ public final static void printUsage() {
System.out.println("PageRank Example:");
System.out
.println("<input path> <output path> [damping factor] [epsilon error] [tasks]");
}
- public static void main(String[] args) throws IOException,
+ public final static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException, InstantiationException,
IllegalAccessException {
if (args.length == 0) {
@@ -258,7 +266,7 @@ public class PageRank extends
job.setInputPath(new Path(args[0]));
job.setOutputPath(new Path(args[1]));
-
+
conf.set("damping.factor", (args.length > 2) ? args[2] : "0.85");
conf.set("epsilon.error", (args.length > 3) ? args[3] : "0.000001");
if (args.length == 5) {
Added: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1231491&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (added)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Sat Jan 14 12:24:45 2012
@@ -0,0 +1,158 @@
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+
+/**
+ * End-to-end test of the {@link PageRank} example: writes a small graph to a
+ * sequence file, runs the job on it and compares the ranks it outputs with
+ * precomputed values.
+ */
+public class PageRankTest extends TestCase {
+
+  private static final String INPUT = "/tmp/pagerank-tmp.seq";
+  private static final String OUTPUT = "/tmp/pagerank-out";
+
+  /**
+   * Tolerance for comparing doubles. The ranks are floating point sums, so
+   * exact equality would make the test depend on the summation order.
+   */
+  private static final double DELTA = 1e-9;
+
+  private Configuration conf;
+  private FileSystem fs;
+
+  public void testPageRank() throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+    conf = new HamaConfiguration();
+    fs = FileSystem.get(conf);
+
+    try {
+      // generate inside the try so a partially written input is cleaned up too
+      generateTestData();
+      PageRank.main(new String[] { INPUT, OUTPUT, "0.85", "0.000001" });
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+  /**
+   * Reads the job output from part-00000 and checks each rank against the
+   * expected value, that one record per vertex was written and that the ranks
+   * sum up to one.
+   */
+  private void verifyResult() throws IOException {
+    Map<String, Double> rs = new HashMap<String, Double>();
+    // our desired results
+    rs.put("stackoverflow.com", 0.20495476070571675);
+    rs.put("google.com", 0.339831187357033);
+    rs.put("facebook.com", 0.042503114866791786);
+    rs.put("yahoo.com", 0.2134265215074906);
+    rs.put("twitter.com", 0.042503114866791786);
+    rs.put("nasa.gov", 0.12688096846583075);
+    rs.put("youtube.com", 0.029900332230345304);
+
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
+        + "/part-00000"), conf);
+    Text key = new Text();
+    DoubleWritable value = new DoubleWritable();
+    double sum = 0.0;
+    int count = 0;
+    try {
+      while (reader.next(key, value)) {
+        Double expected = rs.get(key.toString());
+        // fail with a clear message instead of an NPE on unboxing
+        assertNotNull("Unexpected vertex in output: " + key, expected);
+        assertEquals("Wrong pagerank for " + key, expected.doubleValue(),
+            value.get(), DELTA);
+        sum += value.get();
+        count++;
+      }
+    } finally {
+      reader.close();
+    }
+    System.out.println("Sum is: " + sum);
+    assertEquals("Number of output records", rs.size(), count);
+    assertEquals("Sum of all pageranks", 1.0d, sum, DELTA);
+  }
+
+  /**
+   * Writes the test graph to {@link #INPUT} as a sequence file of
+   * (VertexWritable, VertexArrayWritable) pairs. The graph looks like this
+   * (adjacency list, [] contains outlinks):<br/>
+   * stackoverflow.com [yahoo.com] <br/>
+   * google.com []<br/>
+   * facebook.com [twitter.com, google.com, nasa.gov]<br/>
+   * yahoo.com [nasa.gov, stackoverflow.com]<br/>
+   * twitter.com [google.com, facebook.com]<br/>
+   * nasa.gov [yahoo.com, stackoverflow.com]<br/>
+   * youtube.com [google.com, yahoo.com]<br/>
+   */
+  private void generateTestData() throws IOException {
+    Map<VertexWritable, VertexArrayWritable> tmp = new HashMap<VertexWritable, VertexArrayWritable>();
+
+    // index 0 is an unused null entry because the vertex ids of the
+    // precalculated example from hama 3.0 start at 1.
+    // FIXME This is really ugly.
+    String[] pages = new String[] { null, "twitter.com", "google.com",
+        "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
+        "youtube.com" };
+    // each line: vertex id followed by the ids of its outlinks
+    String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
+        "5;4;6", "6;4", "7;2;4" };
+
+    for (int i = 0; i < lineArray.length; i++) {
+      String[] adjacencyStringArray = lineArray[i].split(";");
+      int vertexId = Integer.parseInt(adjacencyStringArray[0]);
+      String name = pages[vertexId];
+      VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length - 1];
+      for (int j = 1; j < adjacencyStringArray.length; j++) {
+        arr[j - 1] = new VertexWritable(
+            pages[Integer.parseInt(adjacencyStringArray[j])]);
+      }
+      VertexArrayWritable wr = new VertexArrayWritable();
+      wr.set(arr);
+      tmp.put(new VertexWritable(name), wr);
+    }
+
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+        new Path(INPUT), VertexWritable.class, VertexArrayWritable.class);
+    try {
+      for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+        writer.append(e.getKey(), e.getValue());
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Deletes the input file and the output directory. Best effort: failures
+   * are only printed so they do not mask the outcome of the test.
+   */
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(INPUT)))
+        fs.delete(new Path(INPUT), true);
+      if (fs.exists(new Path(OUTPUT)))
+        fs.delete(new Path(OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+}
Propchange: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java?rev=1231491&r1=1231490&r2=1231491&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java Sat Jan 14 12:24:45 2012
@@ -17,6 +17,8 @@
*/
package org.apache.hama.graph;
+import java.util.Arrays;
+
import org.apache.hadoop.io.ArrayWritable;
public class VertexArrayWritable extends ArrayWritable {
@@ -25,4 +27,9 @@ public class VertexArrayWritable extends
super(VertexWritable.class);
}
+ @Override
+ public String toString() {
+ return Arrays.toString(get());
+ }
+
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java?rev=1231491&r1=1231490&r2=1231491&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java Sat Jan 14 12:24:45 2012
@@ -48,7 +48,10 @@ public class VertexWritable implements W
@Override
public int hashCode() {
- return name.hashCode();
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ return result;
}
@Override
@@ -60,7 +63,10 @@ public class VertexWritable implements W
if (getClass() != obj.getClass())
return false;
VertexWritable other = (VertexWritable) obj;
- if (!name.equals(other.name))
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
return false;
return true;
}
@@ -68,5 +74,10 @@ public class VertexWritable implements W
public String getName() {
return name;
}
+
+ @Override
+ public String toString() {
+ return getName();
+ }
}