You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/02/03 02:44:13 UTC
svn commit: r1563754 - in /hama/trunk: CHANGES.txt
core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
Author: edwardyoon
Date: Mon Feb 3 01:44:13 2014
New Revision: 1563754
URL: http://svn.apache.org/r1563754
Log:
HAMA-862: Handling max tasks exception (edwardyoon)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1563754&r1=1563753&r2=1563754&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Feb 3 01:44:13 2014
@@ -14,7 +14,8 @@ Release 0.7.0 (unreleased changes)
BUG FIXES
- HAMA-860: Make aggregators start from the first superstep (Anastasis Andronidis)
+ HAMA-862: Handling max tasks exception (edwardyoon)
+ HAMA-860: Make aggregators start from the first superstep (Anastasis Andronidis)
HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)
HAMA-845: The size() of Spilling Queue returns always numMessagesWritten (edwardyoon)
HAMA-834: Fix KMeans example (Martin Illecker)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1563754&r1=1563753&r2=1563754&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Feb 3 01:44:13 2014
@@ -302,9 +302,16 @@ public class BSPJobClient extends Config
throws IOException {
BSPJob job = pJob;
job.setJobID(jobId);
- int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
- 0);
- int maxTasks = checkTaskLimits(job, limitTasks);
+
+ ClusterStatus clusterStatus = getClusterStatus(true);
+ int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+ clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+
+ if (maxTasks < job.getNumBspTask()) {
+ LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
+ + maxTasks + " tasks.");
+ job.setNumBspTask(maxTasks);
+ }
Path submitJobDir = new Path(getSystemDir(), "submit_"
+ Integer.toString(Math.abs(r.nextInt()), 36));
@@ -329,20 +336,21 @@ public class BSPJobClient extends Config
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
- InputSplit[] splits = job.getInputFormat().getSplits(
- job,
- (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
- : maxTasks);
+ InputSplit[] splits = job.getInputFormat().getSplits(job, maxTasks);
job = partition(job, splits, maxTasks);
maxTasks = job.getInt("hama.partition.count", maxTasks);
if (job.getBoolean("input.has.partitioned", false)) {
- splits = job.getInputFormat().getSplits(
- job,
- (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
- : maxTasks);
+ splits = job.getInputFormat().getSplits(job, maxTasks);
}
+
+ if (maxTasks < splits.length) {
+ throw new IOException(
+ "Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
+ + splits.length + ", The number of max tasks: " + maxTasks);
+ }
+
job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
job.set("bsp.job.split.file", submitSplitFile.toString());
}
@@ -472,25 +480,6 @@ public class BSPJobClient extends Config
return job;
}
- protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
- int maxTasks;
- ClusterStatus clusterStatus = getClusterStatus(true);
-
- if (limitTasks > 0) {
- maxTasks = limitTasks;
- } else {
- maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
- }
-
- if (maxTasks < job.getNumBspTask()) {
- throw new IOException(
- "Job failed! The number of tasks has exceeded the maximum allowed. Maxtasks: "
- + maxTasks + " < configured number of tasks: "
- + job.getNumBspTask());
- }
- return maxTasks;
- }
-
protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
Path submitJobFile, FileSystem fs) throws IOException {
//
@@ -505,10 +494,6 @@ public class BSPJobClient extends Config
}
}
- private static boolean isProperSize(int numBspTask, int maxTasks) {
- return (numBspTask > 1 && numBspTask < maxTasks);
- }
-
/**
* Get the {@link CompressionType} for the output {@link SequenceFile}.
*