You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/02/06 05:21:54 UTC
svn commit: r1442839 - /hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
Author: edwardyoon
Date: Wed Feb 6 04:21:54 2013
New Revision: 1442839
URL: http://svn.apache.org/viewvc?rev=1442839&view=rev
Log:
HAMA-731: Partitioning error in BSPJobClient
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1442839&r1=1442838&r2=1442839&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Wed Feb 6 04:21:54 2013
@@ -370,7 +370,18 @@ public class BSPJobClient extends Config
}
protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
+ String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
+ Path inputDir = new Path(inputPath);
+ if (fs.isFile(inputDir)) {
+ inputDir = inputDir.getParent();
+ }
+
+ Path partitionDir = new Path(inputDir + "/partitions");
+ if (fs.exists(partitionDir)) {
+ fs.delete(partitionDir, true);
+ }
+
if (job.get("bsp.partitioning.runner.job") != null) {
return job;
}// Early exit for the partitioner job.
@@ -380,12 +391,6 @@ public class BSPJobClient extends Config
(isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
: maxTasks);
- String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
- Path inputDir = new Path(inputPath);
- if (fs.isFile(inputDir)) {
- inputDir = inputDir.getParent();
- }
-
if (inputPath != null) {
int numSplits = splits.length;
int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
@@ -406,12 +411,6 @@ public class BSPJobClient extends Config
Constants.ENABLE_RUNTIME_PARTITIONING, false) && job
.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null)) {
- Path partitionDir = new Path(inputDir + "/partitions");
-
- if (fs.exists(partitionDir)) {
- fs.delete(partitionDir, true);
- }
-
if (numTasks == 0) {
numTasks = numSplits;
}