You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/01/05 03:56:34 UTC
svn commit: r1227458 - in /incubator/hama/trunk:
core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
Author: edwardyoon
Date: Thu Jan 5 02:56:34 2012
New Revision: 1227458
URL: http://svn.apache.org/viewvc?rev=1227458&view=rev
Log:
HAMA-476 Splitter doesn't work correctly
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1227458&r1=1227457&r2=1227458&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Jan 5 02:56:34 2012
@@ -86,7 +86,7 @@ public class BSPJobClient extends Config
JobProfile profile;
JobStatus status;
long statustime;
-
+
public NetworkedJob() {
}
@@ -305,14 +305,20 @@ public class BSPJobClient extends Config
fs.mkdirs(submitJobDir);
short replication = (short) job.getInt("bsp.submit.replication", 10);
+ ClusterStatus clusterStatus = getClusterStatus(true);
+ int maxTasks = clusterStatus.getMaxTasks();
+ if (maxTasks < job.getNumBspTask()) {
+ job.setNumBspTask(maxTasks);
+ }
+
// only create the splits if we have an input
if (job.get("bsp.input.dir") != null) {
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
if (job.getConf().get("bsp.input.partitioner.class") != null) {
- job = partition(job);
+ job = partition(job, maxTasks);
}
- job.setNumBspTask(writeSplits(job, submitSplitFile));
+ job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
job.set("bsp.job.split.file", submitSplitFile.toString());
}
@@ -368,9 +374,12 @@ public class BSPJobClient extends Config
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- protected BSPJob partition(BSPJob job) throws IOException {
- InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
- int numOfTasks = splits.length; // job.getNumBspTask();
+ protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
+ InputSplit[] splits = job.getInputFormat().getSplits(
+ job,
+ (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+ : maxTasks);
+
String input = job.getConf().get("bsp.input.dir");
if (input != null) {
@@ -396,7 +405,7 @@ public class BSPJobClient extends Config
RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
- numOfTasks);
+ splits.length);
CompressionType compressionType = getOutputCompressionType(job);
Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
@@ -408,7 +417,7 @@ public class BSPJobClient extends Config
}
try {
- for (int i = 0; i < numOfTasks; i++) {
+ for (int i = 0; i < splits.length; i++) {
Path p = new Path(partitionedPath, getPartitionName(i));
if (codec == null) {
writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
@@ -429,7 +438,7 @@ public class BSPJobClient extends Config
Object value = recordReader.createValue();
while (recordReader.next(key, value)) {
int index = Math.abs(partitioner.getPartition(key, value,
- numOfTasks));
+ splits.length));
writers.get(index).append(key, value);
}
LOG.debug("Done with split " + i);
@@ -447,8 +456,12 @@ public class BSPJobClient extends Config
return job;
}
+ private boolean isProperSize(int numBspTask, int maxTasks) {
+ return (numBspTask > 1 && numBspTask < maxTasks);
+ }
+
private String getPartitionName(int i) {
- return "part-" + String.valueOf(100000 + i).substring(1, 6);
+ return "part-" + String.valueOf(100000 + i).substring(1, 6);
}
/**
@@ -492,8 +505,12 @@ public class BSPJobClient extends Config
return codecClass;
}
- private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
- InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
+ private int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
+ throws IOException {
+ InputSplit[] splits = job.getInputFormat().getSplits(
+ job,
+ (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+ : maxTasks);
final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
submitSplitFile, splits.length);
@@ -589,7 +606,7 @@ public class BSPJobClient extends Config
lastReport = report;
}
}
-
+
LOG.info("The total number of supersteps: " + info.getSuperstepCount());
// TODO job.getCounters().log(LOG);
return job.isSuccessful();
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java?rev=1227458&r1=1227457&r2=1227458&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java Thu Jan 5 02:56:34 2012
@@ -199,7 +199,7 @@ public class ShortestPaths extends
}
public static void printUsage() {
- System.out.println("Usage: <startNode> <output path> <input path>");
+ System.out.println("Usage: <startNode> <output path> <input path> [numTasks]");
}
public static void main(String[] args) throws IOException,
@@ -221,6 +221,10 @@ public class ShortestPaths extends
bsp.setOutputPath(new Path(args[1]));
bsp.setInputPath(new Path(args[2]));
+ if(args.length == 4) {
+ bsp.setNumBspTask(Integer.parseInt(args[3]));
+ }
+
bsp.setBspClass(ShortestPaths.class);
bsp.setInputFormat(SequenceFileInputFormat.class);
bsp.setPartitioner(HashPartitioner.class);