You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/02/03 02:44:13 UTC

svn commit: r1563754 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/BSPJobClient.java

Author: edwardyoon
Date: Mon Feb  3 01:44:13 2014
New Revision: 1563754

URL: http://svn.apache.org/r1563754
Log:
HAMA-862: Handling max tasks exception (edwardyoon)

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1563754&r1=1563753&r2=1563754&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Feb  3 01:44:13 2014
@@ -14,7 +14,8 @@ Release 0.7.0 (unreleased changes)
 
   BUG FIXES
 
-	 HAMA-860: Make aggregators start from the first superstep (Anastasis Andronidis)
+   HAMA-862: Handling max tasks exception (edwardyoon)
+   HAMA-860: Make aggregators start from the first superstep (Anastasis Andronidis)
    HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)
    HAMA-845: The size() of Spilling Queue returns always numMessagesWritten (edwardyoon)
    HAMA-834: Fix KMeans example (Martin Illecker)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1563754&r1=1563753&r2=1563754&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Feb  3 01:44:13 2014
@@ -302,9 +302,16 @@ public class BSPJobClient extends Config
       throws IOException {
     BSPJob job = pJob;
     job.setJobID(jobId);
-    int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
-        0);
-    int maxTasks = checkTaskLimits(job, limitTasks);
+
+    ClusterStatus clusterStatus = getClusterStatus(true);
+    int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+        clusterStatus.getMaxTasks() - clusterStatus.getTasks());
+
+    if (maxTasks < job.getNumBspTask()) {
+      LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
+          + maxTasks + " tasks.");
+      job.setNumBspTask(maxTasks);
+    }
 
     Path submitJobDir = new Path(getSystemDir(), "submit_"
         + Integer.toString(Math.abs(r.nextInt()), 36));
@@ -329,20 +336,21 @@ public class BSPJobClient extends Config
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
 
-      InputSplit[] splits = job.getInputFormat().getSplits(
-          job,
-          (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
-              : maxTasks);
+      InputSplit[] splits = job.getInputFormat().getSplits(job, maxTasks);
 
       job = partition(job, splits, maxTasks);
       maxTasks = job.getInt("hama.partition.count", maxTasks);
 
       if (job.getBoolean("input.has.partitioned", false)) {
-        splits = job.getInputFormat().getSplits(
-            job,
-            (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
-                : maxTasks);
+        splits = job.getInputFormat().getSplits(job, maxTasks);
       }
+
+      if (maxTasks < splits.length) {
+        throw new IOException(
+            "Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
+                + splits.length + ", The number of max tasks: " + maxTasks);
+      }
+
       job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
@@ -472,25 +480,6 @@ public class BSPJobClient extends Config
     return job;
   }
 
-  protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
-    int maxTasks;
-    ClusterStatus clusterStatus = getClusterStatus(true);
-
-    if (limitTasks > 0) {
-      maxTasks = limitTasks;
-    } else {
-      maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
-    }
-
-    if (maxTasks < job.getNumBspTask()) {
-      throw new IOException(
-          "Job failed! The number of tasks has exceeded the maximum allowed. Maxtasks: "
-              + maxTasks + " < configured number of tasks: "
-              + job.getNumBspTask());
-    }
-    return maxTasks;
-  }
-
   protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
       Path submitJobFile, FileSystem fs) throws IOException {
     //
@@ -505,10 +494,6 @@ public class BSPJobClient extends Config
     }
   }
 
-  private static boolean isProperSize(int numBspTask, int maxTasks) {
-    return (numBspTask > 1 && numBspTask < maxTasks);
-  }
-
   /**
    * Get the {@link CompressionType} for the output {@link SequenceFile}.
    *