You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/01/13 21:45:36 UTC
svn commit: r1432733 - in /hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
examples/src/main/java/org/apache/hama/examples/
examples/src/main/java/org/apache/hama/examples/util/
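examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/example/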
Author: surajsmenon
Date: Sun Jan 13 20:45:35 2013
New Revision: 1432733
URL: http://svn.apache.org/viewvc?rev=1432733&view=rev
Log:
[HAMA-700] and fixed the unit tests.
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sun Jan 13 20:45:35 2013
@@ -4,6 +4,7 @@ Release 0.7 (unreleased changes)
NEW FEATURES
+ HAMA-700: BSPPartitioner should be configurable to be optional and allow input format conversion (surajsmenon)
HAMA-524: Add SpMV example (Mikalai Parafeniuk via edwardyoon)
HAMA-658: Add random symmetric sparse matrix generator (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Sun Jan 13 20:45:35 2013
@@ -22,7 +22,6 @@ import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Sun Jan 13 20:45:35 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
@@ -288,7 +289,7 @@ public class BSPJobClient extends Config
* @throws IOException
*/
public RunningJob submitJob(BSPJob job) throws FileNotFoundException,
- IOException {
+ IOException {
return submitJobInternal(job, jobSubmitClient.getNewJobId());
}
@@ -368,12 +369,14 @@ public class BSPJobClient extends Config
return launchJob(jobId, job, submitJobFile, fs);
}
-
protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
-
- if(job.get("bsp.partitioning.runner.job") != null){return job;}//Early exit for the partitioner job.
-
- InputSplit[] splits = job.getInputFormat().getSplits(job,
+
+ if (job.get("bsp.partitioning.runner.job") != null) {
+ return job;
+ }// Early exit for the partitioner job.
+
+ InputSplit[] splits = job.getInputFormat().getSplits(
+ job,
(isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
: maxTasks);
@@ -386,6 +389,17 @@ public class BSPJobClient extends Config
if (inputPath != null) {
int numSplits = splits.length;
int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" numTasks = "
+ + numTasks
+ + " numSplits = "
+ + numSplits
+ + " enable = "
+ + (job.getConfiguration().getBoolean(
+ Constants.ENABLE_RUNTIME_PARTITIONING, false)
+ + " class = " + job.getConfiguration().get(
+ Constants.RUNTIME_PARTITIONING_CLASS)));
+ }
if ((numTasks > 0 && numTasks != numSplits)
|| (job.getConfiguration().getBoolean(
@@ -398,14 +412,18 @@ public class BSPJobClient extends Config
fs.delete(partitionDir, true);
}
- HamaConfiguration conf = new HamaConfiguration();
+ HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
+
conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT,
Integer.parseInt(job.getConfiguration().get("bsp.peers.num")));
if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
.get(Constants.RUNTIME_PARTITIONING_DIR));
}
- conf.set(Constants.RUNTIME_PARTITIONING_CLASS, job.get(Constants.RUNTIME_PARTITIONING_CLASS));
+ if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null) {
+ conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
+ job.get(Constants.RUNTIME_PARTITIONING_CLASS));
+ }
BSPJob partitioningJob = new BSPJob(conf);
partitioningJob.setInputPath(new Path(job.getConfiguration().get(
Constants.JOB_INPUT_DIR)));
@@ -413,8 +431,12 @@ public class BSPJobClient extends Config
partitioningJob.setInputKeyClass(job.getInputKeyClass());
partitioningJob.setInputValueClass(job.getInputValueClass());
partitioningJob.setOutputFormat(NullOutputFormat.class);
+ partitioningJob.setOutputKeyClass(NullWritable.class);
+ partitioningJob.setOutputValueClass(NullWritable.class);
partitioningJob.setBspClass(PartitioningRunner.class);
partitioningJob.set("bsp.partitioning.runner.job", "true");
+ partitioningJob.getConfiguration().setBoolean(
+ Constants.ENABLE_RUNTIME_PARTITIONING, false);
boolean isPartitioned = false;
try {
@@ -431,6 +453,7 @@ public class BSPJobClient extends Config
} else {
job.setInputPath(new Path(inputDir + "/partitions"));
}
+ job.setInputFormat(SequenceFileInputFormat.class);
} else {
LOG.error("Error partitioning the input path.");
throw new IOException("Runtime partition failed for the job.");
@@ -440,7 +463,6 @@ public class BSPJobClient extends Config
return job;
}
-
protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
int maxTasks;
ClusterStatus clusterStatus = getClusterStatus(true);
@@ -628,9 +650,9 @@ public class BSPJobClient extends Config
if (job.isSuccessful()) {
LOG.info("The total number of supersteps: " + info.getSuperstepCount());
info.getStatus()
- .getCounter()
- .incrCounter(JobInProgress.JobCounter.SUPERSTEPS,
- info.getSuperstepCount());
+ .getCounter()
+ .incrCounter(JobInProgress.JobCounter.SUPERSTEPS,
+ info.getSuperstepCount());
info.getStatus().getCounter().log(LOG);
} else {
LOG.info("Job failed.");
@@ -692,7 +714,7 @@ public class BSPJobClient extends Config
}
public static void runJob(BSPJob job) throws FileNotFoundException,
- IOException {
+ IOException {
BSPJobClient jc = new BSPJobClient(job.getConfiguration());
if (job.getNumBspTask() == 0
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Sun Jan 13 20:45:35 2013
@@ -36,29 +36,87 @@ import org.apache.hama.util.KeyValuePair
public class PartitioningRunner extends
BSP<Writable, Writable, Writable, Writable, NullWritable> {
+
private Configuration conf;
private int desiredNum;
private FileSystem fs = null;
private Path partitionDir;
+ private RecordConverter converter;
private Map<Integer, Map<Writable, Writable>> values = new HashMap<Integer, Map<Writable, Writable>>();
@Override
public final void setup(
BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
throws IOException, SyncException, InterruptedException {
+
this.conf = peer.getConfiguration();
- this.desiredNum = conf.getInt("desired.num.of.tasks", 1);
+ this.desiredNum = conf.getInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, 1);
+
this.fs = FileSystem.get(conf);
- Path inputDir = new Path(conf.get("bsp.input.dir"));
+ Path inputDir = new Path(conf.get(Constants.JOB_INPUT_DIR));
if (fs.isFile(inputDir)) {
inputDir = inputDir.getParent();
}
- if(conf.get("bsp.partitioning.dir") != null) {
- this.partitionDir = new Path(conf.get("bsp.partitioning.dir"));
- } else {
+ converter = ReflectionUtils.newInstance(conf.getClass(
+ Constants.RUNTIME_PARTITION_RECORDCONVERTER,
+ DefaultRecordConverter.class, RecordConverter.class), conf);
+
+ if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
this.partitionDir = new Path(inputDir + "/partitions");
+ } else {
+ this.partitionDir = new Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR));
+ }
+
+ }
+
+ /**
+ * This record converter could be used to convert the records from the input
+ * format type to the sequential record types the BSP Job uses for
+ * computation.
+ *
+ */
+ public static interface RecordConverter {
+
+ /**
+ * Should return the Key-Value pair constructed from the input format.
+ *
+ * @param inputRecord The input key-value pair.
+ * @param conf Configuration of the job.
+ * @return the Key-Value pair instance of the expected sequential format.
+ * Should return null if the conversion was not successful.
+ */
+ public KeyValuePair<Writable, Writable> convertRecord(
+ KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
+
+ public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
+ @SuppressWarnings("rawtypes")
+ Partitioner partitioner, Configuration conf,
+ @SuppressWarnings("rawtypes")
+ BSPPeer peer, int numTasks);
+ }
+
+ /**
+ * The default converter does no conversion.
+ */
+ public static class DefaultRecordConverter implements RecordConverter {
+
+ @Override
+ public KeyValuePair<Writable, Writable> convertRecord(
+ KeyValuePair<Writable, Writable> inputRecord, Configuration conf) {
+ return inputRecord;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
+ @SuppressWarnings("rawtypes")
+ Partitioner partitioner, Configuration conf,
+ @SuppressWarnings("rawtypes")
+ BSPPeer peer, int numTasks) {
+ return Math.abs(partitioner.getPartition(outputRecord.getKey(),
+ outputRecord.getValue(), numTasks));
}
}
@@ -69,22 +127,36 @@ public class PartitioningRunner extends
throws IOException, SyncException, InterruptedException {
Partitioner partitioner = getPartitioner();
KeyValuePair<Writable, Writable> pair = null;
+ KeyValuePair<Writable, Writable> outputPair = null;
Class keyClass = null;
Class valueClass = null;
+ Class outputKeyClass = null;
+ Class outputValueClass = null;
while ((pair = peer.readNext()) != null) {
if (keyClass == null && valueClass == null) {
keyClass = pair.getKey().getClass();
valueClass = pair.getValue().getClass();
}
- int index = Math.abs(partitioner.getPartition(pair.getKey(),
- pair.getValue(), desiredNum));
+ outputPair = converter.convertRecord(pair, conf);
+
+ if (outputPair == null) {
+ continue;
+ }
+
+ if (outputKeyClass == null && outputValueClass == null) {
+ outputKeyClass = outputPair.getKey().getClass();
+ outputValueClass = outputPair.getValue().getClass();
+ }
+
+ int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
+ desiredNum);
if (!values.containsKey(index)) {
values.put(index, new HashMap<Writable, Writable>());
}
- values.get(index).put(pair.getKey(), pair.getValue());
+ values.get(index).put(outputPair.getKey(), outputPair.getValue());
}
// The reason of use of Memory is to reduce file opens
@@ -92,7 +164,8 @@ public class PartitioningRunner extends
Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
+ peer.getPeerIndex());
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destFile, keyClass, valueClass, CompressionType.NONE);
+ destFile, outputKeyClass, outputValueClass, CompressionType.NONE);
+
for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
writer.append(v.getKey(), v.getValue());
}
@@ -102,28 +175,31 @@ public class PartitioningRunner extends
peer.sync();
// merge files into one.
- // TODO if we use header info, we might able to merge files without full scan.
+ // TODO if we use header info, we might be able to merge files without a
+ // full scan.
FileStatus[] status = fs.listStatus(partitionDir);
for (int j = 0; j < status.length; j++) {
int idx = Integer.parseInt(status[j].getPath().getName().split("[-]")[1]);
int assignedID = idx / (desiredNum / peer.getNumPeers());
if (assignedID == peer.getNumPeers())
assignedID = assignedID - 1;
-
- // TODO set replica factor to 1.
+
+ // TODO set replica factor to 1.
// TODO and check whether we can write to specific DataNode.
if (assignedID == peer.getPeerIndex()) {
FileStatus[] files = fs.listStatus(status[j].getPath());
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- new Path(partitionDir + "/" + getPartitionName(j)), keyClass,
- valueClass, CompressionType.NONE);
+ new Path(partitionDir + "/" + getPartitionName(j)), outputKeyClass,
+ outputValueClass, CompressionType.NONE);
for (int i = 0; i < files.length; i++) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
files[i].getPath(), conf);
- Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
- Writable value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+ Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
+ conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(
+ outputValueClass, conf);
while (reader.next(key, value)) {
writer.append(key, value);
@@ -139,9 +215,9 @@ public class PartitioningRunner extends
@SuppressWarnings("rawtypes")
public Partitioner getPartitioner() {
- return ReflectionUtils.newInstance(conf
- .getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
- Partitioner.class), conf);
+ return ReflectionUtils.newInstance(conf.getClass(
+ Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
+ Partitioner.class), conf);
}
private static String getPartitionName(int i) {
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hama.examples;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -159,6 +161,41 @@ public final class BipartiteMatching {
return !getValue().getFirst().equals(UNMATCHED);
}
+ @Override
+ public void readState(DataInput in) throws IOException {
+ if (in.readBoolean()) {
+ reusableMessage = new TextPair();
+ reusableMessage.readFields(in);
+ }
+
+ }
+
+ @Override
+ public void writeState(DataOutput out) throws IOException {
+ if (reusableMessage == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ reusableMessage.write(out);
+ }
+
+ }
+
+ @Override
+ public Text createVertexIDObject() {
+ return new Text();
+ }
+
+ @Override
+ public NullWritable createEdgeCostObject() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public TextPair createVertexValue() {
+ return new TextPair();
+ }
+
}
/**
@@ -199,16 +236,9 @@ public final class BipartiteMatching {
System.exit(-1);
}
- public static void main(String... args) throws IOException,
- InterruptedException, ClassNotFoundException {
-
- if (args.length < 2) {
- printUsage();
- }
-
- HamaConfiguration conf = new HamaConfiguration(new Configuration());
+ public static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException{
GraphJob job = new GraphJob(conf, BipartiteMatching.class);
-
+
// set the defaults
job.setMaxIteration(30);
job.setNumBspTask(2);
@@ -230,14 +260,26 @@ public final class BipartiteMatching {
job.setVertexValueClass(TextPair.class);
job.setEdgeValueClass(NullWritable.class);
- job.setInputKeyClass(LongWritable.class);
- job.setInputValueClass(Text.class);
job.setInputFormat(TextInputFormat.class);
job.setVertexInputReaderClass(BipartiteMatchingVertexReader.class);
job.setPartitioner(HashPartitioner.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TextPair.class);
+ return job;
+ }
+
+
+ public static void main(String... args) throws IOException,
+ InterruptedException, ClassNotFoundException {
+
+ if (args.length < 2) {
+ printUsage();
+ }
+
+ HamaConfiguration conf = new HamaConfiguration(new Configuration());
+
+ GraphJob job = createJob(args, conf);
long startTime = System.currentTimeMillis();
if (job.waitForCompletion(true)) {
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hama.examples;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
@@ -94,4 +96,27 @@ public class InlinkCount extends Vertex<
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
+
+ @Override
+ public void readState(DataInput in) throws IOException {}
+
+ @Override
+ public void writeState(DataOutput out) throws IOException {}
+
+ @Override
+ public Text createVertexIDObject() {
+ return new Text();
+ }
+
+ @Override
+ public NullWritable createEdgeCostObject() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public IntWritable createVertexValue() {
+ return new IntWritable();
+ }
+
+
}
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hama.examples;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
@@ -79,6 +81,29 @@ public class MindistSearch {
}
}
}
+
+ @Override
+ public void readState(DataInput in) throws IOException {}
+
+ @Override
+ public void writeState(DataOutput out) throws IOException {}
+
+ @Override
+ public Text createVertexIDObject() {
+ return new Text();
+ }
+
+ @Override
+ public NullWritable createEdgeCostObject() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public Text createVertexValue() {
+ return new Text();
+ }
+
+
}
public static class MinTextCombiner extends Combiner<Text> {
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Sun Jan 13 20:45:35 2013
@@ -17,29 +17,61 @@
*/
package org.apache.hama.examples;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.SequenceFileInputFormat;
-import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.AbstractAggregator;
import org.apache.hama.graph.AverageAggregator;
+import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
/**
* Real pagerank with dangling node contribution.
*/
public class PageRank {
+ public static class PagerankTextReader extends
+ VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+
+ /**
+ * The text file essentially should look like: <br/>
+ * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
+ * E.G:<br/>
+ * 1\t2\t3\t4<br/>
+ * 2\t3\t1<br/>
+ * etc.
+ */
+ @Override
+ public boolean parseVertex(LongWritable key, Text value,
+ Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+ String[] split = value.toString().split("\t");
+ for (int i = 0; i < split.length; i++) {
+ if (i == 0) {
+ vertex.setVertexID(new Text(split[i]));
+ } else {
+ vertex
+ .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+ }
+ }
+ return true;
+ }
+
+ }
+
public static class PageRankVertex extends
Vertex<Text, NullWritable, DoubleWritable> {
@@ -95,6 +127,30 @@ public class PageRank {
sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
/ numEdges));
}
+
+ @Override
+ public void readState(DataInput in) throws IOException {
+ }
+
+ @Override
+ public void writeState(DataOutput out) throws IOException {
+ }
+
+ @Override
+ public Text createVertexIDObject() {
+ return new Text();
+ }
+
+ @Override
+ public NullWritable createEdgeCostObject() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public DoubleWritable createVertexValue() {
+ return new DoubleWritable();
+ }
+
}
private static void printUsage() {
@@ -109,6 +165,7 @@ public class PageRank {
HamaConfiguration conf = new HamaConfiguration(new Configuration());
GraphJob pageJob = createJob(args, conf);
+ pageJob.setVertexInputReaderClass(PagerankTextReader.class);
long startTime = System.currentTimeMillis();
if (pageJob.waitForCompletion(true)) {
@@ -145,8 +202,6 @@ public class PageRank {
pageJob.setEdgeValueClass(NullWritable.class);
pageJob.setInputFormat(SequenceFileInputFormat.class);
- pageJob.setInputKeyClass(Text.class);
- pageJob.setInputValueClass(TextArrayWritable.class);
pageJob.setPartitioner(HashPartitioner.class);
pageJob.setOutputFormat(TextOutputFormat.class);
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hama.examples;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
@@ -70,6 +72,29 @@ public class SSSP {
voteToHalt();
}
}
+
+ @Override
+ public void readState(DataInput in) throws IOException {}
+
+ @Override
+ public void writeState(DataOutput out) throws IOException {}
+
+ @Override
+ public Text createVertexIDObject() {
+ return new Text();
+ }
+
+ @Override
+ public IntWritable createEdgeCostObject() {
+ return new IntWritable();
+ }
+
+ @Override
+ public IntWritable createVertexValue() {
+ return new IntWritable();
+ }
+
+
}
public static class MinIntCombiner extends Combiner<IntWritable> {
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java Sun Jan 13 20:45:35 2013
@@ -151,6 +151,8 @@ public class SpMV {
* Output is pairs of integer and double
*/
bsp.setInputFormat(SequenceFileInputFormat.class);
+ bsp.setInputKeyClass(IntWritable.class);
+ bsp.setInputValueClass(SparseVectorWritable.class);
bsp.setOutputKeyClass(IntWritable.class);
bsp.setOutputValueClass(DoubleWritable.class);
bsp.setOutputFormat(SequenceFileOutputFormat.class);
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java Sun Jan 13 20:45:35 2013
@@ -28,36 +28,34 @@ import com.google.common.base.Objects;
/**
* TextPair class for use in BipartiteMatching algorithm.
- *
+ *
*/
-public final class TextPair implements Writable{
-
+public final class TextPair implements Writable {
+
Text first;
Text second;
-
+
String nameFirst = "First";
String nameSecond = "Second";
-
- public TextPair(){
- first = new Text();
- second = new Text();
- }
-
- public TextPair(Text first, Text second){
- this.first = first;
+
+ public TextPair() {
+ first = new Text();
+ second = new Text();
+ }
+
+ public TextPair(Text first, Text second) {
+ this.first = first;
this.second = second;
}
-
+
/**
- * Sets the names of the attributes
+ * Sets the names of the attributes
*/
- public TextPair setNames(String nameFirst, String nameSecond){
+ public TextPair setNames(String nameFirst, String nameSecond) {
this.nameFirst = nameFirst;
this.nameSecond = nameSecond;
return this;
}
-
-
public Text getFirst() {
return first;
@@ -77,23 +75,29 @@ public final class TextPair implements W
@Override
public void write(DataOutput out) throws IOException {
+ (new Text(nameFirst)).write(out);
+ (new Text(nameSecond)).write(out);
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
+
+ Text t1 = new Text();
+ Text t2 = new Text();
+ t1.readFields(in);
+ t2.readFields(in);
+ nameFirst = t1.toString();
+ nameSecond = t2.toString();
first.readFields(in);
second.readFields(in);
}
-
+
@Override
- public String toString(){
- return Objects.toStringHelper(this)
- .add(nameFirst, getFirst())
- .add(nameSecond, getSecond())
- .toString();
+ public String toString() {
+ return Objects.toStringHelper(this).add(nameFirst, getFirst())
+ .add(nameSecond, getSecond()).toString();
}
-
}
Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java?rev=1432733&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java Sun Jan 13 20:45:35 2013
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.FileOutputFormat;
+import org.apache.hama.bsp.NullInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.examples.CombineExample;
+import org.apache.hama.examples.PageRank.PageRankVertex;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.util.ReflectionUtils;
+
+public class VertexInputGen {
+
+ public static final String SIZE_OF_MATRIX = "size.of.matrix";
+ public static final String DENSITY = "density.of.matrix";
+
+ public static interface VertexCreator {
+ @SuppressWarnings("rawtypes")
+ Vertex createVertex(Text id, Text[] edges, Text value);
+ }
+
+ public static class PageRankVertexCreatorImpl implements VertexCreator {
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public Vertex createVertex(Text id, Text[] edges, Text value) {
+ Vertex v = new PageRankVertex();
+ v.setVertexID(id);
+ for (Text t : edges) {
+ v.addEdge(new Edge<Text, NullWritable>(t, null));
+ }
+ return v;
+ }
+
+ }
+
+ public static int getVertexCaseId(Class<? extends Vertex> classObj) {
+ if (classObj.getCanonicalName().equals(
+ PageRankVertexCreatorImpl.class.getCanonicalName())) {
+ return 1;
+ }
+
+ return -1;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static class VertexInputGenBSP extends
+ BSP<NullWritable, NullWritable, Vertex, NullWritable, Text> {
+
+ private Configuration conf;
+ private int sizeN;
+ private int density;
+ private Map<Integer, HashSet<Integer>> list = new HashMap<Integer, HashSet<Integer>>();
+ private VertexCreator vertexCreator;
+
+ @Override
+ public void setup(
+ BSPPeer<NullWritable, NullWritable, Vertex, NullWritable, Text> peer) {
+ this.conf = peer.getConfiguration();
+ sizeN = conf.getInt(SIZE_OF_MATRIX, 10);
+ density = conf.getInt(DENSITY, 1);
+
+ int vertexCase = conf.getInt("hama.test.vertexcreatorid", -1);
+ if (vertexCase == 1) {
+ vertexCreator = new PageRankVertexCreatorImpl();
+ } else {
+ throw new RuntimeException("No vertex creator specified");
+ }
+
+ }
+
+ @Override
+ public void bsp(
+ BSPPeer<NullWritable, NullWritable, Vertex, NullWritable, Text> peer)
+ throws IOException, SyncException, InterruptedException {
+ int interval = sizeN / peer.getNumPeers();
+ int startID = peer.getPeerIndex() * interval;
+ int endID;
+ if (peer.getPeerIndex() == peer.getNumPeers() - 1)
+ endID = sizeN;
+ else
+ endID = startID + interval;
+
+ // Randomly pick edges below the diagonal (j < i) for this peer's rows [startID, endID)
+ for (int i = startID; i < endID; i++) {
+ HashSet<Integer> edges = new HashSet<Integer>();
+ for (int j = 0; j <= i; j++) {
+ boolean nonZero = new Random().nextInt(density) == 0;
+ if (nonZero && !edges.contains(j) && i != j) {
+ edges.add(j);
+
+ // TODO please refactor this: the clamped peerIndex is never used by the send below.
+ int peerIndex = j / interval;
+ if (peerIndex == peer.getNumPeers())
+ peerIndex = peerIndex - 1;
+
+ peer.send(peer.getPeerName(j / interval), new Text(j + "," + i));
+ }
+ }
+
+ list.put(i, edges);
+ }
+
+ // Synchronize, then merge the mirrored edges (j -> i) received as "j,i" messages
+ peer.sync();
+ Text received;
+ while ((received = peer.getCurrentMessage()) != null) {
+ String[] kv = received.toString().split(",");
+ HashSet<Integer> nList = list.get(Integer.parseInt(kv[0]));
+ nList.add(Integer.parseInt(kv[1]));
+ list.put(Integer.parseInt(kv[0]), nList);
+ }
+ }
+
+ @Override
+ public void cleanup(
+ BSPPeer<NullWritable, NullWritable, Vertex, NullWritable, Text> peer)
+ throws IOException {
+ for (Map.Entry<Integer, HashSet<Integer>> e : list.entrySet()) {
+ Text[] values = new Text[e.getValue().size()];
+ if (values.length > 0) {
+ int i = 0;
+ for (Integer v : e.getValue()) {
+ values[i] = new Text(String.valueOf(v));
+ i++;
+ }
+ peer.write(
+ (Vertex)this.vertexCreator.createVertex(
+ new Text(String.valueOf(e.getKey())), values, new Text()),
+ NullWritable.get());
+ }
+ }
+ }
+ }
+
+ public static void runJob(HamaConfiguration conf, int numTasks, String output, Class<? extends Vertex> cls)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ BSPJob bsp = new BSPJob(conf, VertexInputGen.class);
+ // Set the job name
+ bsp.setJobName("Random Vertex Input Generator");
+ bsp.setBspClass(VertexInputGenBSP.class);
+ bsp.setInputFormat(NullInputFormat.class);
+ bsp.setOutputKeyClass(cls);
+ bsp.setOutputValueClass(NullWritable.class);
+ bsp.setOutputFormat(SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(bsp, new Path(output));
+ bsp.setNumBspTask(numTasks);
+
+ long startTime = System.currentTimeMillis();
+ if (bsp.waitForCompletion(true)) {
+ System.out.println("Job Finished in "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ }
+ }
+
+ public static void main(String[] args) throws InterruptedException,
+ IOException, ClassNotFoundException {
+ if (args.length < 4) {
+ System.out
+ .println("Usage: <size n> <1/x density> <output path> <number of tasks>");
+ System.exit(1);
+ }
+
+ // BSP job configuration
+ HamaConfiguration conf = new HamaConfiguration();
+
+ conf.setInt(SIZE_OF_MATRIX, Integer.parseInt(args[0]));
+ conf.setInt(DENSITY, Integer.parseInt(args[1]));
+ runJob(conf, Integer.parseInt(args[3]), args[2], Vertex.class);
+
+ }
+
+}
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Sun Jan 13 20:45:35 2013
@@ -26,6 +26,8 @@ import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
+import junit.framework.TestCase;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -33,30 +35,36 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.examples.util.TextPair;
+import org.apache.hama.graph.GraphJob;
import org.junit.Test;
-import junit.framework.TestCase;
+public class BipartiteMatchingTest extends TestCase {
-public class BipartiteMatchingTest extends TestCase{
-
- private String[] input = {
- "A L:B D",
- "B R:A C",
- "C L:B D",
- "D R:A C"
- };
+ private String[] input = { "A L:B D", "B R:A C", "C L:B D", "D R:A C" };
private final static String DELIMETER = "\t";
@SuppressWarnings("serial")
- private Map<String, String> output1 = new HashMap<String, String>()
- {{
- put("C", "TextPair{MatchVertex=D, Component=L}");
- put("A", "TextPair{MatchVertex=B, Component=L}");
- put("D", "TextPair{MatchVertex=C, Component=R}");
- put("B", "TextPair{MatchVertex=A, Component=R}");
- }};
+ private Map<String, String> output1 = new HashMap<String, String>() {
+ {
+ put("C", "TextPair{MatchVertex=D, Component=L}");
+ put("A", "TextPair{MatchVertex=B, Component=L}");
+ put("D", "TextPair{MatchVertex=C, Component=R}");
+ put("B", "TextPair{MatchVertex=A, Component=R}");
+ }
+ };
+ public static class CustomTextPartitioner implements
+ Partitioner<Text, TextPair> {
+
+ @Override
+ public int getPartition(Text key, TextPair value, int numTasks) {
+ return Character.getNumericValue(key.toString().charAt(0)) % numTasks;
+ }
+
+ }
private static String INPUT = "/tmp/graph.txt";
private static String OUTPUT = "/tmp/graph-bipartite";
@@ -70,57 +78,63 @@ public class BipartiteMatchingTest exten
fs = FileSystem.get(conf);
}
- private void generateTestData(){
+ private void generateTestData() {
FileWriter fout = null;
BufferedWriter bout = null;
PrintWriter pout = null;
- try{
+ try {
fout = new FileWriter(INPUT);
bout = new BufferedWriter(fout);
pout = new PrintWriter(bout);
- for(String line:input){
+ for (String line : input) {
pout.println(line);
}
- }
- catch(IOException e){
+ } catch (IOException e) {
e.printStackTrace();
- }
- finally{
+ } finally {
try {
- if(pout!=null){pout.close();}
- if(bout!=null){bout.close();}
- if(fout!=null){fout.close();}
+ if (pout != null) {
+ pout.close();
+ }
+ if (bout != null) {
+ bout.close();
+ }
+ if (fout != null) {
+ fout.close();
+ }
} catch (IOException e) {
e.printStackTrace();
- }
+ }
}
}
-
- private void verifyResult()throws IOException{
+ private void verifyResult() throws IOException {
FileStatus[] files = fs.globStatus(new Path(OUTPUT + "/part-*"));
+ assertTrue(files.length == 2);
+
Text key = new Text();
Text value = new Text();
- for(FileStatus file:files){
- if(file.getLen() > 0){
- FSDataInputStream in = fs.open(file.getPath());
- BufferedReader bin = new BufferedReader(
- new InputStreamReader(in));
+
+ for (FileStatus file : files) {
+ if (file.getLen() > 0) {
+ FSDataInputStream in = fs.open(file.getPath());
+ BufferedReader bin = new BufferedReader(new InputStreamReader(in));
String s = bin.readLine();
- while(s!=null){
+ while (s != null) {
next(key, value, s);
String expValue = output1.get(key.toString());
+ System.out.println(key + " " + value + " expvalue = " + expValue);
assertEquals(expValue, value.toString());
- System.out.println(key + " "+value);
+
s = bin.readLine();
- }
+ }
in.close();
}
}
}
- private static void next(Text key, Text value, String line){
+ private static void next(Text key, Text value, String line) {
String[] lineA = line.split(DELIMETER);
key.set(lineA[0]);
value.set(lineA[1]);
@@ -139,17 +153,24 @@ public class BipartiteMatchingTest exten
@Test
public void testBipartiteMatching() throws IOException, InterruptedException,
- ClassNotFoundException{
+ ClassNotFoundException {
generateTestData();
try {
String seed = "2";
- BipartiteMatching.main(new String[] { INPUT, OUTPUT, "30", "2",
- seed});
+ HamaConfiguration conf = new HamaConfiguration();
+ GraphJob job = BipartiteMatching.createJob(new String[] { INPUT, OUTPUT,
+ "30", "2", seed }, conf);
+ job.setPartitioner(CustomTextPartitioner.class);
+
+ long startTime = System.currentTimeMillis();
+ if (job.waitForCompletion(true)) {
+ System.out.println("Job Finished in "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ }
+
verifyResult();
} finally {
deleteTempDirs();
}
}
-
-
}
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Sun Jan 13 20:45:35 2013
@@ -29,12 +29,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.examples.MindistSearch.MinTextCombiner;
+import org.apache.hama.examples.MindistSearch.MindistSearchVertex;
+import org.apache.hama.graph.Edge;
public class MindistSearchTest extends TestCase {
@@ -98,18 +99,18 @@ public class MindistSearchTest extends T
private void generateTestData() {
try {
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- new Path(INPUT), Text.class, TextArrayWritable.class);
+ new Path(INPUT), MindistSearchVertex.class, NullWritable.class);
for (int i = 0; i < input.length; i++) {
String[] x = input[i].split("\t");
Text key = new Text(x[0]);
- Writable[] values = new Writable[x.length - 1];
+ MindistSearchVertex vertex = new MindistSearchVertex();
+ vertex.setVertexID(key);
for (int j = 1; j < x.length; j++) {
- values[j - 1] = new Text(x[j]);
+ vertex.addEdge(new Edge<Text, NullWritable>(new Text(x[j]),
+ NullWritable.get()));
}
- TextArrayWritable value = new TextArrayWritable();
- value.set(values);
- writer.append(key, value);
+ writer.append(vertex, NullWritable.get());
}
writer.close();
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Sun Jan 13 20:45:35 2013
@@ -28,14 +28,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.examples.util.SymmetricMatrixGen;
+import org.apache.hama.examples.PageRank.PageRankVertex;
+import org.apache.hama.examples.util.VertexInputGen;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.GraphJobRunner;
public class PageRankTest extends TestCase {
+
private static String INPUT = "/tmp/pagerank/pagerank-tmp.seq";
private static String TEXT_INPUT = "/tmp/pagerank/pagerank.txt";
private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
+
private static String OUTPUT = "/tmp/pagerank/pagerank-out";
private Configuration conf = new HamaConfiguration();
private FileSystem fs;
@@ -70,8 +73,8 @@ public class PageRankTest extends TestCa
conf.set("bsp.local.tasks.maximum", "10");
conf.set("bsp.peers.num", "7");
conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
- GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT, "7" },
- conf);
+ GraphJob pageJob = PageRank.createJob(
+ new String[] { INPUT, OUTPUT, "7" }, conf);
if (!pageJob.waitForCompletion(true)) {
fail("Job did not complete normally!");
@@ -84,7 +87,11 @@ public class PageRankTest extends TestCa
private void generateTestData() throws ClassNotFoundException,
InterruptedException, IOException {
- SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "3" });
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.setInt(VertexInputGen.SIZE_OF_MATRIX, 40);
+ conf.setInt(VertexInputGen.DENSITY, 10);
+ conf.setInt("hama.test.vertexcreatorid", 1);
+ VertexInputGen.runJob(conf, 3, INPUT, PageRankVertex.class);
}
private void deleteTempDirs() {
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Sun Jan 13 20:45:35 2013
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.Writable;
* The edge class
*/
public final class Edge<VERTEX_ID extends Writable, EDGE_VALUE_TYPE extends Writable> {
-
private final VERTEX_ID destinationVertexID;
private final EDGE_VALUE_TYPE cost;
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Sun Jan 13 20:45:35 2013
@@ -20,13 +20,16 @@ package org.apache.hama.graph;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
import com.google.common.base.Preconditions;
@@ -39,9 +42,6 @@ public class GraphJob extends BSPJob {
public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
- public final static String VERTEX_GRAPH_RUNTIME_PARTIONING = "hama.graph.runtime.partitioning";
- public final static String VERTEX_GRAPH_INPUT_READER = "hama.graph.input.reader.class";
-
/**
* Creates a new Graph Job with the given configuration and an exampleClass.
* The exampleClass is used to determine the user's jar to distribute in the
@@ -67,6 +67,8 @@ public class GraphJob extends BSPJob {
Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> cls)
throws IllegalStateException {
conf.setClass(VERTEX_CLASS_ATTR, cls, Vertex.class);
+ setInputKeyClass(cls);
+ setInputValueClass(NullWritable.class);
}
/**
@@ -119,7 +121,9 @@ public class GraphJob extends BSPJob {
public void setVertexInputReaderClass(
Class<? extends VertexInputReader<?, ?, ?, ?, ?>> cls) {
ensureState(JobState.DEFINE);
- conf.setClass(VERTEX_GRAPH_INPUT_READER, cls, VertexInputReader.class);
+ conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
+ RecordConverter.class);
+ conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
}
@SuppressWarnings("unchecked")
@@ -132,7 +136,7 @@ public class GraphJob extends BSPJob {
public void setPartitioner(@SuppressWarnings("rawtypes")
Class<? extends Partitioner> theClass) {
super.setPartitioner(theClass);
- conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true);
+ conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
}
@Override
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sun Jan 13 20:45:35 2013
@@ -28,7 +28,6 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@@ -66,13 +65,13 @@ public final class GraphJobRunner<V exte
public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
public static final String GRAPH_REPAIR = "hama.graph.repair";
+ public static final String VERTEX_CLASS = "hama.graph.vertex.class";
private Configuration conf;
private Combiner<M> combiner;
private Partitioner<V, M> partitioner;
- private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>();
-
+ private VerticesInfo<V, E, M> vertices;
private boolean updated = true;
private int globalUpdateCounts = 0;
@@ -264,10 +263,12 @@ public final class GraphJobRunner<V exte
aggregationRunner = new AggregationRunner<V, E, M>();
aggregationRunner.setupAggregators(peer);
+
+ vertices = new VerticesInfo<V, E, M>();
}
/**
- * Loads vertices into memory of each peer. TODO this needs to be simplified.
+ * Loads vertices into memory of each peer.
*/
@SuppressWarnings("unchecked")
private void loadVertices(
@@ -277,41 +278,22 @@ public final class GraphJobRunner<V exte
final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
- LOG.debug("vertex class: " + vertexClass);
- Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
- vertex.runner = this;
+ if (LOG.isDebugEnabled())
+ LOG.debug("Vertex class: " + vertexClass);
KeyValuePair<Writable, Writable> next = null;
while ((next = peer.readNext()) != null) {
- V key = (V) next.getKey();
- Writable[] edges = ((ArrayWritable) next.getValue()).get();
- vertex.setVertexID(key);
- List<Edge<V, E>> edgeList = new ArrayList<Edge<V, E>>();
- for (Writable edge : edges) {
- edgeList.add(new Edge<V, E>((V) edge, null));
- }
- vertex.setEdges(edgeList);
-
- if (vertex.getEdges() == null) {
- if (selfReference) {
- vertex.setEdges(Collections.singletonList(new Edge<V, E>(vertex
- .getVertexID(), null)));
- } else {
- vertex.setEdges(Collections.EMPTY_LIST);
- }
- }
-
+ Vertex<V, E, M> vertex = (Vertex<V, E, M>) next.getKey();
+ vertex.runner = this;
+ vertex.setup(conf);
+ vertices.addVertex(vertex);
if (selfReference) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
-
- vertex.setup(conf);
- vertices.add(vertex);
- vertex = newVertexInstance(vertexClass, conf);
- vertex.runner = this;
}
- LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
+ if (LOG.isDebugEnabled())
+ LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
/*
* If the user want to repair the graph, it should traverse through that
@@ -321,18 +303,20 @@ public final class GraphJobRunner<V exte
* procedure is to prevent NullPointerExceptions from happening.
*/
if (repairNeeded) {
- LOG.debug("Starting repair of this graph!");
+ if (LOG.isDebugEnabled())
+ LOG.debug("Starting repair of this graph!");
repair(peer, selfReference);
}
- LOG.debug("Starting Vertex processing!");
+ if (LOG.isDebugEnabled())
+ LOG.debug("Starting Vertex processing!");
}
@SuppressWarnings("unchecked")
private void repair(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
- boolean selfReference) throws IOException,
- SyncException, InterruptedException {
+ boolean selfReference) throws IOException, SyncException,
+ InterruptedException {
Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
@@ -368,7 +352,9 @@ public final class GraphJobRunner<V exte
}
}
- vertices.addAll(tmp.values());
+ for (Vertex<V, E, M> v : tmp.values()) {
+ vertices.addVertex(v);
+ }
tmp.clear();
}
@@ -533,4 +519,3 @@ public final class GraphJobRunner<V exte
}
}
-
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Sun Jan 13 20:45:35 2013
@@ -17,8 +17,10 @@
*/
package org.apache.hama.graph;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -27,8 +29,24 @@ import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Partitioner;
+/**
+ * Vertex is an abstract definition of a Google Pregel vertex. To implement a
+ * graph application, one must implement a subclass of Vertex and define the
+ * message passing and message processing for each vertex.
+ *
+ * Every vertex should be assigned an ID. This ID object should obey the
+ * equals-hashcode contract and would be used for partitioning.
+ *
+ * The edges for a vertex could be accessed and modified using the
+ * {@link Vertex#getEdges()} call. The self value of the vertex could be changed
+ * by {@link Vertex#setValue(Writable)}.
+ *
+ * @param <V> Vertex ID object type
+ * @param <E> Edge cost object type
+ * @param <M> Vertex value object type
+ */
public abstract class Vertex<V extends Writable, E extends Writable, M extends Writable>
- implements VertexInterface<V, E, M> {
+ implements VertexInterface<V, E, M>, Writable {
GraphJobRunner<?, ?, ?> runner;
@@ -74,7 +92,7 @@ public abstract class Vertex<V extends W
getPartitioner().getPartition(vertexId, value,
runner.getPeer().getNumPeers()));
}
-
+
@Override
public void sendMessageToNeighbors(M msg) throws IOException {
final List<Edge<V, E>> outEdges = this.getEdges();
@@ -103,7 +121,7 @@ public abstract class Vertex<V extends W
public void addEdge(Edge<V, E> edge) {
if (edges == null) {
- this.edges = new ArrayList<Edge<V, E>>(1);
+ this.edges = new LinkedList<Edge<V, E>>();
}
this.edges.add(edge);
}
@@ -195,10 +213,7 @@ public abstract class Vertex<V extends W
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((vertexID == null) ? 0 : vertexID.hashCode());
- return result;
+ return ((vertexID == null) ? 0 : vertexID.hashCode());
}
@Override
@@ -224,4 +239,120 @@ public abstract class Vertex<V extends W
+ " // " + edges;
}
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ if (in.readBoolean()) {
+ if (vertexID == null) {
+ vertexID = createVertexIDObject();
+ }
+ vertexID.readFields(in);
+ }
+ if (in.readBoolean()) {
+ if (this.value == null) {
+ value = createVertexValue();
+ }
+ value.readFields(in);
+ }
+ this.edges = new LinkedList<Edge<V, E>>();
+ if (in.readBoolean()) {
+ int num = in.readInt();
+ if (num > 0) {
+ for (int i = 0; i < num; ++i) {
+ V vertex = createVertexIDObject();
+ vertex.readFields(in);
+ E edgeCost = null;
+ if (in.readBoolean()) {
+ edgeCost = this.createEdgeCostObject();
+ edgeCost.readFields(in);
+ }
+ Edge<V, E> edge = new Edge<V, E>(vertex, edgeCost);
+ this.edges.add(edge);
+ }
+
+ }
+ }
+ votedToHalt = in.readBoolean();
+ readState(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (vertexID == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ vertexID.write(out);
+ }
+ if (value == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ value.write(out);
+ }
+ if (this.edges == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeInt(this.edges.size());
+ for (Edge<V, E> edge : this.edges) {
+ edge.getDestinationVertexID().write(out);
+ if (edge.getValue() == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ edge.getValue().write(out);
+ }
+ }
+ }
+ out.writeBoolean(votedToHalt);
+ writeState(out);
+
+ }
+
+ /**
+ * Create the vertex id object. This function is used by the framework to
+ * construct the vertex id object.
+ *
+ * @return instance of V
+ */
+ public abstract V createVertexIDObject();
+
+ /**
+ * Create the Edge cost object. This function is used by the framework to
+ * construct the edge cost object
+ *
+ * @return instance of E
+ */
+ public abstract E createEdgeCostObject();
+
+ /**
+ * Create the vertex value object. This function is used by the framework to
+ * construct the vertex value object.
+ *
+ * @return instance of M
+ */
+ public abstract M createVertexValue();
+
+ /**
+ * Reads the state of the vertex from the input stream. The framework has
+ * already constructed and loaded the vertex ID, value, edges and voteToHalt
+ * state. Implement this to read any additional properties of the vertex
+ * that the subclass defines.
+ *
+ * @param in
+ * @throws IOException
+ */
+ public abstract void readState(DataInput in) throws IOException;
+
+ /**
+ * Writes the state of the vertex to the output stream. The framework has
+ * already written the vertex ID, value, edges and voteToHalt state. Implement
+ * this to save any additional state variables that the subclass adds to the
+ * vertex.
+ *
+ * @param out
+ * @throws IOException
+ */
+ public abstract void writeState(DataOutput out) throws IOException;
+
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Sun Jan 13 20:45:35 2013
@@ -17,12 +17,25 @@
*/
package org.apache.hama.graph;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
+import org.apache.hama.util.KeyValuePair;
/**
* A reader to read Hama's input files and parses a vertex out of it.
*/
-public abstract class VertexInputReader<KEYIN extends Writable, VALUEIN extends Writable, V extends Writable, E extends Writable, M extends Writable> {
+public abstract class VertexInputReader<KEYIN extends Writable, VALUEIN extends Writable, V extends Writable, E extends Writable, M extends Writable>
+ implements RecordConverter {
+
+ private static final Log LOG = LogFactory.getLog(VertexInputReader.class);
+
+ private KeyValuePair<Writable, Writable> outputRecord = new KeyValuePair<Writable, Writable>();
/**
* Parses a given key and value into the given vertex. If returned true, the
@@ -32,4 +45,40 @@ public abstract class VertexInputReader<
public abstract boolean parseVertex(KEYIN key, VALUEIN value,
Vertex<V, E, M> vertex) throws Exception;
+ @SuppressWarnings("unchecked")
+ @Override
+ public KeyValuePair<Writable, Writable> convertRecord(
+ KeyValuePair<Writable, Writable> inputRecord, Configuration conf) {
+ Class<Vertex<V, E, M>> vertexClass = (Class<Vertex<V, E, M>>) conf
+ .getClass(GraphJob.VERTEX_CLASS_ATTR, Vertex.class);
+ boolean vertexCreation = true;
+ Vertex<V, E, M> vertex = GraphJobRunner
+ .newVertexInstance(vertexClass, conf);
+ try {
+ vertexCreation = parseVertex((KEYIN) inputRecord.getKey(),
+ (VALUEIN) inputRecord.getValue(), vertex);
+ } catch (Exception e) {
+ LOG.error("Error parsing vertex.", e);
+ vertexCreation = false;
+ }
+ if (!vertexCreation) {
+ return null;
+ }
+ outputRecord.setKey(vertex);
+ outputRecord.setValue(NullWritable.get());
+ return outputRecord;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
+ @SuppressWarnings("rawtypes")
+ Partitioner partitioner, Configuration conf,
+ @SuppressWarnings("rawtypes")
+ BSPPeer peer, int numTasks) {
+ Vertex<V, E, M> vertex = (Vertex<V, E, M>) outputRecord.getKey();
+ return Math.abs(partitioner.getPartition(vertex.getVertexID(),
+ vertex.getValue(), numTasks));
+ }
+
}
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1432733&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Sun Jan 13 20:45:35 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VerticesInfo encapsulates the storage of vertices in a BSP Task.
+ * <p>
+ * Vertices are held in an in-memory list in insertion order; every lookup by
+ * vertex ID walks that list from the start and compares IDs with
+ * {@code equals}.
+ *
+ * @param <V> Vertex ID object type
+ * @param <E> Edge cost object type
+ * @param <M> Vertex value object type
+ */
+public class VerticesInfo<V extends Writable, E extends Writable, M extends Writable>
+    implements Iterable<Vertex<V, E, M>> {
+
+  private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>(100);
+
+  /**
+   * Stores the given vertex. If a vertex with an equal ID is already present
+   * it is replaced at its current position; otherwise the vertex is appended.
+   *
+   * @param vertex the vertex to store
+   */
+  public void addVertex(Vertex<V, E, M> vertex) {
+    Iterator<Vertex<V, E, M>> cursor = iterator();
+    for (int position = 0; cursor.hasNext(); position++) {
+      if (cursor.next().getVertexID().equals(vertex.getVertexID())) {
+        vertices.set(position, vertex);
+        return;
+      }
+    }
+    vertices.add(vertex);
+  }
+
+  /**
+   * Looks up a vertex by its ID.
+   *
+   * @param vertexId the ID to search for
+   * @return the first stored vertex whose ID equals {@code vertexId}, or
+   *         {@code null} if there is none
+   */
+  public Vertex<V, E, M> getVertex(V vertexId) {
+    return findById(vertexId);
+  }
+
+  /**
+   * Tells whether a vertex with the given ID is stored.
+   *
+   * @param vertexId the ID to search for
+   * @return {@code true} if a stored vertex has an ID equal to
+   *         {@code vertexId}
+   */
+  public boolean containsVertex(V vertexId) {
+    return findById(vertexId) != null;
+  }
+
+  /** Removes all stored vertices. */
+  public void clear() {
+    vertices.clear();
+  }
+
+  /**
+   * @return the number of stored vertices
+   */
+  public int size() {
+    return this.vertices.size();
+  }
+
+  /**
+   * @return an iterator over the stored vertices in list order
+   */
+  @Override
+  public Iterator<Vertex<V, E, M>> iterator() {
+    return vertices.iterator();
+  }
+
+  /**
+   * Currently a no-op: nothing is read from {@code in}.
+   *
+   * @param in the input that is left untouched
+   */
+  public void recoverState(DataInput in) {
+
+  }
+
+  /**
+   * Currently a no-op: nothing is written to {@code out}.
+   *
+   * @param out the output that is left untouched
+   */
+  public void saveState(DataOutput out) {
+
+  }
+
+  /** Linear scan shared by the ID-based lookups; returns null when absent. */
+  private Vertex<V, E, M> findById(V vertexId) {
+    Iterator<Vertex<V, E, M>> cursor = iterator();
+    while (cursor.hasNext()) {
+      Vertex<V, E, M> candidate = cursor.next();
+      if (candidate.getVertexID().equals(vertexId)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+}
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Sun Jan 13 20:45:35 2013
@@ -25,7 +25,6 @@ import org.apache.hadoop.io.DoubleWritab
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.ClusterStatus;
@@ -33,8 +32,8 @@ import org.apache.hama.bsp.HashPartition
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.TestBSPMasterGroomServer;
-import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.graph.example.PageRank;
+import org.apache.hama.graph.example.PageRank.PageRankVertex;
public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
@@ -57,8 +56,10 @@ public class TestSubmitGraphJob extends
// Set multi-step partitioning interval to 30 bytes
configuration.setInt("hama.graph.multi.step.partitioning.interval", 30);
+ configuration.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false);
+
GraphJob bsp = new GraphJob(configuration, PageRank.class);
- bsp.setInputPath(new Path("/tmp/pagerank"));
+ bsp.setInputPath(new Path("/tmp/pagerank/real-tmp.seq"));
bsp.setOutputPath(new Path(OUTPUT));
BSPJobClient jobClient = new BSPJobClient(configuration);
configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
@@ -75,10 +76,7 @@ public class TestSubmitGraphJob extends
bsp.setAggregatorClass(AverageAggregator.class,
PageRank.DanglingNodeAggregator.class);
- bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
bsp.setInputFormat(SequenceFileInputFormat.class);
- bsp.setInputKeyClass(Text.class);
- bsp.setInputValueClass(TextArrayWritable.class);
bsp.setVertexIDClass(Text.class);
bsp.setVertexValueClass(DoubleWritable.class);
@@ -124,18 +122,18 @@ public class TestSubmitGraphJob extends
private void generateTestData() {
try {
SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
- new Path(INPUT), Text.class, TextArrayWritable.class);
+ new Path(INPUT), PageRankVertex.class, NullWritable.class);
for (int i = 0; i < input.length; i++) {
String[] x = input[i].split("\t");
- Text key = new Text(x[0]);
- Writable[] values = new Writable[x.length - 1];
+
+ PageRankVertex vertex = new PageRankVertex();
+ vertex.setVertexID(new Text(x[0]));
for (int j = 1; j < x.length; j++) {
- values[j - 1] = new Text(x[j]);
+ vertex.addEdge(new Edge<Text, NullWritable>(new Text(x[j]),
+ NullWritable.get()));
}
- TextArrayWritable value = new TextArrayWritable();
- value.set(values);
- writer.append(key, value);
+ writer.append(vertex, NullWritable.get());
}
writer.close();
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1432733&r1=1432732&r2=1432733&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Sun Jan 13 20:45:35 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hama.graph.example;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
@@ -25,6 +27,7 @@ import org.apache.hadoop.io.DoubleWritab
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
import org.apache.hama.graph.AbstractAggregator;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.Vertex;
@@ -87,10 +90,33 @@ public class PageRank {
sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
/ numEdges));
}
+
+ /** Returns a newly created, empty {@code Text} as the vertex ID object. */
+ @Override
+ public Text createVertexIDObject() {
+ return new Text();
+ }
+
+ /** Returns the instance obtained from {@code NullWritable.get()} as the edge cost object. */
+ @Override
+ public NullWritable createEdgeCostObject() {
+ return NullWritable.get();
+ }
+
+ /** Returns a newly created {@code DoubleWritable} as the vertex value object. */
+ @Override
+ public DoubleWritable createVertexValue() {
+ return new DoubleWritable();
+ }
+
+ /** No-op: nothing is read from {@code in}. */
+ @Override
+ public void readState(DataInput in) throws IOException {}
+
+ /** No-op: nothing is written to {@code out}. */
+ @Override
+ public void writeState(DataOutput out) throws IOException {}
+
}
public static class PagerankTextReader extends
- VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+ VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable>
+ implements RecordConverter {
/**
* The text file essentially should look like: <br/>