You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and was lost in this plain-text rendering.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/08/23 02:30:58 UTC
svn commit: r1160515 - in /incubator/hama/trunk: ./
examples/src/main/java/org/apache/hama/examples/graph/
Author: edwardyoon
Date: Tue Aug 23 00:30:58 2011
New Revision: 1160515
URL: http://svn.apache.org/viewvc?rev=1160515&view=rev
Log:
Improve and Refactor Partitioning in the Examples
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Aug 23 00:30:58 2011
@@ -10,6 +10,7 @@ Release 0.4 - Unreleased
IMPROVEMENTS
+ HAMA-423: Improve and Refactor Partitioning in the Examples (Thomas Jungblut via edwardyoon)
HAMA-422: Update HttpServer to use QueuedThreadPool (edwardyoon)
HAMA-414: Move BSPPeer constructor into child process (edwardyoon)
HAMA-419: Migrating site from forrest to maven (edwardyoon)
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java Tue Aug 23 00:30:58 2011
@@ -18,6 +18,7 @@
package org.apache.hama.examples.graph;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
@@ -40,11 +41,11 @@ public class PageRank extends PageRankBa
private Configuration conf;
- private HashMap<PageRankVertex, List<PageRankVertex>> adjacencyList;
- private final HashMap<String, PageRankVertex> lookupMap = new HashMap<String, PageRankVertex>();
- private final HashMap<PageRankVertex, Double> tentativePagerank = new HashMap<PageRankVertex, Double>();
+ private final HashMap<Vertex, List<Vertex>> adjacencyList = new HashMap<Vertex, List<Vertex>>();
+ private final HashMap<String, Vertex> lookupMap = new HashMap<String, Vertex>();
+ private final HashMap<Vertex, Double> tentativePagerank = new HashMap<Vertex, Double>();
// backup of the last pagerank to determine the error
- private final HashMap<PageRankVertex, Double> lastTentativePagerank = new HashMap<PageRankVertex, Double>();
+ private final HashMap<Vertex, Double> lastTentativePagerank = new HashMap<Vertex, Double>();
private String[] peerNames;
@Override
@@ -52,12 +53,9 @@ public class PageRank extends PageRankBa
InterruptedException {
String master = conf.get(MASTER_TASK);
// setup the datasets
- adjacencyList = PageRankBase.mapAdjacencyList(getConf(), peer);
- // init the pageranks to 1/n where n is the number of all vertices
- for (PageRankVertex vertex : adjacencyList.keySet()) {
- tentativePagerank.put(vertex, Double.valueOf(1.0 / numOfVertices));
- lookupMap.put(vertex.getUrl(), vertex);
- }
+ PageRankBase.mapAdjacencyList(getConf(), peer, adjacencyList,
+ tentativePagerank, lookupMap);
+
// while the error not converges against epsilon do the pagerank stuff
double error = 1.0;
int iteration = 0;
@@ -71,10 +69,10 @@ public class PageRank extends PageRankBa
// copy the old pagerank to the backup
copyTentativePageRankToBackup();
// sum up all incoming messages for a vertex
- HashMap<PageRankVertex, Double> sumMap = new HashMap<PageRankVertex, Double>();
+ HashMap<Vertex, Double> sumMap = new HashMap<Vertex, Double>();
DoubleMessage msg = null;
while ((msg = (DoubleMessage) peer.getCurrentMessage()) != null) {
- PageRankVertex k = lookupMap.get(msg.getTag());
+ Vertex k = lookupMap.get(msg.getTag());
if (!sumMap.containsKey(k)) {
sumMap.put(k, msg.getData());
} else {
@@ -84,7 +82,7 @@ public class PageRank extends PageRankBa
// pregel formula:
// ALPHA = 0.15 / NumVertices()
// P(i) = ALPHA + 0.85 * sum
- for (Entry<PageRankVertex, Double> entry : sumMap.entrySet()) {
+ for (Entry<Vertex, Double> entry : sumMap.entrySet()) {
tentativePagerank.put(entry.getKey(), ALPHA
+ (entry.getValue() * DAMPING_FACTOR));
}
@@ -95,7 +93,7 @@ public class PageRank extends PageRankBa
}
// in every step send the tentative pagerank of a vertex to its
// adjacent vertices
- for (PageRankVertex vertex : adjacencyList.keySet()) {
+ for (Vertex vertex : adjacencyList.keySet()) {
sendMessageToNeighbors(peer, vertex);
}
@@ -106,11 +104,10 @@ public class PageRank extends PageRankBa
peer.clear();
// finally save the chunk of pageranks
PageRankBase.savePageRankMap(peer, conf, lastTentativePagerank);
- LOG.info("Finished with iteration " + iteration + "!");
}
- private double broadcastError(BSPPeer peer, String master,
- double error) throws IOException, KeeperException, InterruptedException {
+ private double broadcastError(BSPPeer peer, String master, double error)
+ throws IOException, KeeperException, InterruptedException {
peer.send(master, new DoubleMessage("", error));
peer.sync();
if (peer.getPeerName().equals(master)) {
@@ -135,7 +132,7 @@ public class PageRank extends PageRankBa
private double determineError() {
double error = 0.0;
- for (Entry<PageRankVertex, Double> entry : tentativePagerank.entrySet()) {
+ for (Entry<Vertex, Double> entry : tentativePagerank.entrySet()) {
error += Math.abs(lastTentativePagerank.get(entry.getKey())
- entry.getValue());
}
@@ -143,19 +140,19 @@ public class PageRank extends PageRankBa
}
private void copyTentativePageRankToBackup() {
- for (Entry<PageRankVertex, Double> entry : tentativePagerank.entrySet()) {
+ for (Entry<Vertex, Double> entry : tentativePagerank.entrySet()) {
lastTentativePagerank.put(entry.getKey(), entry.getValue());
}
}
- private void sendMessageToNeighbors(BSPPeer peer, PageRankVertex v)
+ private void sendMessageToNeighbors(BSPPeer peer, Vertex v)
throws IOException {
- List<PageRankVertex> outgoingEdges = adjacencyList.get(v);
- for (PageRankVertex adjacent : outgoingEdges) {
+ List<Vertex> outgoingEdges = adjacencyList.get(v);
+ for (Vertex adjacent : outgoingEdges) {
int mod = Math.abs(adjacent.getId() % peerNames.length);
// send a message of the tentative pagerank divided by the size of
// the outgoing edges to all adjacents
- peer.send(peerNames[mod], new DoubleMessage(adjacent.getUrl(),
+ peer.send(peerNames[mod], new DoubleMessage(adjacent.getName(),
tentativePagerank.get(v) / outgoingEdges.size()));
}
}
@@ -183,7 +180,8 @@ public class PageRank extends PageRankBa
}
public static void main(String[] args) throws IOException,
- InterruptedException, ClassNotFoundException {
+ InterruptedException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
if (args.length == 0) {
printUsage();
System.exit(-1);
@@ -220,23 +218,19 @@ public class PageRank extends PageRankBa
BSPJobClient jobClient = new BSPJobClient(conf);
ClusterStatus cluster = jobClient.getClusterStatus(true);
- StringBuilder sb = new StringBuilder();
- for (String peerName : cluster.getActiveGroomNames().values()) {
- conf.set(MASTER_TASK, peerName);
- sb.append(peerName + ";");
- }
- // put every peer into the configuration
- conf.set(ShortestPaths.BSP_PEERS, sb.toString());
// leave the iterations on default
conf.set("max.iterations", "0");
+ Collection<String> activeGrooms = cluster.getActiveGroomNames().values();
+ String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
+
if (conf.get("in.path") == null) {
- conf = PageRankBase
- .partitionExample(new Path(conf.get("out.path")), conf);
+ conf = PageRankBase.partitionExample(new Path(conf.get("out.path")),
+ conf, grooms);
} else {
- conf = PageRankBase
- .partitionTextFile(new Path(conf.get("in.path")), conf);
+ conf = PageRankBase.partitionTextFile(new Path(conf.get("in.path")),
+ conf, grooms);
}
BSPJob job = new BSPJob(conf);
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java Tue Aug 23 00:30:58 2011
@@ -17,11 +17,10 @@
*/
package org.apache.hama.examples.graph;
-import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
+import java.io.OutputStreamWriter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -31,15 +30,21 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.examples.graph.partitioning.PartitionableWritable;
+import org.apache.hama.examples.graph.partitioning.VertexPartitioner;
public abstract class PageRankBase extends BSP {
public static final Log LOG = LogFactory.getLog(PageRankBase.class);
@@ -51,53 +56,48 @@ public abstract class PageRankBase exten
protected static double DAMPING_FACTOR = 0.85;
protected static double EPSILON = 0.001;
- static HashMap<PageRankVertex, List<PageRankVertex>> mapAdjacencyList(
- Configuration conf, BSPPeer peer) throws FileNotFoundException,
+ private static final VertexPartitioner partitioner = new VertexPartitioner();
+
+ static void mapAdjacencyList(Configuration conf, BSPPeer peer,
+ HashMap<Vertex, List<Vertex>> realAdjacencyList,
+ HashMap<Vertex, Double> tentativePagerank,
+ HashMap<String, Vertex> lookupMap) throws FileNotFoundException,
IOException {
+
FileSystem fs = FileSystem.get(conf);
- HashMap<PageRankVertex, List<PageRankVertex>> adjacencyList = new HashMap<PageRankVertex, List<PageRankVertex>>();
Path p = new Path(conf.get("in.path." + peer.getPeerName().split(":")[0]));
- LOG.info(p.toString());
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
- Text key = new Text();
- Text value = new Text();
+ ObjectWritable key = new ObjectWritable(Vertex.class);
+ key.setConf(conf);
+ ArrayWritable value = new ArrayWritable(Vertex.class);
while (reader.next(key, value)) {
- PageRankVertex k = new PageRankVertex(key.toString());
- PageRankVertex v = new PageRankVertex(value.toString());
- if (!adjacencyList.containsKey(k)) {
- adjacencyList.put(k, new LinkedList<PageRankVertex>());
- adjacencyList.get(k).add(v);
- } else {
- adjacencyList.get(k).add(v);
+ Vertex realKey = (Vertex) key.get();
+ LinkedList<Vertex> list = new LinkedList<Vertex>();
+ realAdjacencyList.put(realKey, list);
+ lookupMap.put(realKey.name, realKey);
+ tentativePagerank.put(realKey, Double.valueOf(1.0 / numOfVertices));
+ for (Writable s : value.get()) {
+ list.add((Vertex) s);
}
}
+
reader.close();
- return adjacencyList;
}
- static HamaConfiguration partitionExample(Path out, HamaConfiguration conf)
- throws IOException {
+ static HamaConfiguration partitionTextFile(Path in, HamaConfiguration conf,
+ String[] groomNames) throws IOException, InstantiationException,
+ IllegalAccessException, InterruptedException {
+
+ // set the partitioning vertex class
+ conf.setClass("hama.partitioning.vertex.class", Vertex.class,
+ PartitionableWritable.class);
- String[] groomNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
- int sizeOfCluster = groomNames.length;
+ return (HamaConfiguration) partitioner.partition(conf, in, groomNames);
+ }
- // setup the paths where the grooms can find their input
- List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
- List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
- sizeOfCluster);
- FileSystem fs = FileSystem.get(conf);
- for (String entry : groomNames) {
- partPaths.add(new Path(out.getParent().toString() + Path.SEPARATOR
- + ShortestPaths.PARTED + Path.SEPARATOR
- + entry.split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]));
- }
- // create a seq writer for that
- for (Path p : partPaths) {
- // LOG.info(p.toString());
- fs.delete(p, true);
- writers.add(SequenceFile
- .createWriter(fs, conf, p, Text.class, Text.class));
- }
+ static HamaConfiguration partitionExample(Path out, HamaConfiguration conf,
+ String[] groomNames) throws IOException, InstantiationException,
+ IllegalAccessException, InterruptedException {
/**
* 1:twitter.com <br/>
@@ -109,6 +109,11 @@ public abstract class PageRankBase exten
* 7:youtube.com
*/
+ FileSystem fs = FileSystem.get(conf);
+ Path input = new Path(out, "pagerank-example");
+ FSDataOutputStream stream = fs.create(input);
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream));
+
String[] realNames = new String[] { null, "twitter.com", "google.com",
"facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
"youtube.com" };
@@ -116,91 +121,28 @@ public abstract class PageRankBase exten
String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
"5;4;6", "6;4", "7;2;4" };
- int numLines = 0;
for (String line : lineArray) {
String[] arr = line.split(";");
String vId = arr[0];
int indexKey = Integer.valueOf(vId);
- LinkedList<String> list = new LinkedList<String>();
- for (int i = 0; i < arr.length; i++) {
- int index = Integer.valueOf(arr[i]);
- list.add(realNames[index]);
- }
-
- int mod = Math.abs(realNames[indexKey].hashCode() % sizeOfCluster);
- for (String value : list) {
- writers.get(mod).append(new Text(realNames[indexKey]), new Text(value));
- }
- numLines++;
- }
-
- for (SequenceFile.Writer w : writers)
- w.close();
-
- for (Path p : partPaths) {
- conf.set("in.path." + p.getName(), p.toString());
- }
- conf.set("num.vertices", "" + numLines);
-
- return conf;
- }
-
- static HamaConfiguration partitionTextFile(Path in, HamaConfiguration conf)
- throws IOException {
-
- String[] groomNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
- int sizeOfCluster = groomNames.length;
-
- // setup the paths where the grooms can find their input
- List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
- List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
- sizeOfCluster);
- FileSystem fs = FileSystem.get(conf);
- for (String entry : groomNames) {
- partPaths.add(new Path(in.getParent().toString() + Path.SEPARATOR
- + ShortestPaths.PARTED + Path.SEPARATOR
- + entry.split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]));
- }
- // create a seq writer for that
- for (Path p : partPaths) {
- // LOG.info(p.toString());
- fs.delete(p, true);
- writers.add(SequenceFile
- .createWriter(fs, conf, p, Text.class, Text.class));
- }
-
- BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(in)));
-
- String line = null;
- long numLines = 0;
- while ((line = br.readLine()) != null) {
- String[] arr = line.split("\t");
- String vId = arr[0];
- LinkedList<String> list = new LinkedList<String>();
- for (int i = 0; i < arr.length; i++) {
- list.add(arr[i]);
- }
- int mod = Math.abs(vId.hashCode() % sizeOfCluster);
- for (String value : list) {
- writers.get(mod).append(new Text(vId), new Text(value));
+ String adjacents = "";
+ for (int i = 1; i < arr.length; i++) {
+ int index = Integer.valueOf(arr[i]);
+ adjacents += realNames[index] + "\t";
}
- numLines++;
+ writer.write(realNames[indexKey] + "\t" + adjacents);
+ writer.newLine();
}
- for (SequenceFile.Writer w : writers)
- w.close();
-
- for (Path p : partPaths) {
- conf.set("in.path." + p.getName(), p.toString());
- }
- conf.set("num.vertices", "" + numLines);
+ writer.close();
+ fs.close();
- return conf;
+ return partitionTextFile(input, conf, groomNames);
}
static void savePageRankMap(BSPPeer peer, Configuration conf,
- Map<PageRankVertex, Double> tentativePagerank) throws IOException {
+ Map<Vertex, Double> tentativePagerank) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path(conf.get("out.path") + Path.SEPARATOR + "temp"
+ Path.SEPARATOR
@@ -208,13 +150,11 @@ public abstract class PageRankBase exten
fs.delete(outPath, true);
final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf,
outPath, Text.class, DoubleWritable.class);
- for (Entry<PageRankVertex, Double> row : tentativePagerank.entrySet()) {
- out.append(new Text(row.getKey().getUrl()), new DoubleWritable(row
- .getValue()));
+ for (Entry<Vertex, Double> row : tentativePagerank.entrySet()) {
+ out.append(new Text(row.getKey().getName()),
+ new DoubleWritable(row.getValue()));
}
- LOG.info("Closing...");
out.close();
- LOG.info("Closed!");
}
static void printOutput(FileSystem fs, Configuration conf) throws IOException {
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankVertex.java Tue Aug 23 00:30:58 2011
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.examples.graph;
-
-public class PageRankVertex {
-
- private final int id;
- private final String url;
-
- public PageRankVertex(String url) {
- super();
- this.id = url.hashCode();
- this.url = url;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + id;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- PageRankVertex other = (PageRankVertex) obj;
- if (id != other.id)
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return url;
- }
-
- public int getId() {
- return id;
- }
-
- public String getUrl() {
- return url;
- }
-
-}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java Tue Aug 23 00:30:58 2011
@@ -17,25 +17,26 @@
*/
package org.apache.hama.examples.graph;
-public final class ShortestPathVertex {
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
- private final int id;
- private final String name;
- private final int weight;
+public final class ShortestPathVertex extends Vertex {
+
+ private int weight;
private Integer cost;
+ public ShortestPathVertex() {
+ }
+
public ShortestPathVertex(int weight, String name) {
- super();
- this.id = name.hashCode();
+ super(name);
this.weight = weight;
- this.name = name;
}
public ShortestPathVertex(int weight, String name, Integer cost) {
- super();
- this.id = name.hashCode();
+ super(name);
this.weight = weight;
- this.name = name;
this.cost = cost;
}
@@ -65,28 +66,15 @@ public final class ShortestPathVertex {
}
@Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((name == null) ? 0 : name.hashCode());
- return result;
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ weight = in.readInt();
}
@Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ShortestPathVertex other = (ShortestPathVertex) obj;
- if (name == null) {
- if (other.name != null)
- return false;
- } else if (!name.equals(other.name))
- return false;
- return true;
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeInt(weight);
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java Tue Aug 23 00:30:58 2011
@@ -18,6 +18,7 @@
package org.apache.hama.examples.graph;
import java.io.IOException;
+import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
@@ -43,17 +44,15 @@ public class ShortestPaths extends Short
public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
private Configuration conf;
- private Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
- private Map<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();
+ private final HashMap<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
+ private final HashMap<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();
private String[] peerNames;
@Override
public void bsp(BSPPeer peer) throws IOException, KeeperException,
InterruptedException {
- LOG.info("Mapping graph into ram...");
// map our input into ram
mapAdjacencyList(conf, peer, adjacencyList, vertexLookupMap);
- LOG.info("Finished! Starting graph initialization...");
// parse the configuration to get the peerNames
parsePeerNames(conf);
// get our master groom
@@ -67,7 +66,6 @@ public class ShortestPaths extends Short
sendMessageToNeighbors(peer, v);
}
- LOG.info("Finished! Starting main loop...");
boolean updated = true;
while (updated) {
int updatesMade = 0;
@@ -91,7 +89,6 @@ public class ShortestPaths extends Short
sendMessageToNeighbors(peer, vertex);
}
}
- LOG.info("Finished!");
// finished, finally save our map to DFS.
saveVertexMap(conf, peer, adjacencyList);
}
@@ -119,8 +116,8 @@ public class ShortestPaths extends Short
* @throws KeeperException
* @throws InterruptedException
*/
- private boolean broadcastUpdatesMade(BSPPeer peer, String master,
- int updates) throws IOException, KeeperException, InterruptedException {
+ private boolean broadcastUpdatesMade(BSPPeer peer, String master, int updates)
+ throws IOException, KeeperException, InterruptedException {
peer.send(master, new IntegerMessage(peer.getPeerName(), updates));
peer.sync();
if (peer.getPeerName().equals(master)) {
@@ -128,8 +125,6 @@ public class ShortestPaths extends Short
IntegerMessage message;
while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
count += message.getData();
- LOG.info("Received " + message.getData() + " updates from "
- + message.getTag() + " in SuperStep " + peer.getSuperstepCount());
}
for (String name : peer.getAllPeerNames()) {
@@ -154,14 +149,15 @@ public class ShortestPaths extends Short
* @param id The vertex to all adjacent vertices the new cost has to be send.
* @throws IOException
*/
- private void sendMessageToNeighbors(BSPPeer peer,
- ShortestPathVertex id) throws IOException {
+ private void sendMessageToNeighbors(BSPPeer peer, ShortestPathVertex id)
+ throws IOException {
List<ShortestPathVertex> outgoingEdges = adjacencyList.get(id);
for (ShortestPathVertex adjacent : outgoingEdges) {
int mod = Math.abs((adjacent.getId() % peer.getAllPeerNames().length));
- peer.send(peerNames[mod], new IntegerMessage(adjacent.getName(), id
- .getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
- + adjacent.getWeight()));
+ peer.send(peerNames[mod],
+ new IntegerMessage(adjacent.getName(),
+ id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
+ + adjacent.getWeight()));
}
}
@@ -178,11 +174,12 @@ public class ShortestPaths extends Short
public static void printUsage() {
System.out.println("Single Source Shortest Path Example:");
System.out
- .println("<Startvertex name> <optional: output path> <optional: path to own adjacency list sequencefile>");
+ .println("<Startvertex name> <optional: output path> <optional: path to own adjacency list textfile!>");
}
public static void main(String[] args) throws IOException,
- InterruptedException, ClassNotFoundException {
+ InterruptedException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
printUsage();
@@ -191,7 +188,6 @@ public class ShortestPaths extends Short
conf.set(SHORTEST_PATHS_START_VERTEX_ID, "Frankfurt");
System.out.println("Setting default start vertex to \"Frankfurt\"!");
conf.set(OUT_PATH, "sssp/output");
- boolean skipPartitioning = false;
Path adjacencyListPath = null;
if (args.length > 0) {
@@ -207,10 +203,6 @@ public class ShortestPaths extends Short
adjacencyListPath = new Path(args[2]);
}
- if (args.length > 3) {
- skipPartitioning = Boolean.valueOf(args[3]);
- }
-
}
Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = null;
@@ -225,19 +217,16 @@ public class ShortestPaths extends Short
// Set the task size as a number of GroomServer
BSPJobClient jobClient = new BSPJobClient(conf);
ClusterStatus cluster = jobClient.getClusterStatus(true);
- StringBuilder sb = new StringBuilder();
- for (String peerName : cluster.getActiveGroomNames().values()) {
- conf.set(MASTER_TASK, peerName);
- sb.append(peerName);
- sb.append(";");
- }
- LOG.info("Master is: " + conf.get(MASTER_TASK));
- conf.set(BSP_PEERS, sb.toString());
+
+ Collection<String> activeGrooms = cluster.getActiveGroomNames().values();
+ String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
+
LOG.info("Starting data partitioning...");
- if (adjacencyList == null)
- conf = partition(adjacencyListPath, conf, skipPartitioning);
- else
- conf = partition(adjacencyList, conf);
+ if (adjacencyList == null) {
+ conf = (HamaConfiguration) partition(conf, adjacencyListPath, grooms);
+ } else {
+ conf = (HamaConfiguration) partitionExample(conf, adjacencyList, grooms);
+ }
LOG.info("Finished!");
bsp.setNumBspTask(cluster.getGroomServers());
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java Tue Aug 23 00:30:58 2011
@@ -17,27 +17,35 @@
*/
package org.apache.hama.examples.graph;
+import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
+import java.io.OutputStreamWriter;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hama.HamaConfiguration;
+import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.examples.graph.partitioning.PartitionableWritable;
+import org.apache.hama.examples.graph.partitioning.ShortestPathVertexPartitioner;
public abstract class ShortestPathsBase extends BSP {
-
+
+ private static final String SSSP_EXAMPLE_FILE_NAME = "sssp-example";
public static final String BSP_PEERS = "bsp.peers";
public static final String SHORTEST_PATHS_START_VERTEX_ID = "shortest.paths.start.vertex.id";
public static final String PARTED = "parted";
@@ -45,7 +53,9 @@ public abstract class ShortestPathsBase
public static final String OUT_PATH = "out.path";
public static final String NAME_VALUE_SEPARATOR = ":";
public static final String MASTER_TASK = "master.groom";
-
+
+ private static final ShortestPathVertexPartitioner partitioner = new ShortestPathVertexPartitioner();
+
/**
* When finished we just writing a sequencefile of the vertex name and the
* cost.
@@ -54,7 +64,7 @@ public abstract class ShortestPathsBase
* @param adjacencyList
* @throws IOException
*/
- static void saveVertexMap(Configuration conf, BSPPeer peer,
+ protected final static void saveVertexMap(Configuration conf, BSPPeer peer,
Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList)
throws IOException {
Path outPath = new Path(conf.get(OUT_PATH) + Path.SEPARATOR
@@ -76,202 +86,95 @@ public abstract class ShortestPathsBase
* @param conf
* @throws IOException
*/
- static void printOutput(FileSystem fs, Configuration conf) throws IOException {
+ protected final static void printOutput(FileSystem fs, Configuration conf)
+ throws IOException {
System.out.println("-------------------- RESULTS --------------------");
FileStatus[] stati = fs.listStatus(new Path(conf.get(OUT_PATH)));
for (FileStatus status : stati) {
- Path path = status.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- Text key = new Text();
- IntWritable value = new IntWritable();
- while (reader.next(key, value)) {
- System.out.println(key.toString() + " | " + value.get());
+ Path path = status.getPath();
+ // Skip sub-directories and the plain-text adjacency file that
+ // partitionExample() writes into this same OUT_PATH directory; only the
+ // remaining SequenceFiles (Text -> IntWritable) hold results.
+ if (!status.isDir() && !path.getName().equals(SSSP_EXAMPLE_FILE_NAME)) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+ try {
+ Text key = new Text();
+ IntWritable value = new IntWritable();
+ while (reader.next(key, value)) {
+ System.out.println(key.toString() + " | " + value.get());
+ }
+ } finally {
+ // release the reader even if a record fails to deserialize
+ reader.close();
+ }
}
- reader.close();
}
}
- /**
- *
- * The adjacencylist contains two text fields on each line. The key component
- * is the name of a vertex, the value is a ":" separated Text field that
- * contains the name of the adjacent vertex leftmost and the weight on the
- * rightmost side.
- *
- * <PRE>
- * K V <br/>
- * Vertex[Text] AdjacentVertex : Weight [Text]
- * </PRE>
- *
- * @param adjacencyList
- * @param vertexLookupMap
- */
- static void mapAdjacencyList(Configuration conf, BSPPeer peer,
+ /**
+ * Loads this peer's partition of the graph into memory.
+ *
+ * The partition file path is read from the configuration key
+ * {@code "in.path." + <part of the peer name before the first ':'>}. Each
+ * record has an {@link ObjectWritable} key wrapping a
+ * {@link ShortestPathVertex} and an {@link ArrayWritable} of adjacent
+ * {@link ShortestPathVertex} values. Every loaded vertex gets cost
+ * {@link Integer#MAX_VALUE}.
+ *
+ * If the same vertex key occurs in several records, its adjacent vertices
+ * are appended to one list instead of a later record replacing an earlier
+ * one, and the lookup map keeps pointing at the adjacency-list key object.
+ *
+ * @param conf configuration holding the per-peer input path
+ * @param peer the peer whose partition is loaded
+ * @param adjacencyList filled with vertex -> adjacent vertices
+ * @param vertexLookupMap filled with vertex name -> vertex
+ * @throws FileNotFoundException declared by the signature; presumably when
+ * the partition file does not exist
+ * @throws IOException if the partition file cannot be opened or read
+ */
+ protected final static void mapAdjacencyList(Configuration conf,
+ BSPPeer peer,
Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList,
Map<String, ShortestPathVertex> vertexLookupMap)
throws FileNotFoundException, IOException {
+
FileSystem fs = FileSystem.get(conf);
- Path p = new Path(conf.get(IN_PATH
- + peer.getPeerName().split(NAME_VALUE_SEPARATOR)[0]));
+ // NOTE(review): this key is hard-coded and must match the key the
+ // partitioner sets; confirm whether it is the same as IN_PATH.
+ Path p = new Path(conf.get("in.path." + peer.getPeerName().split(":")[0]));
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
- Text key = new Text(); // name of the vertex
- Text value = new Text(); // name of the adjacent vertex : weight
+ try {
+ ObjectWritable key = new ObjectWritable(ShortestPathVertex.class);
+ key.setConf(conf);
+ ArrayWritable value = new ArrayWritable(ShortestPathVertex.class);
while (reader.next(key, value)) {
- // a key vertex has weight 0 to itself
- ShortestPathVertex keyVertex = new ShortestPathVertex(0, key.toString(),
- Integer.MAX_VALUE);
- String[] nameWeight = value.toString().split(NAME_VALUE_SEPARATOR);
- if (!adjacencyList.containsKey(keyVertex)) {
- LinkedList<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(Integer.valueOf(nameWeight[1]),
- nameWeight[0], Integer.MAX_VALUE));
- adjacencyList.put(keyVertex, list);
- vertexLookupMap.put(keyVertex.getName(), keyVertex);
- } else {
- adjacencyList.get(keyVertex).add(
- new ShortestPathVertex(Integer.valueOf(nameWeight[1]),
- nameWeight[0], Integer.MAX_VALUE));
+ ShortestPathVertex realKey = (ShortestPathVertex) key.get();
+ realKey.setCost(Integer.MAX_VALUE);
+ // Merge repeated keys into the existing list, as the pre-commit loader
+ // did, so no adjacent vertices are dropped and vertexLookupMap keeps
+ // referring to the same object that keys adjacencyList.
+ List<ShortestPathVertex> list = adjacencyList.get(realKey);
+ if (list == null) {
+ list = new LinkedList<ShortestPathVertex>();
+ adjacencyList.put(realKey, list);
+ vertexLookupMap.put(realKey.name, realKey);
+ }
+
+ for (Writable s : value.get()) {
+ final ShortestPathVertex vertex = (ShortestPathVertex) s;
+ vertex.setCost(Integer.MAX_VALUE);
+ list.add(vertex);
}
}
+ } finally {
reader.close();
+ }
}
- /**
- * Partitioning for in memory adjacency lists.
- *
- * @param adjacencyList
- * @param status
- * @param conf
- * @return
- * @throws IOException
- */
- static HamaConfiguration partition(
+ /**
+ * Writes an in-memory adjacency list to a plain-text file and partitions it
+ * across the given grooms.
+ *
+ * The file is written to {@code <OUT_PATH>/sssp-example}. Each line holds a
+ * vertex name followed by tab-separated {@code adjacentName:weight}
+ * entries. The file is then passed to
+ * {@link #partition(Configuration, Path, String[])}.
+ *
+ * @param conf job configuration; the target directory is read from OUT_PATH
+ * @param adjacencyList vertex -> adjacent vertices (with edge weights)
+ * @param groomNames names of the grooms to partition across
+ * @return the configuration returned by the partitioner
+ * @throws IOException if the file cannot be written or partitioned
+ * @throws InstantiationException propagated from the partitioner
+ * @throws IllegalAccessException propagated from the partitioner
+ * @throws InterruptedException propagated from the partitioner
+ */
+ protected final static Configuration partitionExample(Configuration conf,
Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList,
- HamaConfiguration conf) throws IOException {
-
- String[] groomNames = conf.get(BSP_PEERS).split(";");
+ String[] groomNames) throws IOException, InstantiationException,
+ IllegalAccessException, InterruptedException {
- int sizeOfCluster = groomNames.length;
-
- // setup the paths where the grooms can find their input
- List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
- List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
- sizeOfCluster);
FileSystem fs = FileSystem.get(conf);
- Path fileToPartition = new Path(conf.get(OUT_PATH));
- for (String entry : groomNames) {
- partPaths.add(new Path(fileToPartition.getParent().toString()
- + Path.SEPARATOR + PARTED + Path.SEPARATOR
- + entry.split(NAME_VALUE_SEPARATOR)[0]));
- }
- // create a seq writer for that
- for (Path p : partPaths) {
- // System.out.println(p.toString());
- fs.delete(p, true);
- writers.add(SequenceFile
- .createWriter(fs, conf, p, Text.class, Text.class));
- }
+ Path input = new Path(conf.get(OUT_PATH), SSSP_EXAMPLE_FILE_NAME);
+ FSDataOutputStream stream = fs.create(input);
+ // NOTE(review): uses the platform default charset; confirm the partitioner
+ // decodes this file the same way for non-ASCII vertex names.
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream));
- for (Entry<ShortestPathVertex, List<ShortestPathVertex>> entry : adjacencyList
- .entrySet()) {
- // a key vertex has weight 0 to itself
- ShortestPathVertex keyVertex = entry.getKey();
- // just mod the id
- int mod = Math.abs(keyVertex.getId() % sizeOfCluster);
- // append it to the right sequenceFile
- for (ShortestPathVertex value : entry.getValue())
- writers.get(mod)
- .append(
- new Text(keyVertex.getName()),
- new Text(value.getName() + NAME_VALUE_SEPARATOR
- + value.getWeight()));
- }
+ try {
+ Set<Entry<ShortestPathVertex, List<ShortestPathVertex>>> set = adjacencyList
+ .entrySet();
- for (SequenceFile.Writer w : writers)
- w.close();
+ for (Entry<ShortestPathVertex, List<ShortestPathVertex>> entry : set) {
- for (Path p : partPaths) {
- conf.set(IN_PATH + p.getName(), p.toString());
- }
- return conf;
- }
+ // one line per vertex: its name, then "adjacent:weight" entries
+ StringBuilder line = new StringBuilder(entry.getKey().getName());
- /**
- * Partitioning for sequencefile partitioned adjacency lists.
- *
- * The adjacencylist contains two text fields on each line. The key component
- * is the name of a vertex, the value is a ":" separated Text field that
- * contains the name of the adjacent vertex leftmost and the weight on the
- * rightmost side.
- *
- * <PRE>
- * K V <br/>
- * Vertex[Text] AdjacentVertex : Weight [Text]
- * </PRE>
- *
- * @param fileToPartition
- * @param status
- * @param conf
- * @return
- * @throws IOException
- */
- static HamaConfiguration partition(Path fileToPartition,
- HamaConfiguration conf, boolean skipPartitioning) throws IOException {
+ for (ShortestPathVertex v : entry.getValue()) {
+ line.append('\t').append(v.getName()).append(NAME_VALUE_SEPARATOR)
+ .append(v.getWeight());
+ }
- String[] groomNames = conf.get(BSP_PEERS).split(";");
- int sizeOfCluster = groomNames.length;
+ writer.write(line.toString());
+ writer.newLine();
- // setup the paths where the grooms can find their input
- List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
- List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
- sizeOfCluster);
- FileSystem fs = FileSystem.get(conf);
- for (String entry : groomNames) {
- partPaths.add(new Path(fileToPartition.getParent().toString()
- + Path.SEPARATOR + PARTED + Path.SEPARATOR
- + entry.split(NAME_VALUE_SEPARATOR)[0]));
}
- if (!skipPartitioning) {
-
- // create a seq writer for that
- for (Path p : partPaths) {
- // System.out.println(p.toString());
- fs.delete(p, true);
- writers.add(SequenceFile.createWriter(fs, conf, p, Text.class,
- Text.class));
- }
-
- // parse our file
- if (!fs.exists(fileToPartition))
- throw new FileNotFoundException("File " + fileToPartition
- + " wasn't found!");
-
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, fileToPartition,
- conf);
- Text key = new Text(); // name of the vertex
- Text value = new Text(); // name of the adjacent vertex : weight
-
- while (reader.next(key, value)) {
- // a key vertex has weight 0 to itself
- ShortestPathVertex keyVertex = new ShortestPathVertex(0, key.toString());
- // just mod the id
- int mod = Math.abs(keyVertex.getId() % sizeOfCluster);
- // append it to the right sequenceFile
- writers.get(mod).append(new Text(keyVertex.getName()), new Text(value));
- }
+ } finally {
+ // closing the writer also closes the wrapped FSDataOutputStream
+ writer.close();
+ }
+ // Deliberately do not close fs: FileSystem.get() normally returns a
+ // cached instance that other code in this JVM may still be using.
- reader.close();
+ return partition(conf, input, groomNames);
+ }
- for (SequenceFile.Writer w : writers)
- w.close();
- }
+ /**
+ * Partitions an existing adjacency-list file across the given grooms.
+ *
+ * Registers {@link ShortestPathVertex} as the
+ * {@code hama.partitioning.vertex.class} (bounded by
+ * {@link PartitionableWritable}) in {@code conf}, then delegates to the
+ * shared partitioner.
+ *
+ * @param conf job configuration; modified in place
+ * @param fileToPartition the adjacency-list file to split
+ * @param groomNames names of the grooms to partition across
+ * @return the configuration returned by the partitioner
+ * @throws IOException propagated from the partitioner
+ * @throws InstantiationException propagated from the partitioner
+ * @throws IllegalAccessException propagated from the partitioner
+ * @throws InterruptedException propagated from the partitioner
+ */
+ protected final static Configuration partition(Configuration conf,
+ Path fileToPartition, String[] groomNames)
+ throws IOException, InstantiationException, IllegalAccessException,
+ InterruptedException {
+ final String vertexClassKey = "hama.partitioning.vertex.class";
+ conf.setClass(vertexClassKey, ShortestPathVertex.class,
+ PartitionableWritable.class);
- for (Path p : partPaths) {
- conf.set(IN_PATH + p.getName(), p.toString());
- }
+ final Configuration partitioned = partitioner.partition(conf,
+ fileToPartition, groomNames);
+ return partitioned;
- return conf;
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java?rev=1160515&r1=1160514&r2=1160515&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java Tue Aug 23 00:30:58 2011
@@ -31,7 +31,6 @@ public class ShortestPathsGraphLoader {
"Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
"Muenchen" };
- int id = 1;
for (String city : cities) {
if (city.equals("Frankfurt")) {
List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
@@ -86,7 +85,6 @@ public class ShortestPathsGraphLoader {
list.add(new ShortestPathVertex(84, "Augsburg"));
adjacencyList.put(new ShortestPathVertex(0, city), list);
}
- id++;
}
return adjacencyList;
}