You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/05/22 07:25:50 UTC
svn commit: r1341306 - in /incubator/hama/trunk:
core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/
examples/src/main/java/org/apache/hama/examples/
examples/src/main/java/org/apache/hama/examples/util/
examples/src/test/java/o...
Author: tjungblut
Date: Tue May 22 05:25:49 2012
New Revision: 1341306
URL: http://svn.apache.org/viewvc?rev=1341306&view=rev
Log:
Move partitioner to job for graph jobs
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Tue May 22 05:25:49 2012
@@ -90,9 +90,9 @@ public class BSPJob extends BSPJobContex
// /////////////////////////////////////
// Setter for Job Submission
// /////////////////////////////////////
- public void setWorkingDirectory(Path dir) throws IOException {
+ public void setWorkingDirectory(Path pDir) throws IOException {
ensureState(JobState.DEFINE);
- dir = new Path(getWorkingDirectory(), dir);
+ Path dir = new Path(getWorkingDirectory(), pDir);
conf.set(WORKING_DIR, dir.toString());
}
@@ -253,7 +253,7 @@ public class BSPJob extends BSPJobContex
@SuppressWarnings({ "rawtypes" })
public InputFormat getInputFormat() {
- return (InputFormat) ReflectionUtils.newInstance(conf.getClass(
+ return ReflectionUtils.newInstance(conf.getClass(
"bsp.input.format.class", TextInputFormat.class, InputFormat.class),
conf);
}
@@ -380,14 +380,14 @@ public class BSPJob extends BSPJobContex
@SuppressWarnings("rawtypes")
public Partitioner getPartitioner() {
- return (Partitioner) ReflectionUtils.newInstance(conf
+ return ReflectionUtils.newInstance(conf
.getClass("bsp.input.partitioner.class", HashPartitioner.class,
Partitioner.class), conf);
}
@SuppressWarnings("rawtypes")
public OutputFormat getOutputFormat() {
- return (OutputFormat) ReflectionUtils.newInstance(conf.getClass(
+ return ReflectionUtils.newInstance(conf.getClass(
"bsp.output.format.class", TextOutputFormat.class, OutputFormat.class),
conf);
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue May 22 05:25:49 2012
@@ -53,6 +53,7 @@ import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.JobSubmissionProtocol;
/**
@@ -140,6 +141,7 @@ public class BSPJobClient extends Config
/**
* Returns immediately whether the whole job is done yet or not.
*/
+ @Override
public synchronized boolean isComplete() throws IOException {
updateStatus();
return (status.getRunState() == JobStatus.SUCCEEDED
@@ -149,11 +151,13 @@ public class BSPJobClient extends Config
/**
* True if job completed successfully.
*/
+ @Override
public synchronized boolean isSuccessful() throws IOException {
updateStatus();
return status.getRunState() == JobStatus.SUCCEEDED;
}
+ @Override
public synchronized long getSuperstepCount() throws IOException {
ensureFreshStatus();
return status.getSuperstepCount();
@@ -162,6 +166,7 @@ public class BSPJobClient extends Config
/**
* Blocks until the job is finished
*/
+ @Override
public void waitForCompletion() throws IOException {
while (!isComplete()) {
try {
@@ -174,6 +179,7 @@ public class BSPJobClient extends Config
/**
* Tells the service to get the state of the current job.
*/
+ @Override
public synchronized int getJobState() throws IOException {
updateStatus();
return status.getRunState();
@@ -187,6 +193,7 @@ public class BSPJobClient extends Config
/**
* Tells the service to terminate the current job.
*/
+ @Override
public synchronized void killJob() throws IOException {
jobSubmitClient.killJob(getID());
}
@@ -210,7 +217,7 @@ public class BSPJobClient extends Config
String masterAdress = conf.get("bsp.master.address");
if (masterAdress != null && !masterAdress.equals("local")) {
this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
- JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
+ JobSubmissionProtocol.class, HamaRPCProtocolVersion.versionID,
BSPMaster.getAddress(conf), conf,
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
} else {
@@ -277,8 +284,9 @@ public class BSPJobClient extends Config
static Random r = new Random();
- public RunningJob submitJobInternal(BSPJob job, BSPJobID jobId)
+ public RunningJob submitJobInternal(BSPJob pJob, BSPJobID jobId)
throws IOException {
+ BSPJob job = pJob;
job.setJobID(jobId);
Path submitJobDir = new Path(getSystemDir(), "submit_"
@@ -308,7 +316,9 @@ public class BSPJobClient extends Config
if (job.get("bsp.input.dir") != null) {
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
- if (job.getConf().get("bsp.input.partitioner.class") != null) {
+ if (job.getConf().get("bsp.input.partitioner.class") != null
+ && !job.getConf()
+ .getBoolean("hama.graph.runtime.partitioning", false)) {
job = partition(job, maxTasks);
maxTasks = job.getInt("hama.partition.count", maxTasks);
}
@@ -450,11 +460,11 @@ public class BSPJobClient extends Config
return job;
}
- private boolean isProperSize(int numBspTask, int maxTasks) {
+ private static boolean isProperSize(int numBspTask, int maxTasks) {
return (numBspTask > 1 && numBspTask < maxTasks);
}
- private String getPartitionName(int i) {
+ private static String getPartitionName(int i) {
return "part-" + String.valueOf(100000 + i).substring(1, 6);
}
@@ -499,7 +509,7 @@ public class BSPJobClient extends Config
return codecClass;
}
- private int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
+ private static int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
throws IOException {
InputSplit[] splits = job.getInputFormat().getSplits(
job,
@@ -529,7 +539,7 @@ public class BSPJobClient extends Config
private static final int CURRENT_SPLIT_FILE_VERSION = 0;
private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
- private DataOutputStream writeSplitsFileHeader(Configuration conf,
+ private static DataOutputStream writeSplitsFileHeader(Configuration conf,
Path filename, int length) throws IOException {
// write the splits to a file for the bsp master
FileSystem fs = filename.getFileSystem(conf);
@@ -817,7 +827,7 @@ public class BSPJobClient extends Config
/**
* Display usage of the command-line tool and terminate execution
*/
- private void displayUsage(String cmd) {
+ private static void displayUsage(String cmd) {
String prefix = "Usage: hama job ";
String taskStates = "running, completed";
if ("-submit".equals(cmd)) {
@@ -986,6 +996,7 @@ public class BSPJobClient extends Config
return locations;
}
+ @Override
public void readFields(DataInput in) throws IOException {
splitClass = Text.readString(in);
dataLength = in.readLong();
@@ -997,6 +1008,7 @@ public class BSPJobClient extends Config
}
}
+ @Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, splitClass);
out.writeLong(dataLength);
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue May 22 05:25:49 2012
@@ -76,10 +76,14 @@ public interface BSPPeer<K1, V1, K2, V2,
public String getPeerName();
/**
- * @param index
* @return the name of n-th peer from sorted array by name.
*/
public String getPeerName(int index);
+
+ /**
+ * @return the index of this peer from sorted array by name.
+ */
+ public int getPeerIndex();
/**
* @return the names of all the peers executing tasks from the same job
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue May 22 05:25:49 2012
@@ -19,6 +19,7 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -196,6 +197,7 @@ public final class BSPPeerImpl<K1, V1, K
final RecordWriter<K2, V2> finalOut = outWriter;
collector = new OutputCollector<K2, V2>() {
+ @Override
public void collect(K2 key, V2 value) throws IOException {
finalOut.write(key, value);
}
@@ -400,6 +402,7 @@ public final class BSPPeerImpl<K1, V1, K
/**
* @return the string as host:port of this Peer
*/
+ @Override
public final String getPeerName() {
return peerAddress.getHostName() + ":" + peerAddress.getPort();
}
@@ -417,6 +420,13 @@ public final class BSPPeerImpl<K1, V1, K
}
@Override
+ public int getPeerIndex() {
+ initPeerNames();
+ return Arrays
+ .binarySearch(getAllPeerNames(), getPeerName());
+ }
+
+ @Override
public final int getNumPeers() {
initPeerNames();
return allPeers.length;
@@ -448,6 +458,7 @@ public final class BSPPeerImpl<K1, V1, K
/**
* @return the count of current super-step
*/
+ @Override
public final long getSuperstepCount() {
return currentTaskStatus.getSuperstepCount();
}
@@ -457,6 +468,7 @@ public final class BSPPeerImpl<K1, V1, K
*
* @return the conf
*/
+ @Override
public final Configuration getConfiguration() {
return conf;
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Tue May 22 05:25:49 2012
@@ -49,6 +49,7 @@ public abstract class FileInputFormat<K,
private long minSplitSize = 1;
private static final PathFilter hiddenFileFilter = new PathFilter() {
+ @Override
public boolean accept(Path p) {
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
@@ -71,6 +72,7 @@ public abstract class FileInputFormat<K,
this.filters = filters;
}
+ @Override
public boolean accept(Path path) {
for (PathFilter filter : filters) {
if (!filter.accept(path)) {
@@ -90,6 +92,7 @@ public abstract class FileInputFormat<K,
return true;
}
+ @Override
public abstract RecordReader<K, V> getRecordReader(InputSplit split,
BSPJob job) throws IOException;
@@ -174,6 +177,7 @@ public abstract class FileInputFormat<K,
/**
* Splits files returned by {@link #listStatus(JobConf)} when they're too big.
*/
+ @Override
public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
FileStatus[] files = listStatus(job);
@@ -332,8 +336,8 @@ public abstract class FileInputFormat<K,
* @param path {@link Path} to be added to the list of inputs for the
* map-reduce job.
*/
- public static void addInputPath(BSPJob conf, Path path) {
- path = new Path(conf.getWorkingDirectory(), path);
+ public static void addInputPath(BSPJob conf, Path p) {
+ Path path = new Path(conf.getWorkingDirectory(), p);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get("bsp.input.dir");
conf.set("bsp.input.dir", dirs == null ? dirStr : dirs
@@ -395,8 +399,9 @@ public abstract class FileInputFormat<K,
return result;
}
- private void sortInDescendingOrder(List<NodeInfo> mylist) {
+ private static void sortInDescendingOrder(List<NodeInfo> mylist) {
Collections.sort(mylist, new Comparator<NodeInfo>() {
+ @Override
public int compare(NodeInfo obj1, NodeInfo obj2) {
if (obj1 == null || obj2 == null)
@@ -424,8 +429,8 @@ public abstract class FileInputFormat<K,
* @throws IOException
*/
protected String[] getSplitHosts(BlockLocation[] blkLocations, long offset,
- long splitSize, NetworkTopology clusterMap) throws IOException {
-
+ long pSplitSize, NetworkTopology clusterMap) throws IOException {
+ long splitSize = pSplitSize;
int startIndex = getBlockIndex(blkLocations, offset);
long bytesInThisBlock = blkLocations[startIndex].getOffset()
@@ -519,7 +524,7 @@ public abstract class FileInputFormat<K,
return identifyHosts(allTopos.length, racksMap);
}
- private String[] identifyHosts(int replicationFactor,
+ private static String[] identifyHosts(int replicationFactor,
Map<Node, NodeInfo> racksMap) {
String[] retVal = new String[replicationFactor];
@@ -562,7 +567,7 @@ public abstract class FileInputFormat<K,
return retVal;
}
- private String[] fakeRacks(BlockLocation[] blkLocations, int index)
+ private static String[] fakeRacks(BlockLocation[] blkLocations, int index)
throws IOException {
String[] allHosts = blkLocations[index].getHosts();
String[] allTopos = new String[allHosts.length];
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java Tue May 22 05:25:49 2012
@@ -71,10 +71,12 @@ public class FileSplit implements InputS
}
/** The number of bytes in the file to process. */
+ @Override
public long getLength() {
return length;
}
+ @Override
public String toString() {
return file + ":" + start + "+" + length;
}
@@ -83,12 +85,14 @@ public class FileSplit implements InputS
// Writable methods
// //////////////////////////////////////////
+ @Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, file.toString());
out.writeLong(start);
out.writeLong(length);
}
+ @Override
public void readFields(DataInput in) throws IOException {
file = new Path(Text.readString(in));
start = in.readLong();
@@ -96,6 +100,7 @@ public class FileSplit implements InputS
hosts = null;
}
+ @Override
public String[] getLocations() throws IOException {
if (this.hosts == null) {
return new String[] {};
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Tue May 22 05:25:49 2012
@@ -250,7 +250,7 @@ class TaskInProgress {
private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
public boolean shouldCloseForClosedJob(TaskAttemptID taskid) {
- TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
+ TaskStatus ts = taskStatuses.get(taskid);
if ((ts != null) && (!tasksReportedClosed.contains(taskid))
&& (job.getStatus().getRunState() != JobStatus.RUNNING)) {
tasksReportedClosed.add(taskid);
@@ -263,7 +263,7 @@ class TaskInProgress {
public void completed(TaskAttemptID taskid) {
LOG.debug("Task '" + taskid.getTaskID().toString() + "' has completed.");
- TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+ TaskStatus status = taskStatuses.get(taskid);
status.setRunState(TaskStatus.State.SUCCEEDED);
activeTasks.remove(taskid);
@@ -282,7 +282,7 @@ class TaskInProgress {
public void terminated(TaskAttemptID taskid) {
LOG.info("Task '" + taskid.getTaskID().toString() + "' has failed.");
- TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+ TaskStatus status = taskStatuses.get(taskid);
status.setRunState(TaskStatus.State.FAILED);
activeTasks.remove(taskid);
}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Tue May 22 05:25:49 2012
@@ -54,6 +54,8 @@ public abstract class HamaClusterTestCas
this.zooKeeperCluster = new MiniZooKeeperCluster();
int clientPort = this.zooKeeperCluster.startup(testDir);
conf.set("hama.zookeeper.property.clientPort", Integer.toString(clientPort));
+ conf.set(Constants.GROOM_RPC_HOST, "localhost");
+ assertEquals(conf.get(Constants.GROOM_RPC_HOST), "localhost");
bspCluster = new MiniBSPCluster(this.conf, numOfGroom);
bspCluster.startBSPCluster();
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java Tue May 22 05:25:49 2012
@@ -118,8 +118,7 @@ public class CombineExample {
if (bsp.waitForCompletion(true)) {
printOutput(conf);
System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Tue May 22 05:25:49 2012
@@ -67,7 +67,7 @@ public class InlinkCount extends Vertex<
inlinkJob.setInputFormat(SequenceFileInputFormat.class);
inlinkJob.setInputKeyClass(VertexWritable.class);
inlinkJob.setInputValueClass(VertexArrayWritable.class);
-
+
inlinkJob.setVertexIDClass(Text.class);
inlinkJob.setVertexValueClass(IntWritable.class);
inlinkJob.setEdgeValueClass(NullWritable.class);
@@ -80,8 +80,7 @@ public class InlinkCount extends Vertex<
long startTime = System.currentTimeMillis();
if (inlinkJob.waitForCompletion(true)) {
System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Tue May 22 05:25:49 2012
@@ -133,8 +133,7 @@ public class MindistSearch {
long startTime = System.currentTimeMillis();
if (connectedComponentsJob.waitForCompletion(true)) {
System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Tue May 22 05:25:49 2012
@@ -122,7 +122,7 @@ public class PageRank {
pageJob.set("hama.pagerank.alpha", args[2]);
pageJob.setAggregatorClass(AverageAggregator.class);
-
+
pageJob.setVertexIDClass(Text.class);
pageJob.setVertexValueClass(DoubleWritable.class);
pageJob.setEdgeValueClass(NullWritable.class);
@@ -136,8 +136,7 @@ public class PageRank {
long startTime = System.currentTimeMillis();
if (pageJob.waitForCompletion(true)) {
System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Tue May 22 05:25:49 2012
@@ -139,8 +139,7 @@ public class PiEstimator {
if (bsp.waitForCompletion(true)) {
printOutput(conf);
System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Tue May 22 05:25:49 2012
@@ -112,8 +112,7 @@ public class RandBench {
long startTime = System.currentTimeMillis();
if (bsp.waitForCompletion(true)) {
System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Tue May 22 05:25:49 2012
@@ -123,17 +123,15 @@ public class SSSP {
ssspJob.setOutputValueClass(IntWritable.class);
// Iterate until all the nodes have been reached.
ssspJob.setMaxIteration(Integer.MAX_VALUE);
-
+
ssspJob.setVertexIDClass(Text.class);
ssspJob.setVertexValueClass(IntWritable.class);
ssspJob.setEdgeValueClass(IntWritable.class);
-
long startTime = System.currentTimeMillis();
if (ssspJob.waitForCompletion(true)) {
System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java Tue May 22 05:25:49 2012
@@ -159,8 +159,7 @@ public class SuperstepPiEstimator {
if (bsp.waitForCompletion(true)) {
printOutput(conf);
System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java Tue May 22 05:25:49 2012
@@ -20,7 +20,7 @@ package org.apache.hama.examples.util;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hama.graph.VertexArrayWritable;
@@ -58,8 +58,8 @@ public class PagerankTextToSeq extends T
VertexWritable key = new VertexWritable(split[0]);
VertexWritable[] v = new VertexWritable[split.length - 1];
for (int i = 1; i < split.length; i++) {
- v[i - 1] = new VertexWritable(new DoubleWritable(0.0),
- new Text(split[i]), Text.class, DoubleWritable.class);
+ v[i - 1] = new VertexWritable(NullWritable.get(), new Text(split[i]),
+ Text.class, NullWritable.class);
}
VertexArrayWritable value = new VertexArrayWritable();
value.set(v);
Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Tue May 22 05:25:49 2012
@@ -158,7 +158,8 @@ public class MindistSearchTest extends T
public void testRepairFunctionality() throws Exception {
// make a copy to be safe with parallel test executions
- final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>(tmp);
+ final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>(
+ tmp);
// removing 7 should resulting in creating it and getting the same result as
// usual
map.remove(new VertexWritable<Text, IntWritable>("7"));
@@ -183,7 +184,7 @@ public class MindistSearchTest extends T
connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
connectedComponentsJob.setOutputKeyClass(Text.class);
connectedComponentsJob.setOutputValueClass(Text.class);
-
+
connectedComponentsJob.setVertexIDClass(Text.class);
connectedComponentsJob.setVertexValueClass(Text.class);
connectedComponentsJob.setEdgeValueClass(NullWritable.class);
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Tue May 22 05:25:49 2012
@@ -23,12 +23,11 @@ import org.apache.hadoop.io.Writable;
/**
* The edge class
*/
-public class Edge<VERTEX_ID, EDGE_VALUE_TYPE extends Writable> {
+public final class Edge<VERTEX_ID extends Writable, EDGE_VALUE_TYPE extends Writable> {
- private VERTEX_ID destinationVertexID;
- // actually the destination peer address
- private String destinationPeerName;
- private EDGE_VALUE_TYPE cost;
+ private final VERTEX_ID destinationVertexID;
+ private final String destinationPeerName;
+ private final EDGE_VALUE_TYPE cost;
public Edge(VERTEX_ID sourceVertexID, String destVertexID,
EDGE_VALUE_TYPE cost) {
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Tue May 22 05:25:49 2012
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.Partitioner;
public class GraphJob extends BSPJob {
@@ -35,6 +36,7 @@ public class GraphJob extends BSPJob {
public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
+ public final static String VERTEX_GRAPH_RUNTIME_PARTIONING = "hama.graph.runtime.partitioning";
/**
* Creates a new Graph Job with the given configuration and an exampleClass.
@@ -102,6 +104,13 @@ public class GraphJob extends BSPJob {
}
@Override
+ public void setPartitioner(@SuppressWarnings("rawtypes")
+ Class<? extends Partitioner> theClass) {
+ super.setPartitioner(theClass);
+ conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true);
+ }
+
+ @Override
public void setCombinerClass(Class<? extends Combiner<? extends Writable>> cls) {
ensureState(JobState.DEFINE);
conf.setClass(VERTEX_MESSAGE_COMBINER_CLASS_ATTR, cls, Combiner.class);
@@ -114,4 +123,5 @@ public class GraphJob extends BSPJob {
public void setMaxIteration(int maxIteration) {
conf.setInt("hama.graph.max.iteration", maxIteration);
}
+
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Tue May 22 05:25:49 2012
@@ -20,6 +20,8 @@ package org.apache.hama.graph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
@@ -35,16 +37,20 @@ public final class GraphJobMessage imple
public static final int MAP_FLAG = 0x01;
public static final int VERTEX_FLAG = 0x02;
public static final int REPAIR_FLAG = 0x04;
+ public static final int PARTITION_FLAG = 0x08;
// statically defined because it is process-wide information; therefore in caps
// and considered a constant
+ public static Class<?> VERTEX_CLASS;
public static Class<? extends Writable> VERTEX_ID_CLASS;
public static Class<? extends Writable> VERTEX_VALUE_CLASS;
+ public static Class<? extends Writable> EDGE_VALUE_CLASS;
private int flag = MAP_FLAG;
private MapWritable map;
private Writable vertexId;
private Writable vertexValue;
+ private Vertex<?, ?, ?> vertex;
public GraphJobMessage() {
}
@@ -65,6 +71,11 @@ public final class GraphJobMessage imple
this.vertexValue = vertexValue;
}
+ public GraphJobMessage(Vertex<?, ?, ?> vertex) {
+ this.flag = PARTITION_FLAG;
+ this.vertex = vertex;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(this.flag);
@@ -75,6 +86,27 @@ public final class GraphJobMessage imple
vertexValue.write(out);
} else if (isMapMessage()) {
map.write(out);
+ } else if (isPartitioningMessage()) {
+ vertex.getVertexID().write(out);
+ if (vertex.getValue() != null) {
+ out.writeBoolean(true);
+ vertex.getValue().write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ List<?> outEdges = vertex.getOutEdges();
+ out.writeInt(outEdges.size());
+ for (Object e : outEdges) {
+ Edge<?, ?> edge = (Edge<?, ?>) e;
+ out.writeUTF(edge.getDestinationPeerName());
+ edge.getDestinationVertexID().write(out);
+ if (edge.getValue() != null) {
+ out.writeBoolean(true);
+ edge.getValue().write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
} else {
vertexId.write(out);
}
@@ -92,11 +124,38 @@ public final class GraphJobMessage imple
} else if (isMapMessage()) {
map = new MapWritable();
map.readFields(in);
+ } else if (isPartitioningMessage()) {
+ Vertex<Writable, Writable, Writable> vertex = GraphJobRunner
+ .newVertexInstance(VERTEX_CLASS, null);
+ Writable vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
+ vertexId.readFields(in);
+ vertex.setVertexID(vertexId);
+ if (in.readBoolean()) {
+ Writable vertexValue = ReflectionUtils.newInstance(VERTEX_VALUE_CLASS,
+ null);
+ vertexValue.readFields(in);
+ vertex.setValue(vertexValue);
+ }
+ int size = in.readInt();
+ vertex.edges = new ArrayList<Edge<Writable, Writable>>(size);
+ for (int i = 0; i < size; i++) {
+ String destination = in.readUTF();
+ Writable edgeVertexID = ReflectionUtils.newInstance(VERTEX_ID_CLASS,
+ null);
+ edgeVertexID.readFields(in);
+ Writable edgeValue = null;
+ if (in.readBoolean()) {
+ edgeValue = ReflectionUtils.newInstance(EDGE_VALUE_CLASS, null);
+ edgeValue.readFields(in);
+ }
+ vertex.edges.add(new Edge<Writable, Writable>(edgeVertexID,
+ destination, edgeValue));
+ }
+ this.vertex = vertex;
} else {
vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
vertexId.readFields(in);
}
-
}
public MapWritable getMap() {
@@ -111,6 +170,10 @@ public final class GraphJobMessage imple
return vertexValue;
}
+ public Vertex<?, ?, ?> getVertex() {
+ return vertex;
+ }
+
public boolean isMapMessage() {
return flag == MAP_FLAG;
}
@@ -123,10 +186,15 @@ public final class GraphJobMessage imple
return flag == REPAIR_FLAG;
}
+ public boolean isPartitioningMessage() {
+ return flag == PARTITION_FLAG;
+ }
+
@Override
public String toString() {
return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
- + vertexId + ", vertexValue=" + vertexValue + "]";
+ + vertexId + ", vertexValue=" + vertexValue + ", vertex=" + vertex
+ + "]";
}
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue May 22 05:25:49 2012
@@ -39,6 +39,8 @@ import org.apache.hadoop.util.Reflection
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;
@@ -53,7 +55,7 @@ public final class GraphJobRunner<VERTEX
extends
BSP<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> {
- private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
+ static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
// make sure that these values don't collide with the vertex names
private static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
@@ -96,6 +98,7 @@ public final class GraphJobRunner<VERTEX
Class<VERTEX_ID> vertexIdClass;
Class<VERTEX_VALUE> vertexValueClass;
Class<EDGE_VALUE_TYPE> edgeValueClass;
+ Class<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> vertexClass;
@Override
@SuppressWarnings("unchecked")
@@ -114,11 +117,21 @@ public final class GraphJobRunner<VERTEX
edgeValueClass = (Class<EDGE_VALUE_TYPE>) conf.getClass(
GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class,
Writable.class);
+ vertexClass = (Class<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>) conf
+ .getClass("hama.graph.vertex.class", Vertex.class);
GraphJobMessage.VERTEX_ID_CLASS = vertexIdClass;
GraphJobMessage.VERTEX_VALUE_CLASS = vertexValueClass;
+ GraphJobMessage.VERTEX_CLASS = vertexClass;
+ GraphJobMessage.EDGE_VALUE_CLASS = edgeValueClass;
boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
+ boolean runtimePartitioning = conf.getBoolean(
+ GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, false);
+ Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner = (Partitioner<VERTEX_ID, VERTEX_VALUE>) ReflectionUtils
+ .newInstance(
+ conf.getClass("bsp.input.partitioner.class", HashPartitioner.class),
+ conf);
if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
Combiner.class)) {
@@ -142,7 +155,7 @@ public final class GraphJobRunner<VERTEX
}
}
- loadVertices(peer, repairNeeded);
+ loadVertices(peer, repairNeeded, runtimePartitioning, partitioner);
numberVertices = vertices.size() * peer.getNumPeers();
// TODO refactor this to a single step
for (Entry<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> e : vertices
@@ -178,7 +191,8 @@ public final class GraphJobRunner<VERTEX
// Map <vertexID, messages>
final Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> messages = parseMessages(peer);
- // use iterations here, since repair can skew the number of supersteps
+ // use iterations here, since repair can skew the number of
+ // supersteps
if (isMasterTask(peer) && iteration > 1) {
MapWritable updatedCnt = new MapWritable();
// exit if there's no update made
@@ -194,7 +208,8 @@ public final class GraphJobRunner<VERTEX
if (intern.finalizeAggregation() != null) {
lastAggregatedValue = finalizeAggregation;
}
- // this count is usually the times of active vertices in the graph
+ // this count is usually the number of active
+ // vertices in the graph
updatedCnt.put(FLAG_AGGREGATOR_INCREMENT,
intern.getTimesAggregated());
}
@@ -277,7 +292,8 @@ public final class GraphJobRunner<VERTEX
GraphJobMessage msg = null;
final Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> msgMap = new HashMap<VERTEX_ID, LinkedList<VERTEX_VALUE>>();
while ((msg = peer.getCurrentMessage()) != null) {
- // either this is a vertex message or a directive that must be read as map
+ // either this is a vertex message or a directive that must be read
+ // as map
if (msg.isVertexMessage()) {
final VERTEX_ID vertexID = (VERTEX_ID) msg.getVertexId();
final VERTEX_VALUE value = (VERTEX_VALUE) msg.getVertexValue();
@@ -318,15 +334,15 @@ public final class GraphJobRunner<VERTEX
@SuppressWarnings("unchecked")
private void loadVertices(
BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer,
- boolean repairNeeded) throws IOException {
- LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
+ boolean repairNeeded, boolean runtimePartitioning,
+ Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner) throws IOException,
+ SyncException, InterruptedException {
+ LOG.debug("vertex class: " + vertexClass);
boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
KeyValuePair<? extends VertexWritable<VERTEX_ID, VERTEX_VALUE>, ? extends VertexArrayWritable> next = null;
while ((next = peer.readNext()) != null) {
- Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
- .newInstance(conf.getClass("hama.graph.vertex.class", Vertex.class),
- conf);
-
+ Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
+ vertexClass, conf);
vertex.setVertexID(next.getKey().getVertexId());
vertex.peer = peer;
vertex.runner = this;
@@ -343,15 +359,35 @@ public final class GraphJobRunner<VERTEX
}
List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> edges = new ArrayList<Edge<VERTEX_ID, EDGE_VALUE_TYPE>>();
for (VertexWritable<VERTEX_ID, VERTEX_VALUE> e : arr) {
- String target = peer.getPeerName(Math.abs((e.hashCode() % peer
- .getAllPeerNames().length)));
+ int partition = partitioner.getPartition(e.getVertexId(),
+ e.getVertexValue(), peer.getNumPeers());
+ String target = peer.getPeerName(partition);
edges.add(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(e.getVertexId(), target,
(EDGE_VALUE_TYPE) e.getVertexValue()));
}
vertex.edges = edges;
- vertex.setup(conf);
- vertices.put(next.getKey().getVertexId(), vertex);
+ if (runtimePartitioning) {
+ int partition = partitioner.getPartition(vertex.getVertexID(),
+ vertex.getValue(), peer.getNumPeers());
+ peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
+ } else {
+ vertex.setup(conf);
+ vertices.put(next.getKey().getVertexId(), vertex);
+ }
+ }
+
+ if (runtimePartitioning) {
+ peer.sync();
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) msg
+ .getVertex();
+ vertex.peer = peer;
+ vertex.runner = this;
+ vertex.setup(conf);
+ vertices.put(vertex.getVertexID(), vertex);
+ }
}
/*
@@ -372,25 +408,20 @@ public final class GraphJobRunner<VERTEX
new GraphJobMessage(e.getDestinationVertexID()));
}
}
- try {
- peer.sync();
- } catch (Exception e) {
- // we can't really recover from that, so fail this task
- throw new RuntimeException(e);
- }
+ peer.sync();
GraphJobMessage msg = null;
while ((msg = peer.getCurrentMessage()) != null) {
VERTEX_ID vertexName = (VERTEX_ID) msg.getVertexId();
if (!vertices.containsKey(vertexName)) {
- Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
- .newInstance(
- conf.getClass("hama.graph.vertex.class", Vertex.class), conf);
+ Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
+ vertexClass, conf);
vertex.peer = peer;
vertex.setVertexID(vertexName);
vertex.runner = this;
if (selfReference) {
- String target = peer.getPeerName(Math.abs((vertex.hashCode() % peer
- .getAllPeerNames().length)));
+ int partition = partitioner.getPartition(vertex.getVertexID(),
+ vertex.getValue(), peer.getNumPeers());
+ String target = peer.getPeerName(partition);
vertex.edges = Collections
.singletonList(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(vertex
.getVertexID(), target, null));
@@ -406,7 +437,19 @@ public final class GraphJobRunner<VERTEX
}
/**
- * Just write <ID as Writable, Value as Writable> pair as a result
+ * @return a new vertex instance
+ */
+ public static <VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE_TYPE extends Writable> Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> newVertexInstance(
+ Class<?> vertexClass, Configuration conf) {
+ @SuppressWarnings("unchecked")
+ Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
+ .newInstance(vertexClass, conf);
+ return vertex;
+ }
+
+ /**
+ * Just writes the <ID as Writable, Value as Writable> pairs as a result. Note that
+ * this will also be executed when a failure has happened.
*/
@Override
public final void cleanup(
Modified: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1341306&r1=1341305&r2=1341306&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Tue May 22 05:25:49 2012
@@ -60,12 +60,11 @@ public class TestSubmitGraphJob extends
int vertexId = Integer.parseInt(adjacencyStringArray[0]);
String name = pages[vertexId];
@SuppressWarnings("unchecked")
- VertexWritable<Text, DoubleWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
+ VertexWritable<Text, NullWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
for (int j = 1; j < adjacencyStringArray.length; j++) {
- arr[j - 1] = new VertexWritable<Text, DoubleWritable>(
- new DoubleWritable(0.0d), new Text(
- pages[Integer.parseInt(adjacencyStringArray[j])]), Text.class,
- DoubleWritable.class);
+ arr[j - 1] = new VertexWritable<Text, NullWritable>(NullWritable.get(),
+ new Text(pages[Integer.parseInt(adjacencyStringArray[j])]),
+ Text.class, NullWritable.class);
}
VertexArrayWritable wr = new VertexArrayWritable();
wr.set(arr);
@@ -113,8 +112,8 @@ public class TestSubmitGraphJob extends
long startTime = System.currentTimeMillis();
if (bsp.waitForCompletion(true)) {
verifyResult();
- LOG.info("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime)
+ / 1000.0 + " seconds");
} else {
fail();
}