You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/05/26 18:11:13 UTC
svn commit: r1342922 - in /incubator/hama/trunk:
core/src/main/java/org/apache/hama/bsp/
examples/src/main/java/org/apache/hama/examples/
examples/src/main/java/org/apache/hama/examples/util/
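examples/src/test/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/util/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/example/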
Author: tjungblut
Date: Sat May 26 16:11:11 2012
New Revision: 1342922
URL: http://svn.apache.org/viewvc?rev=1342922&view=rev
Log:
[HAMA-580]: Improve input of graph module
Added:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
Removed:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexArrayWritable.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java Sat May 26 16:11:11 2012
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.Text;
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
+ @Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, BSPJob job)
throws IOException {
return new LineRecordReader(job.getConf(), (FileSplit) split);
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Sat May 26 16:11:11 2012
@@ -20,8 +20,6 @@
package org.apache.hama.examples;
import org.apache.hadoop.util.ProgramDriver;
-import org.apache.hama.examples.util.PagerankTextToSeq;
-import org.apache.hama.examples.util.SSSPTextToSeq;
public class ExampleDriver {
@@ -29,15 +27,11 @@ public class ExampleDriver {
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
- pgd.addClass("sssp-text2seq", SSSPTextToSeq.class,
- "Generates SSSP input from textfile");
pgd.addClass("sssp", SSSP.class, "Single Shortest Path");
pgd.addClass("mdstsearch", MindistSearch.class,
"Mindist search / Connected Components");
pgd.addClass("cmb", CombineExample.class, "Combine");
pgd.addClass("bench", RandBench.class, "Random Benchmark");
- pgd.addClass("pagerank-text2seq", PagerankTextToSeq.class,
- "Generates Pagerank and mindist search input from textfile");
pgd.addClass("pagerank", PageRank.class, "PageRank");
pgd.driver(args);
} catch (Throwable e) {
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Sat May 26 16:11:11 2012
@@ -22,16 +22,17 @@ import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.graph.VertexInputReader;
public class InlinkCount extends Vertex<Text, IntWritable, NullWritable> {
@@ -48,6 +49,34 @@ public class InlinkCount extends Vertex<
}
}
+ public static class InlinkCountTextReader extends
+ VertexInputReader<LongWritable, Text, Text, IntWritable, NullWritable> {
+
+ /**
+ * The text file essentially should look like: <br/>
+ * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
+ * E.G:<br/>
+ * 1\t2\t3\t4<br/>
+ * 2\t3\t1<br/>
+ * etc.
+ */
+ @Override
+ public boolean parseVertex(LongWritable key, Text value,
+ Vertex<Text, IntWritable, NullWritable> vertex) {
+ String[] split = value.toString().split("\t");
+ for (int i = 0; i < split.length; i++) {
+ if (i == 0) {
+ vertex.setVertexID(new Text(split[i]));
+ } else {
+ vertex
+ .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+ }
+ }
+ return true;
+ }
+
+ }
+
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
// Graph job configuration
@@ -64,13 +93,14 @@ public class InlinkCount extends Vertex<
}
inlinkJob.setVertexClass(InlinkCount.class);
- inlinkJob.setInputFormat(SequenceFileInputFormat.class);
- inlinkJob.setInputKeyClass(VertexWritable.class);
- inlinkJob.setInputValueClass(VertexArrayWritable.class);
+ inlinkJob.setInputFormat(TextInputFormat.class);
+ inlinkJob.setInputKeyClass(LongWritable.class);
+ inlinkJob.setInputValueClass(Text.class);
inlinkJob.setVertexIDClass(Text.class);
inlinkJob.setVertexValueClass(IntWritable.class);
inlinkJob.setEdgeValueClass(NullWritable.class);
+ inlinkJob.setVertexInputReaderClass(InlinkCountTextReader.class);
inlinkJob.setPartitioner(HashPartitioner.class);
inlinkJob.setOutputFormat(SequenceFileOutputFormat.class);
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Sat May 26 16:11:11 2012
@@ -22,16 +22,18 @@ import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
/**
* Finding the mindist vertex in a connected component.
@@ -52,7 +54,7 @@ public class MindistSearch {
// neighbourhood.
if (currentComponent == null) {
setValue(new Text(getVertexID()));
- for (Edge<Text, NullWritable> e : edges) {
+ for (Edge<Text, NullWritable> e : getEdges()) {
Text id = getVertexID();
if (id.compareTo(e.getDestinationVertexID()) > 0) {
setValue(e.getDestinationVertexID());
@@ -91,6 +93,34 @@ public class MindistSearch {
}
+ public static class MindistSearchCountReader extends
+ VertexInputReader<LongWritable, Text, Text, Text, NullWritable> {
+
+ /**
+ * The text file essentially should look like: <br/>
+ * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
+ * E.G:<br/>
+ * 1\t2\t3\t4<br/>
+ * 2\t3\t1<br/>
+ * etc.
+ */
+ @Override
+ public boolean parseVertex(LongWritable key, Text value,
+ Vertex<Text, Text, NullWritable> vertex) {
+ String[] split = value.toString().split("\t");
+ for (int i = 0; i < split.length; i++) {
+ if (i == 0) {
+ vertex.setVertexID(new Text(split[i]));
+ } else {
+ vertex
+ .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+ }
+ }
+ return true;
+ }
+
+ }
+
private static void printUsage() {
System.out
.println("Usage: <input> <output> [maximum iterations (default 30)] [tasks]");
@@ -103,35 +133,38 @@ public class MindistSearch {
printUsage();
HamaConfiguration conf = new HamaConfiguration(new Configuration());
- GraphJob connectedComponentsJob = new GraphJob(conf,
+ GraphJob job = new GraphJob(conf,
MindistSearchVertex.class);
- connectedComponentsJob.setJobName("Mindist Search");
+ job.setJobName("Mindist Search");
- connectedComponentsJob.setVertexClass(MindistSearchVertex.class);
- connectedComponentsJob.setInputPath(new Path(args[0]));
- connectedComponentsJob.setOutputPath(new Path(args[1]));
+ job.setVertexClass(MindistSearchVertex.class);
+ job.setInputPath(new Path(args[0]));
+ job.setOutputPath(new Path(args[1]));
// set the min text combiner here
- connectedComponentsJob.setCombinerClass(MinTextCombiner.class);
+ job.setCombinerClass(MinTextCombiner.class);
// set the defaults
- connectedComponentsJob.setMaxIteration(30);
+ job.setMaxIteration(30);
if (args.length == 4)
- connectedComponentsJob.setNumBspTask(Integer.parseInt(args[3]));
+ job.setNumBspTask(Integer.parseInt(args[3]));
if (args.length >= 3)
- connectedComponentsJob.setMaxIteration(Integer.parseInt(args[2]));
+ job.setMaxIteration(Integer.parseInt(args[2]));
- connectedComponentsJob.setVertexIDClass(Text.class);
- connectedComponentsJob.setVertexValueClass(Text.class);
- connectedComponentsJob.setEdgeValueClass(NullWritable.class);
-
- connectedComponentsJob.setInputFormat(SequenceFileInputFormat.class);
- connectedComponentsJob.setPartitioner(HashPartitioner.class);
- connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
- connectedComponentsJob.setOutputKeyClass(Text.class);
- connectedComponentsJob.setOutputValueClass(Text.class);
+ job.setVertexIDClass(Text.class);
+ job.setVertexValueClass(Text.class);
+ job.setEdgeValueClass(NullWritable.class);
+
+ job.setInputKeyClass(LongWritable.class);
+ job.setInputValueClass(Text.class);
+ job.setInputFormat(TextInputFormat.class);
+ job.setVertexInputReaderClass(MindistSearchCountReader.class);
+ job.setPartitioner(HashPartitioner.class);
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
long startTime = System.currentTimeMillis();
- if (connectedComponentsJob.waitForCompletion(true)) {
+ if (job.waitForCompletion(true)) {
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Sat May 26 16:11:11 2012
@@ -23,15 +23,18 @@ import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.graph.AverageAggregator;
+import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
public class PageRank {
@@ -53,7 +56,7 @@ public class PageRank {
if (val != null) {
MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
}
- numEdges = this.getOutEdges().size();
+ numEdges = this.getEdges().size();
}
@Override
@@ -86,6 +89,34 @@ public class PageRank {
}
}
+ public static class PagerankTextReader extends
+ VertexInputReader<LongWritable, Text, Text, DoubleWritable, NullWritable> {
+
+ /**
+ * The text file essentially should look like: <br/>
+ * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
+ * E.G:<br/>
+ * 1\t2\t3\t4<br/>
+ * 2\t3\t1<br/>
+ * etc.
+ */
+ @Override
+ public boolean parseVertex(LongWritable key, Text value,
+ Vertex<Text, DoubleWritable, NullWritable> vertex) {
+ String[] split = value.toString().split("\t");
+ for (int i = 0; i < split.length; i++) {
+ if (i == 0) {
+ vertex.setVertexID(new Text(split[i]));
+ } else {
+ vertex
+ .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+ }
+ }
+ return true;
+ }
+
+ }
+
private static void printUsage() {
System.out
.println("Usage: <input> <output> [damping factor (default 0.85)] [Epsilon (convergence error, default 0.001)] [Max iterations (default 30)] [tasks]");
@@ -127,7 +158,10 @@ public class PageRank {
pageJob.setVertexValueClass(DoubleWritable.class);
pageJob.setEdgeValueClass(NullWritable.class);
- pageJob.setInputFormat(SequenceFileInputFormat.class);
+ pageJob.setInputKeyClass(LongWritable.class);
+ pageJob.setInputValueClass(Text.class);
+ pageJob.setInputFormat(TextInputFormat.class);
+ pageJob.setVertexInputReaderClass(PagerankTextReader.class);
pageJob.setPartitioner(HashPartitioner.class);
pageJob.setOutputFormat(SequenceFileOutputFormat.class);
pageJob.setOutputKeyClass(Text.class);
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Sat May 26 16:11:11 2012
@@ -22,17 +22,17 @@ import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.graph.VertexInputReader;
public class SSSP {
@@ -63,7 +63,7 @@ public class SSSP {
if (minDist < this.getValue().get()) {
this.setValue(new IntWritable(minDist));
- for (Edge<Text, IntWritable> e : this.getOutEdges()) {
+ for (Edge<Text, IntWritable> e : this.getEdges()) {
sendMessage(e, new IntWritable(minDist + e.getValue().get()));
}
}
@@ -87,6 +87,35 @@ public class SSSP {
}
}
+ public static class SSSPTextReader extends
+ VertexInputReader<LongWritable, Text, Text, IntWritable, IntWritable> {
+
+ /**
+ * The text file essentially should look like: <br/>
+ * VERTEX_ID\t(n-tab separated VERTEX_ID:VERTEX_VALUE pairs)<br/>
+ * E.G:<br/>
+ * 1\t2:25\t3:32\t4:21<br/>
+ * 2\t3:222\t1:922<br/>
+ * etc.
+ */
+ @Override
+ public boolean parseVertex(LongWritable key, Text value,
+ Vertex<Text, IntWritable, IntWritable> vertex) {
+ String[] split = value.toString().split("\t");
+ for (int i = 0; i < split.length; i++) {
+ if (i == 0) {
+ vertex.setVertexID(new Text(split[i]));
+ } else {
+ String[] split2 = split[i].split(":");
+ vertex.addEdge(new Edge<Text, IntWritable>(new Text(split2[0]),
+ new IntWritable(Integer.parseInt(split2[1]))));
+ }
+ }
+ return true;
+ }
+
+ }
+
private static void printUsage() {
System.out.println("Usage: <startnode> <input> <output> [tasks]");
System.exit(-1);
@@ -113,12 +142,13 @@ public class SSSP {
ssspJob.setVertexClass(ShortestPathVertex.class);
ssspJob.setCombinerClass(MinIntCombiner.class);
- ssspJob.setInputFormat(SequenceFileInputFormat.class);
- ssspJob.setInputKeyClass(VertexWritable.class);
- ssspJob.setInputValueClass(VertexArrayWritable.class);
+ ssspJob.setInputFormat(TextInputFormat.class);
+ ssspJob.setInputKeyClass(LongWritable.class);
+ ssspJob.setInputValueClass(Text.class);
ssspJob.setPartitioner(HashPartitioner.class);
ssspJob.setOutputFormat(SequenceFileOutputFormat.class);
+ ssspJob.setVertexInputReaderClass(SSSPTextReader.class);
ssspJob.setOutputKeyClass(Text.class);
ssspJob.setOutputValueClass(IntWritable.class);
// Iterate until all the nodes have been reached.
Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Sat May 26 16:11:11 2012
@@ -21,10 +21,7 @@ import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import junit.framework.TestCase;
@@ -32,52 +29,19 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
-import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.examples.MindistSearch.MinTextCombiner;
-import org.apache.hama.examples.MindistSearch.MindistSearchVertex;
-import org.apache.hama.examples.util.PagerankTextToSeq;
-import org.apache.hama.graph.GraphJob;
-import org.apache.hama.graph.GraphJobRunner;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
public class MindistSearchTest extends TestCase {
- private static final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> tmp = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>();
- // mapping of our index of the vertex to the resulting component id
- private static final String[] resultList = new String[] { "0", "1", "2", "2",
- "1", "2", "2", "1", "2", "0" };
- static {
- String[] pages = new String[] { "0", "1", "2", "3", "4", "5", "6", "7",
- "8", "9" };
- String[] lineArray = new String[] { "0", "1;4;7", "2;3;8", "3;5", "4;1",
- "5;6", "6", "7", "8;3", "9;0" };
-
- for (int i = 0; i < lineArray.length; i++) {
- String[] adjacencyStringArray = lineArray[i].split(";");
- int vertexId = Integer.parseInt(adjacencyStringArray[0]);
- String name = pages[vertexId];
- @SuppressWarnings("unchecked")
- VertexWritable<Text, IntWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
- for (int j = 1; j < adjacencyStringArray.length; j++) {
- arr[j - 1] = new VertexWritable<Text, IntWritable>(new IntWritable(0),
- new Text(pages[Integer.parseInt(adjacencyStringArray[j])]),
- Text.class, IntWritable.class);
- }
- VertexArrayWritable wr = new VertexArrayWritable();
- wr.set(arr);
- tmp.put(
- new VertexWritable<Text, IntWritable>(new Text(name), Text.class), wr);
- }
- }
+ String[] resultList = new String[] { "0", "1", "2", "2", "1", "2", "2", "1",
+ "2", "0" };
+ String[] input = new String[] { "0", "1\t4\t7", "2\t3\t8", "3\t5", "4\t1", "5\t6",
+ "6", "7", "8\t3", "9\t0" };
+
private static String INPUT = "/tmp/pagerank-tmp.seq";
private static String TEXT_INPUT = "/tmp/pagerank.txt";
private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
@@ -92,7 +56,7 @@ public class MindistSearchTest extends T
}
public void testMindistSearch() throws Exception {
- generateSeqTestData(tmp);
+ generateTestData();
try {
MindistSearch.main(new String[] { INPUT, OUTPUT });
@@ -131,88 +95,24 @@ public class MindistSearchTest extends T
assertEquals(resultList.length, itemsRead);
}
- private void generateSeqTestData(
- Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> tmp)
- throws IOException {
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
- INPUT), VertexWritable.class, VertexArrayWritable.class);
- for (Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : tmp
- .entrySet()) {
- writer.append(e.getKey(), e.getValue());
- }
- writer.close();
- }
-
- public void testPageRankUtil() throws Exception {
- generateTestTextData();
- // <input path> <output path>
- PagerankTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
+ private void generateTestData() {
+ BufferedWriter bw = null;
try {
- MindistSearch.main(new String[] { TEXT_OUTPUT, OUTPUT });
-
- verifyResult();
- } finally {
- deleteTempDirs();
- }
- }
-
- public void testRepairFunctionality() throws Exception {
- // make a copy to be safe with parallel test executions
- final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>(
- tmp);
- // removing 7 should resulting in creating it and getting the same result as
- // usual
- map.remove(new VertexWritable<Text, IntWritable>("7"));
- generateSeqTestData(map);
- try {
- HamaConfiguration conf = new HamaConfiguration(new Configuration());
- conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
- GraphJob connectedComponentsJob = new GraphJob(conf,
- MindistSearchVertex.class);
- connectedComponentsJob.setJobName("Mindist Search");
-
- connectedComponentsJob.setVertexClass(MindistSearchVertex.class);
- connectedComponentsJob.setInputPath(new Path(INPUT));
- connectedComponentsJob.setOutputPath(new Path(OUTPUT));
- // set the min text combiner here
- connectedComponentsJob.setCombinerClass(MinTextCombiner.class);
-
- // set the defaults
- connectedComponentsJob.setMaxIteration(30);
- connectedComponentsJob.setInputFormat(SequenceFileInputFormat.class);
- connectedComponentsJob.setPartitioner(HashPartitioner.class);
- connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
- connectedComponentsJob.setOutputKeyClass(Text.class);
- connectedComponentsJob.setOutputValueClass(Text.class);
-
- connectedComponentsJob.setVertexIDClass(Text.class);
- connectedComponentsJob.setVertexValueClass(Text.class);
- connectedComponentsJob.setEdgeValueClass(NullWritable.class);
-
- if (connectedComponentsJob.waitForCompletion(true)) {
- verifyResult();
- } else {
- fail("Job not completed correctly!");
+ bw = new BufferedWriter(new FileWriter(INPUT));
+ for (String s : input) {
+ bw.write(s + "\n");
}
+ } catch (IOException e) {
+ e.printStackTrace();
} finally {
- deleteTempDirs();
- }
- }
-
- private static void generateTestTextData() throws IOException {
- BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
- for (Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : tmp
- .entrySet()) {
- writer.write(e.getKey() + "\t");
- for (int i = 0; i < e.getValue().get().length; i++) {
- @SuppressWarnings("unchecked")
- VertexWritable<Text, IntWritable> writable = (VertexWritable<Text, IntWritable>) e
- .getValue().get()[i];
- writer.write(writable.getVertexId() + "\t");
+ if (bw != null) {
+ try {
+ bw.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
- writer.write("\n");
}
- writer.close();
}
private void deleteTempDirs() {
Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Sat May 26 16:11:11 2012
@@ -20,9 +20,6 @@ package org.apache.hama.examples;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
import junit.framework.TestCase;
@@ -31,21 +28,19 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.examples.PageRank.PageRankVertex;
import org.apache.hama.graph.AverageAggregator;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.GraphJobRunner;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
-@SuppressWarnings("unchecked")
public class PageRankTest extends TestCase {
/**
* The graph looks like this (adjacency list, [] contains outlinks):<br/>
@@ -56,37 +51,16 @@ public class PageRankTest extends TestCa
* twitter.com [google.com, facebook.com]<br/>
* nasa.gov [yahoo.com, stackoverflow.com]<br/>
* youtube.com [google.com, yahoo.com]<br/>
+ * Note that google is removed in this part mainly to test the repair
+ * functionality.
*/
- private static final Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp = new HashMap<VertexWritable<Text, DoubleWritable>, VertexArrayWritable>();
- static {
- Configuration conf = new HamaConfiguration();
- VertexWritable.CONFIGURATION = conf;
- // our first entry is null, because our indices in hama 3.0 pre calculated
- // example starts at 1.
- // FIXME This is really ugly.
- String[] pages = new String[] { null, "twitter.com", "google.com",
- "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
- "youtube.com" };
- String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
- "5;4;6", "6;4", "7;2;4" };
-
- for (int i = 0; i < lineArray.length; i++) {
-
- String[] adjacencyStringArray = lineArray[i].split(";");
- int vertexId = Integer.parseInt(adjacencyStringArray[0]);
- String name = pages[vertexId];
- VertexWritable<Text, DoubleWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
- for (int j = 1; j < adjacencyStringArray.length; j++) {
- arr[j - 1] = new VertexWritable<Text, DoubleWritable>(
- new DoubleWritable(0.0d), new Text(
- pages[Integer.parseInt(adjacencyStringArray[j])]), Text.class,
- DoubleWritable.class);
- }
- VertexArrayWritable wr = new VertexArrayWritable();
- wr.set(arr);
- tmp.put(new VertexWritable<Text, DoubleWritable>(name), wr);
- }
- }
+ String[] input = new String[] { "stackoverflow.com\tyahoo.com",
+ "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]",
+ "yahoo.com\tnasa.gov\tstackoverflow.com]",
+ "twitter.com\tgoogle.com\tfacebook.com]",
+ "nasa.gov\tyahoo.com\tstackoverflow.com]",
+ "youtube.com\tgoogle.com\tyahoo.com]" };
+
private static String INPUT = "/tmp/pagerank-tmp.seq";
private static String TEXT_INPUT = "/tmp/pagerank.txt";
private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
@@ -117,26 +91,8 @@ public class PageRankTest extends TestCa
assertTrue(sum > 0.99d && sum <= 1.1d);
}
- private void generateSeqTestData(
- Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp)
- throws IOException {
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
- INPUT), VertexWritable.class, VertexArrayWritable.class);
- for (Entry<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> e : tmp
- .entrySet()) {
- writer.append(e.getKey(), e.getValue());
- }
- writer.close();
- }
-
public void testRepairFunctionality() throws Exception {
- // make a copy to be safe with parallel test executions
- final Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, DoubleWritable>, VertexArrayWritable>(
- tmp);
- // removing google should resulting in creating it and getting the same
- // result as usual
- map.remove(new VertexWritable<Text, DoubleWritable>("google.com"));
- generateSeqTestData(map);
+ generateTestData();
try {
HamaConfiguration conf = new HamaConfiguration(new Configuration());
conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
@@ -146,7 +102,7 @@ public class PageRankTest extends TestCa
pageJob.setVertexClass(PageRankVertex.class);
pageJob.setInputPath(new Path(INPUT));
pageJob.setOutputPath(new Path(OUTPUT));
-
+ pageJob.setNumBspTask(2);
// set the defaults
pageJob.setMaxIteration(30);
pageJob.set("hama.pagerank.alpha", "0.85");
@@ -155,13 +111,14 @@ public class PageRankTest extends TestCa
pageJob.set("hama.graph.self.ref", "true");
pageJob.setAggregatorClass(AverageAggregator.class);
-
- pageJob.setInputFormat(SequenceFileInputFormat.class);
+ pageJob.setInputKeyClass(LongWritable.class);
+ pageJob.setInputValueClass(Text.class);
+ pageJob.setInputFormat(TextInputFormat.class);
pageJob.setPartitioner(HashPartitioner.class);
pageJob.setOutputFormat(SequenceFileOutputFormat.class);
pageJob.setOutputKeyClass(Text.class);
pageJob.setOutputValueClass(DoubleWritable.class);
-
+ pageJob.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
pageJob.setVertexIDClass(Text.class);
pageJob.setVertexValueClass(DoubleWritable.class);
pageJob.setEdgeValueClass(NullWritable.class);
@@ -175,20 +132,24 @@ public class PageRankTest extends TestCa
}
}
- @SuppressWarnings("unused")
- private static void generateTestTextData() throws IOException {
- BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
- for (Map.Entry<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> e : tmp
- .entrySet()) {
- writer.write(e.getKey() + "\t");
- for (int i = 0; i < e.getValue().get().length; i++) {
- VertexWritable<Text, DoubleWritable> writable = (VertexWritable<Text, DoubleWritable>) e
- .getValue().get()[i];
- writer.write(writable.getVertexId() + "\t");
+ private void generateTestData() {
+ BufferedWriter bw = null;
+ try {
+ bw = new BufferedWriter(new FileWriter(INPUT));
+ for (String s : input) {
+ bw.write(s + "\n");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ if (bw != null) {
+ try {
+ bw.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
- writer.write("\n");
}
- writer.close();
}
private void deleteTempDirs() {
Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java Sat May 26 16:11:11 2012
@@ -33,101 +33,19 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.graph.VertexArrayWritable;
-import org.apache.hama.graph.VertexWritable;
/**
* Testcase for {@link ShortestPaths}
*/
-
-@SuppressWarnings("unchecked")
public class SSSPTest extends TestCase {
-
- private static final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> testData = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>();
-
- static {
- Configuration conf = new Configuration();
- VertexWritable.CONFIGURATION = conf;
- String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg",
- "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
- "Muenchen" };
-
- for (String city : cities) {
- if (city.equals("Frankfurt")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
- textArr[0] = new VertexWritable<Text, IntWritable>(85, "Mannheim");
- textArr[1] = new VertexWritable<Text, IntWritable>(173, "Kassel");
- textArr[2] = new VertexWritable<Text, IntWritable>(217, "Wuerzburg");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- } else if (city.equals("Stuttgart")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[1];
- textArr[0] = new VertexWritable<Text, IntWritable>(183, "Nuernberg");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- } else if (city.equals("Kassel")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
- textArr[0] = new VertexWritable<Text, IntWritable>(502, "Muenchen");
- textArr[1] = new VertexWritable<Text, IntWritable>(173, "Frankfurt");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- } else if (city.equals("Erfurt")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[1];
- textArr[0] = new VertexWritable<Text, IntWritable>(186, "Wuerzburg");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- } else if (city.equals("Wuerzburg")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
- textArr[0] = new VertexWritable<Text, IntWritable>(217, "Frankfurt");
- textArr[1] = new VertexWritable<Text, IntWritable>(186, "Erfurt");
- textArr[2] = new VertexWritable<Text, IntWritable>(103, "Nuernberg");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- } else if (city.equals("Mannheim")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
- textArr[0] = new VertexWritable<Text, IntWritable>(80, "Karlsruhe");
- textArr[1] = new VertexWritable<Text, IntWritable>(85, "Frankfurt");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- } else if (city.equals("Karlsruhe")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
- textArr[0] = new VertexWritable<Text, IntWritable>(250, "Augsburg");
- textArr[1] = new VertexWritable<Text, IntWritable>(80, "Mannheim");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- } else if (city.equals("Augsburg")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
- textArr[0] = new VertexWritable<Text, IntWritable>(250, "Karlsruhe");
- textArr[1] = new VertexWritable<Text, IntWritable>(84, "Muenchen");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- } else if (city.equals("Nuernberg")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
- textArr[0] = new VertexWritable<Text, IntWritable>(183, "Stuttgart");
- textArr[1] = new VertexWritable<Text, IntWritable>(167, "Muenchen");
- textArr[2] = new VertexWritable<Text, IntWritable>(103, "Wuerzburg");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- } else if (city.equals("Muenchen")) {
- VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
- textArr[0] = new VertexWritable<Text, IntWritable>(167, "Nuernberg");
- textArr[1] = new VertexWritable<Text, IntWritable>(502, "Kassel");
- textArr[2] = new VertexWritable<Text, IntWritable>(84, "Augsburg");
- VertexArrayWritable arr = new VertexArrayWritable();
- arr.set(textArr);
- testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
- }
- }
- }
+ String[] input = new String[] { "1:85\t2:217\t4:173", "0:85\t5:80",
+ "0:217\t6:186\t7:103",
+ "7:183",
+ "0:173\t9:502",
+ "1:80\t8:250",
+ "2:186",
+ "3:183\t9:167\t2:103",
+ "5:250\t9:84", "4:502\t7:167\t8:84" };
private static String INPUT = "/tmp/sssp-tmp.seq";
private static String TEXT_INPUT = "/tmp/sssp.txt";
@@ -145,10 +63,9 @@ public class SSSPTest extends TestCase {
public void testShortestPaths() throws IOException, InterruptedException,
ClassNotFoundException, InstantiationException, IllegalAccessException {
- generateTestSequenceFileData();
+ generateTestData();
try {
- SSSP.main(new String[] { "Frankfurt", INPUT, OUTPUT });
-
+ SSSP.main(new String[] { "0", INPUT, OUTPUT, "2" });
verifyResult();
} finally {
deleteTempDirs();
@@ -157,16 +74,16 @@ public class SSSPTest extends TestCase {
private void verifyResult() throws IOException {
Map<String, Integer> rs = new HashMap<String, Integer>();
- rs.put("Erfurt", 403);
- rs.put("Mannheim", 85);
- rs.put("Stuttgart", 503);
- rs.put("Kassel", 173);
- rs.put("Nuernberg", 320);
- rs.put("Augsburg", 415);
- rs.put("Frankfurt", 0);
- rs.put("Muenchen", 487);
- rs.put("Wuerzburg", 217);
- rs.put("Karlsruhe", 165);
+ rs.put("6", 403);
+ rs.put("1", 85);
+ rs.put("3", 503);
+ rs.put("4", 173);
+ rs.put("7", 320);
+ rs.put("8", 415);
+ rs.put("0", 0);
+ rs.put("9", 487);
+ rs.put("2", 217);
+ rs.put("5", 165);
FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
for (FileStatus fts : globStatus) {
@@ -180,33 +97,25 @@ public class SSSPTest extends TestCase {
}
}
- private void generateTestSequenceFileData() throws IOException {
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
- INPUT), VertexWritable.class, VertexArrayWritable.class);
- for (Map.Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : testData
- .entrySet()) {
- writer.append(e.getKey(), e.getValue());
- }
- writer.close();
- }
-
- @SuppressWarnings("unused")
- private static void generateTestTextData() throws IOException {
- BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
- for (Map.Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : testData
- .entrySet()) {
- writer.write(e.getKey().getVertexId() + "\t");
- for (int i = 0; i < e.getValue().get().length; i++) {
- writer
- .write(((VertexWritable<Text, IntWritable>) e.getValue().get()[i])
- .getVertexId()
- + ":"
- + ((VertexWritable<Text, IntWritable>) e.getValue().get()[i])
- .getVertexValue() + "\t");
+ private void generateTestData() {
+ BufferedWriter bw = null;
+ try {
+ bw = new BufferedWriter(new FileWriter(INPUT));
+ int index = 0;
+ for (String s : input) {
+ bw.write((index++) + "\t" + s + "\n");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ if (bw != null) {
+ try {
+ bw.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
- writer.write("\n");
}
- writer.close();
}
private void deleteTempDirs() {
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Sat May 26 16:11:11 2012
@@ -26,13 +26,22 @@ import org.apache.hadoop.io.Writable;
public final class Edge<VERTEX_ID extends Writable, EDGE_VALUE_TYPE extends Writable> {
private final VERTEX_ID destinationVertexID;
- private final String destinationPeerName;
private final EDGE_VALUE_TYPE cost;
+ String destinationPeerName;
- public Edge(VERTEX_ID sourceVertexID, String destVertexID,
+ public Edge(VERTEX_ID sourceVertexID, EDGE_VALUE_TYPE cost) {
+ this.destinationVertexID = sourceVertexID;
+ if (cost == null || cost instanceof NullWritable) {
+ this.cost = null;
+ } else {
+ this.cost = cost;
+ }
+ }
+
+ public Edge(VERTEX_ID sourceVertexID, String destinationPeer,
EDGE_VALUE_TYPE cost) {
this.destinationVertexID = sourceVertexID;
- this.destinationPeerName = destVertexID;
+ destinationPeerName = destinationPeer;
if (cost instanceof NullWritable) {
this.cost = null;
} else {
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Sat May 26 16:11:11 2012
@@ -25,8 +25,11 @@ import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
+import com.google.common.base.Preconditions;
+
public class GraphJob extends BSPJob {
public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
@@ -37,6 +40,7 @@ public class GraphJob extends BSPJob {
public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
public final static String VERTEX_GRAPH_RUNTIME_PARTIONING = "hama.graph.runtime.partitioning";
+ public final static String VERTEX_GRAPH_INPUT_READER = "hama.graph.input.reader.class";
/**
* Creates a new Graph Job with the given configuration and an exampleClass.
@@ -48,12 +52,12 @@ public class GraphJob extends BSPJob {
public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
throws IOException {
super(conf);
- VertexWritable.CONFIGURATION = conf;
this.setBspClass(GraphJobRunner.class);
this.setJarByClass(exampleClass);
this.setVertexIDClass(Text.class);
this.setVertexValueClass(IntWritable.class);
this.setEdgeValueClass(IntWritable.class);
+ this.setPartitioner(HashPartitioner.class);
}
/**
@@ -109,6 +113,15 @@ public class GraphJob extends BSPJob {
conf.set(AGGREGATOR_CLASS_ATTR, classNames);
}
+ /**
+ * Sets the input reader for parsing the input to vertices.
+ */
+ public void setVertexInputReaderClass(
+ Class<? extends VertexInputReader<?, ?, ?, ?, ?>> cls) {
+ ensureState(JobState.DEFINE);
+ conf.setClass(VERTEX_GRAPH_INPUT_READER, cls, VertexInputReader.class);
+ }
+
@SuppressWarnings("unchecked")
public Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> getVertexClass() {
return (Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>>) conf
@@ -136,4 +149,25 @@ public class GraphJob extends BSPJob {
conf.setInt("hama.graph.max.iteration", maxIteration);
}
+ @Override
+ public void submit() throws IOException, InterruptedException {
+ Preconditions.checkArgument(this.getConf().get(VERTEX_CLASS_ATTR) != null,
+ "Please provide a vertex class!");
+ Preconditions.checkArgument(
+ this.getConf().get(VERTEX_ID_CLASS_ATTR) != null,
+ "Please provide an vertex ID class!");
+ Preconditions
+ .checkArgument(
+ this.getConf().get(VERTEX_VALUE_CLASS_ATTR) != null,
+ "Please provide an vertex value class, if you don't need one, use NullWritable!");
+ Preconditions
+ .checkArgument(
+ this.getConf().get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null,
+ "Please provide an edge value class, if you don't need one, use NullWritable!");
+ Preconditions.checkArgument(
+ this.getConf().get(VERTEX_GRAPH_INPUT_READER) != null,
+ "Please provide a vertex input reader!");
+ super.submit();
+ }
+
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Sat May 26 16:11:11 2012
@@ -94,7 +94,7 @@ public final class GraphJobMessage imple
} else {
out.writeBoolean(false);
}
- List<?> outEdges = vertex.getOutEdges();
+ List<?> outEdges = vertex.getEdges();
out.writeInt(outEdges.size());
for (Object e : outEdges) {
Edge<?, ?> edge = (Edge<?, ?>) e;
@@ -137,7 +137,7 @@ public final class GraphJobMessage imple
vertex.setValue(vertexValue);
}
int size = in.readInt();
- vertex.edges = new ArrayList<Edge<Writable, Writable>>(size);
+ vertex.setEdges(new ArrayList<Edge<Writable, Writable>>(size));
for (int i = 0; i < size; i++) {
String destination = in.readUTF();
Writable edgeVertexID = ReflectionUtils.newInstance(VERTEX_ID_CLASS,
@@ -148,8 +148,8 @@ public final class GraphJobMessage imple
edgeValue = ReflectionUtils.newInstance(EDGE_VALUE_CLASS, null);
edgeValue.readFields(in);
}
- vertex.edges.add(new Edge<Writable, Writable>(edgeVertexID,
- destination, edgeValue));
+ vertex.getEdges().add(
+ new Edge<Writable, Writable>(edgeVertexID, destination, edgeValue));
}
this.vertex = vertex;
} else {
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sat May 26 16:11:11 2012
@@ -52,8 +52,7 @@ import org.apache.hama.util.KeyValuePair
* @param <VERTEX_VALUE> the value type of an edge.
*/
public final class GraphJobRunner<VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE_TYPE extends Writable>
- extends
- BSP<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> {
+ extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
@@ -102,10 +101,9 @@ public final class GraphJobRunner<VERTEX
@Override
@SuppressWarnings("unchecked")
public final void setup(
- BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
this.conf = peer.getConfiguration();
- VertexWritable.CONFIGURATION = conf;
// Choose one as a master to collect global updates
this.masterTask = peer.getPeerName(0);
@@ -126,7 +124,7 @@ public final class GraphJobRunner<VERTEX
boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
boolean runtimePartitioning = conf.getBoolean(
- GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, false);
+ GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner = (Partitioner<VERTEX_ID, VERTEX_VALUE>) ReflectionUtils
.newInstance(
conf.getClass("bsp.input.partitioner.class", HashPartitioner.class),
@@ -168,7 +166,11 @@ public final class GraphJobRunner<VERTEX
}
}
- loadVertices(peer, repairNeeded, runtimePartitioning, partitioner);
+ VertexInputReader<Writable, Writable, VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> reader = (VertexInputReader<Writable, Writable, VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
+ .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
+ VertexInputReader.class), conf);
+
+ loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader);
numberVertices = vertices.size() * peer.getNumPeers();
// TODO refactor this to a single step
for (Entry<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> e : vertices
@@ -196,7 +198,7 @@ public final class GraphJobRunner<VERTEX
@Override
public final void bsp(
- BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
@@ -296,7 +298,7 @@ public final class GraphJobRunner<VERTEX
}
private void runAggregators(
- BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer,
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
int messagesSize) throws IOException {
// send msgCounts to the master task
MapWritable updatedCnt = new MapWritable();
@@ -326,7 +328,7 @@ public final class GraphJobRunner<VERTEX
@SuppressWarnings("unchecked")
private Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> parseMessages(
- BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
GraphJobMessage msg = null;
final Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> msgMap = new HashMap<VERTEX_ID, LinkedList<VERTEX_VALUE>>();
@@ -375,60 +377,66 @@ public final class GraphJobRunner<VERTEX
@SuppressWarnings("unchecked")
private void loadVertices(
- BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer,
- boolean repairNeeded, boolean runtimePartitioning,
- Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner) throws IOException,
- SyncException, InterruptedException {
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+ boolean repairNeeded,
+ boolean runtimePartitioning,
+ Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner,
+ VertexInputReader<Writable, Writable, VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> reader)
+ throws IOException, SyncException, InterruptedException {
+
LOG.debug("vertex class: " + vertexClass);
boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
- KeyValuePair<? extends VertexWritable<VERTEX_ID, VERTEX_VALUE>, ? extends VertexArrayWritable> next = null;
- while ((next = peer.readNext()) != null) {
- Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
- vertexClass, conf);
- vertex.setVertexID(next.getKey().getVertexId());
- vertex.peer = peer;
- vertex.runner = this;
-
- VertexWritable<VERTEX_ID, VERTEX_VALUE>[] arr = (VertexWritable[]) next
- .getValue().toArray();
+ Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
+ vertexClass, conf);
+ vertex.peer = peer;
+ vertex.runner = this;
+ while (true) {
+ KeyValuePair<Writable, Writable> next = peer.readNext();
+ if (next == null) {
+ break;
+ }
+ boolean vertexFinished = reader.parseVertex(next.getKey(),
+ next.getValue(), vertex);
+ if (!vertexFinished) {
+ continue;
+ }
+ if (vertex.getEdges() == null) {
+ vertex.setEdges(new ArrayList<Edge<VERTEX_ID, EDGE_VALUE_TYPE>>(0));
+ }
if (selfReference) {
- VertexWritable<VERTEX_ID, VERTEX_VALUE>[] tmp = new VertexWritable[arr.length + 1];
- System.arraycopy(arr, 0, tmp, 0, arr.length);
- tmp[arr.length] = new VertexWritable<VERTEX_ID, VERTEX_VALUE>(
- vertex.getValue(), vertex.getVertexID(), vertexIdClass,
- vertexValueClass);
- arr = tmp;
- }
- List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> edges = new ArrayList<Edge<VERTEX_ID, EDGE_VALUE_TYPE>>();
- for (VertexWritable<VERTEX_ID, VERTEX_VALUE> e : arr) {
- int partition = partitioner.getPartition(e.getVertexId(),
- e.getVertexValue(), peer.getNumPeers());
- String target = peer.getPeerName(partition);
- edges.add(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(e.getVertexId(), target,
- (EDGE_VALUE_TYPE) e.getVertexValue()));
+ vertex.addEdge(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(vertex
+ .getVertexID(), peer.getPeerName(), null));
}
-
- vertex.edges = edges;
if (runtimePartitioning) {
int partition = partitioner.getPartition(vertex.getVertexID(),
vertex.getValue(), peer.getNumPeers());
+ // set the destination name for the edge now
+ for (Edge<VERTEX_ID, EDGE_VALUE_TYPE> edge : vertex.getEdges()) {
+ int edgePartition = partitioner.getPartition(
+ edge.getDestinationVertexID(), (VERTEX_VALUE) edge.getValue(),
+ peer.getNumPeers());
+ edge.destinationPeerName = peer.getPeerName(edgePartition);
+ }
peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
} else {
vertex.setup(conf);
- vertices.put(next.getKey().getVertexId(), vertex);
+ vertices.put(vertex.getVertexID(), vertex);
}
+ vertex = newVertexInstance(vertexClass, conf);
+ vertex.peer = peer;
+ vertex.runner = this;
}
if (runtimePartitioning) {
peer.sync();
GraphJobMessage msg = null;
while ((msg = peer.getCurrentMessage()) != null) {
- Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) msg
+ Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> messagedVertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) msg
.getVertex();
- vertex.peer = peer;
- vertex.runner = this;
- vertex.setup(conf);
- vertices.put(vertex.getVertexID(), vertex);
+ messagedVertex.peer = peer;
+ messagedVertex.runner = this;
+ messagedVertex.setup(conf);
+ vertices.put(messagedVertex.getVertexID(), messagedVertex);
}
}
@@ -444,7 +452,7 @@ public final class GraphJobRunner<VERTEX
final Collection<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> entries = vertices
.values();
for (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> entry : entries) {
- List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> outEdges = entry.getOutEdges();
+ List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> outEdges = entry.getEdges();
for (Edge<VERTEX_ID, EDGE_VALUE_TYPE> e : outEdges) {
peer.send(e.getDestinationPeerName(),
new GraphJobMessage(e.getDestinationVertexID()));
@@ -455,23 +463,24 @@ public final class GraphJobRunner<VERTEX
while ((msg = peer.getCurrentMessage()) != null) {
VERTEX_ID vertexName = (VERTEX_ID) msg.getVertexId();
if (!vertices.containsKey(vertexName)) {
- Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
+ Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> newVertex = newVertexInstance(
vertexClass, conf);
- vertex.peer = peer;
- vertex.setVertexID(vertexName);
- vertex.runner = this;
+ newVertex.peer = peer;
+ newVertex.setVertexID(vertexName);
+ newVertex.runner = this;
if (selfReference) {
- int partition = partitioner.getPartition(vertex.getVertexID(),
- vertex.getValue(), peer.getNumPeers());
+ int partition = partitioner.getPartition(newVertex.getVertexID(),
+ newVertex.getValue(), peer.getNumPeers());
String target = peer.getPeerName(partition);
- vertex.edges = Collections
- .singletonList(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(vertex
- .getVertexID(), target, null));
+ newVertex.setEdges(Collections
+ .singletonList(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(newVertex
+ .getVertexID(), target, null)));
} else {
- vertex.edges = Collections.emptyList();
+ newVertex.setEdges(new ArrayList<Edge<VERTEX_ID, EDGE_VALUE_TYPE>>(
+ 0));
}
- vertex.setup(conf);
- vertices.put(vertexName, vertex);
+ newVertex.setup(conf);
+ vertices.put(vertexName, newVertex);
}
}
}
@@ -495,7 +504,7 @@ public final class GraphJobRunner<VERTEX
*/
@Override
public final void cleanup(
- BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
for (Entry<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> e : vertices
.entrySet()) {
@@ -517,7 +526,7 @@ public final class GraphJobRunner<VERTEX
}
private boolean isMasterTask(
- BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer) {
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
return peer.getPeerName().equals(masterTask);
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Sat May 26 16:11:11 2012
@@ -18,6 +18,7 @@
package org.apache.hama.graph;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -31,8 +32,8 @@ public abstract class Vertex<ID_TYPE ext
private ID_TYPE vertexID;
private MSG_TYPE value;
protected GraphJobRunner<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> runner;
- protected BSPPeer<VertexWritable<ID_TYPE, MSG_TYPE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer;
- public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges;
+ protected BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+ private List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges;
public Configuration getConf() {
return peer.getConfiguration();
@@ -56,7 +57,7 @@ public abstract class Vertex<ID_TYPE ext
@Override
public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException {
- final List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> outEdges = this.getOutEdges();
+ final List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> outEdges = this.getEdges();
for (Edge<ID_TYPE, EDGE_VALUE_TYPE> e : outEdges) {
sendMessage(e, msg);
}
@@ -67,8 +68,19 @@ public abstract class Vertex<ID_TYPE ext
return runner.getNumberIterations();
}
+ public void setEdges(List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> list) {
+ this.edges = list;
+ }
+
+ public void addEdge(Edge<ID_TYPE, EDGE_VALUE_TYPE> edge) {
+ if (edges == null) {
+ this.edges = new ArrayList<Edge<ID_TYPE, EDGE_VALUE_TYPE>>();
+ }
+ this.edges.add(edge);
+ }
+
@Override
- public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges() {
+ public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getEdges() {
return edges;
}
Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1342922&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (added)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Sat May 26 16:11:11 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A reader to read Hama's input files and parses a vertex out of it.
+ */
+public abstract class VertexInputReader<KEY_IN extends Writable, VALUE_IN extends Writable, VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE extends Writable> {
+
+ /**
+ * Parses a given key and value into the given vertex. If returned true, the
+ * given vertex is considered finished and a new instance will be given in the
+ * next call.
+ */
+ public abstract boolean parseVertex(KEY_IN key, VALUE_IN value,
+ Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE> vertex);
+
+}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Sat May 26 16:11:11 2012
@@ -50,7 +50,7 @@ public interface VertexInterface<ID_TYPE
public void compute(Iterator<MSG_TYPE> messages) throws IOException;
/** @return a list of outgoing edges of this vertex in the input graph. */
- public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges();
+ public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getEdges();
/** Sends a message to another vertex. */
public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg)
Modified: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Sat May 26 16:11:11 2012
@@ -17,60 +17,34 @@
*/
package org.apache.hama.graph;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.TestBSPMasterGroomServer;
+import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.graph.example.PageRank;
public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
- private static final Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp = new HashMap<VertexWritable<Text, DoubleWritable>, VertexArrayWritable>();
- static {
- Configuration conf = new HamaConfiguration();
- VertexWritable.CONFIGURATION = conf;
- // our first entry is null, because our indices in hama 3.0 pre calculated
- // example starts at 1.
- // FIXME This is really ugly.
- String[] pages = new String[] { null, "twitter.com", "google.com",
- "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
- "youtube.com" };
- String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
- "5;4;6", "6;4", "7;2;4" };
-
- for (int i = 0; i < lineArray.length; i++) {
-
- String[] adjacencyStringArray = lineArray[i].split(";");
- int vertexId = Integer.parseInt(adjacencyStringArray[0]);
- String name = pages[vertexId];
- @SuppressWarnings("unchecked")
- VertexWritable<Text, NullWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
- for (int j = 1; j < adjacencyStringArray.length; j++) {
- arr[j - 1] = new VertexWritable<Text, NullWritable>(NullWritable.get(),
- new Text(pages[Integer.parseInt(adjacencyStringArray[j])]),
- Text.class, NullWritable.class);
- }
- VertexArrayWritable wr = new VertexArrayWritable();
- wr.set(arr);
- tmp.put(new VertexWritable<Text, DoubleWritable>(name), wr);
- }
- }
+  // Test graph as tab-separated adjacency lists: the first token of each line
+  // is the vertex ID, the remaining tokens are the IDs of its neighbours (the
+  // format parsed by PageRank.PagerankTextReader). Every neighbour ID must
+  // match a vertex name exactly; google.com has no line of its own and is
+  // presumably supplied by the "hama.graph.repair" option set in the job.
+  String[] input = new String[] { "stackoverflow.com\tyahoo.com",
+      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+      "yahoo.com\tnasa.gov\tstackoverflow.com",
+      "twitter.com\tgoogle.com\tfacebook.com",
+      "nasa.gov\tyahoo.com\tstackoverflow.com",
+      "youtube.com\tgoogle.com\tyahoo.com" };
private static String INPUT = "/tmp/pagerank-real-tmp.seq";
private static String OUTPUT = "/tmp/pagerank-real-out";
@@ -79,7 +53,7 @@ public class TestSubmitGraphJob extends
@Override
public void testSubmitJob() throws Exception {
- generateSeqTestData(tmp);
+ generateTestData();
GraphJob bsp = new GraphJob(configuration, PageRank.class);
bsp.setInputPath(new Path(INPUT));
@@ -98,13 +72,17 @@ public class TestSubmitGraphJob extends
// we need to include a vertex in its adjacency list,
// otherwise the pagerank result has a constant loss
bsp.set("hama.graph.self.ref", "true");
+ bsp.set("hama.graph.repair", "true");
bsp.setAggregatorClass(AverageAggregator.class, SumAggregator.class);
bsp.setVertexIDClass(Text.class);
bsp.setVertexValueClass(DoubleWritable.class);
bsp.setEdgeValueClass(NullWritable.class);
- bsp.setInputFormat(SequenceFileInputFormat.class);
+ bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
+ bsp.setInputFormat(TextInputFormat.class);
+ bsp.setInputKeyClass(LongWritable.class);
+ bsp.setInputValueClass(Text.class);
bsp.setPartitioner(HashPartitioner.class);
bsp.setOutputFormat(SequenceFileOutputFormat.class);
bsp.setOutputKeyClass(Text.class);
@@ -137,15 +115,23 @@ public class TestSubmitGraphJob extends
assertTrue(sum > 0.99d && sum <= 1.1d);
}
- private void generateSeqTestData(
- Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp)
- throws IOException {
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
- INPUT), VertexWritable.class, VertexArrayWritable.class);
- for (Entry<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> e : tmp
- .entrySet()) {
- writer.append(e.getKey(), e.getValue());
+  /**
+   * Writes the {@code input} adjacency lists to {@code INPUT}, one per line.
+   *
+   * @throws IOException if the file cannot be written. The exception is
+   *           propagated so the test fails here instead of continuing
+   *           without (or with partial) input data.
+   */
+  private void generateTestData() throws IOException {
+    BufferedWriter bw = new BufferedWriter(new FileWriter(INPUT));
+    try {
+      for (String s : input) {
+        bw.write(s + "\n");
+      }
+    } finally {
+      // close() also flushes the buffer; a failure here fails the test too.
+      bw.close();
 }
- writer.close();
 }
}
Modified: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1342922&r1=1342921&r2=1342922&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Sat May 26 16:11:11 2012
@@ -22,9 +22,12 @@ import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.Edge;
import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
public class PageRank {
public static class PageRankVertex extends
@@ -45,7 +48,7 @@ public class PageRank {
if (val != null) {
MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
}
- numEdges = this.getOutEdges().size();
+ numEdges = this.getEdges().size();
}
@Override
@@ -65,8 +68,11 @@ public class PageRank {
double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
if (this.getSuperstepCount() > 1) {
- if(this.getLastAggregatedValue(1).get() < 0.99 || this.getLastAggregatedValue(1).get() > 1.0){
- throw new RuntimeException("Sum aggregator hasn't summed correctly! " + this.getLastAggregatedValue(1).get());
+ if (this.getLastAggregatedValue(1).get() < 0.99
+ || this.getLastAggregatedValue(1).get() > 1.1) {
+ throw new RuntimeException(
+ "Sum aggregator hasn't summed correctly! "
+ + this.getLastAggregatedValue(1).get());
}
}
}
@@ -82,4 +88,33 @@ public class PageRank {
/ numEdges));
}
}
+
+  public static class PagerankTextReader extends
+      VertexInputReader<LongWritable, Text, Text, DoubleWritable, NullWritable> {
+
+    /**
+     * The text file essentially should look like: <br/>
+     * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
+     * E.G:<br/>
+     * 1\t2\t3\t4<br/>
+     * 2\t3\t1<br/>
+     * etc.
+     * <p>
+     * The first token becomes the vertex ID and every further token becomes an
+     * outgoing edge without a value. Blank lines are skipped by returning
+     * false, so no vertex with an empty ID is produced; per the
+     * VertexInputReader contract the untouched vertex instance is then reused
+     * for the next record.
+     *
+     * @param key the line offset supplied by the text input format (unused).
+     * @param value one line of the input file.
+     * @param vertex the vertex to populate.
+     * @return true once the vertex ID and edges have been set, false for a
+     *         blank line.
+     */
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, DoubleWritable, NullWritable> vertex) {
+      String line = value.toString();
+      if (line.trim().isEmpty()) {
+        return false;
+      }
+      String[] split = line.split("\t");
+      vertex.setVertexID(new Text(split[0]));
+      for (int i = 1; i < split.length; i++) {
+        // Use the NullWritable singleton rather than null so downstream code
+        // never has to deal with a null edge value.
+        vertex.addEdge(new Edge<Text, NullWritable>(new Text(split[i]),
+            NullWritable.get()));
+      }
+      return true;
+    }
+
+  }
+
}