You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:27:18 UTC

svn commit: r1077542 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 04:27:17 2011
New Revision: 1077542

URL: http://svn.apache.org/viewvc?rev=1077542&view=rev
Log:
commit 260b70d6f19f2f97b6cd91572b0961c5544f9397
Author: Mahadev Konar <ma...@cdev6022.inktomisearch.com>
Date:   Tue Jul 13 20:49:04 2010 +0000

    MAPREDUCE-1521. Protection against incorrectly configured reduces. From https://issues.apache.org/jira/secure/attachment/12449129/MAPREDUCE-1521-0.20-yahoo.patch (mahadev)
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1521. Protection against incorrectly configured reduces
    +    (mahadev)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 04:27:17 2011
@@ -365,6 +365,14 @@
 </property>
 
 <property>
+  <name>mapreduce.reduce.input.limit</name>
+  <value>-1</value>
+  <description>The limit, in bytes, on the input size of each reduce. If the
+  estimated input size of a reduce is greater than this value, the job fails.
+  A value of -1 (or any non-positive value) means no limit is set.</description>
+</property>
+
+<property>
   <name>mapred.job.tracker.retiredjobs.cache.size</name>
   <value>1000</value>
   <description>The number of retired job status to keep in the cache.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar  4 04:27:17 2011
@@ -397,6 +397,15 @@ public class JobClient extends Configure
     public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
       return jobSubmitClient.getTaskDiagnostics(id);
     }
+
+    @Override
+    public String getFailureInfo() throws IOException {
+      //assuming that this is just being called after 
+      //we realized the job failed, so the cached status should be recent; we
+      //call ensureFreshStatus() instead of forcing an RPC via updateStatus()
+      ensureFreshStatus();
+      return status.getFailureInfo();
+    }
   }
 
   private JobSubmissionProtocol jobSubmitClient;
@@ -1192,6 +1201,7 @@ public class JobClient extends Configure
     RunningJob rj = jc.submitJob(job);
     try {
       if (!jc.monitorAndPrintJob(job, rj)) {
+        LOG.info("Job Failed: " + rj.getFailureInfo());
         throw new IOException("Job failed!");
       }
     } catch (InterruptedException ie) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:27:17 2011
@@ -112,7 +112,8 @@ public class JobInProgress {
   int finishedReduceTasks = 0;
   int failedMapTasks = 0; 
   int failedReduceTasks = 0;
-  
+  private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L;
+  long reduce_input_limit = -1L;
   private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
   int completedMapsForReduceSlowstart = 0;
   
@@ -417,6 +418,12 @@ public class JobInProgress {
       this.jobMetrics.setTag("jobId", jobId.toString());
       hasSpeculativeMaps = conf.getMapSpeculativeExecution();
       hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+      // a limit on the input size of the reduce.
+      // we check to see if the estimated input size
+      // of each reduce does not exceed this value. If it does,
+      // we fail the job. A value of -1 (or any non-positive
+      // value) means there is no limit set.
+      reduce_input_limit = -1L;
       this.maxLevel = jobtracker.getNumTaskCacheLevels();
       this.anyCacheLevel = this.maxLevel+1;
       this.nonLocalMaps = new LinkedList<TaskInProgress>();
@@ -425,7 +432,8 @@ public class JobInProgress {
       this.nonRunningReduces = new LinkedList<TaskInProgress>();    
       this.runningReduces = new LinkedHashSet<TaskInProgress>();
       this.resourceEstimator = new ResourceEstimator(this);
-
+      this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit", 
+          DEFAULT_REDUCE_INPUT_LIMIT);
       // register job's tokens for renewal
       DelegationTokenRenewal.registerDelegationTokensForRenewal(
           jobInfo.getJobID(), ts, jobtracker.getConf());
@@ -1539,6 +1547,25 @@ public class JobInProgress {
       return null;
     }
     
+    /** check to see if we have any misbehaving reducers. If the expected input
+     * for reducers is huge then we just fail the job and error out. The estimated
+     * size is divided by 2 since the resource estimator returns the amount of disk 
+     * space that the reduce will use (which is 2 times the input: space for merge + reduce
+     * input). **/
+    long estimatedReduceInputSize = resourceEstimator.getEstimatedReduceInputSize()/2;
+    if (((estimatedReduceInputSize) > 
+      reduce_input_limit) && (reduce_input_limit > 0L)) {
+      // make sure jobtracker lock is held
+      LOG.info("Exceeded limit for reduce input size: Estimated:" + 
+          estimatedReduceInputSize + " Limit: " + 
+          reduce_input_limit + " Failing Job " + jobId);
+      status.setFailureInfo("Job Exceeded Reduce Input limit " 
+          + " Limit:  " + reduce_input_limit + 
+          " Estimated: " + estimatedReduceInputSize);
+      jobtracker.failJob(this);
+      return null;
+    }
+    
     // Ensure we have sufficient map outputs ready to shuffle before 
     // scheduling reduces
     if (!scheduleReduces()) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Mar  4 04:27:17 2011
@@ -84,7 +84,8 @@ public class JobStatus implements Writab
   private String user;
   private JobPriority priority;
   private String schedulingInfo="NA";
-    
+  private String failureInfo = "NA";
+  
   /**
    */
   public JobStatus() {
@@ -278,8 +279,24 @@ public class JobStatus implements Writab
   public synchronized String getSchedulingInfo() {
    return schedulingInfo;
   }
+  
+  /**
+   * Gets any available information on the reason for the job's failure.
+   * @return diagnostic information on why the job failed, or "NA" if none was set.
+   */
+  public synchronized String getFailureInfo() {
+    return this.failureInfo;
+  }
 
   /**
+   * Sets the reason for the failure of this job.
+   * @param failureInfo the reason for failure of this job.
+   */
+  public synchronized void setFailureInfo(String failureInfo) {
+    this.failureInfo = failureInfo;
+  }
+  
+  /**
    * Used to set the scheduling information associated to a particular Job.
    * 
    * @param schedulingInfo Scheduling information of the job
@@ -343,6 +360,7 @@ public class JobStatus implements Writab
       WritableUtils.writeEnum(out, entry.getKey());
       entry.getValue().write(out);
     }
+    Text.writeString(out, failureInfo);
   }
 
   public synchronized void readFields(DataInput in) throws IOException {
@@ -365,6 +383,7 @@ public class JobStatus implements Writab
       acl.readFields(in);
       this.jobACLs.put(aclType, acl);
     }
+    this.failureInfo = Text.readString(in);
   }
 
   // A utility to convert new job runstates to the old ones.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Mar  4 04:27:17 2011
@@ -80,8 +80,10 @@ interface JobSubmissionProtocol extends 
    * Version 26: Added the method getQueueAdmins(queueName) as part of
    *             MAPREDUCE-1664.
    * Version 27: Added queue state to JobQueueInfo as part of HADOOP-5913.
+   * Version 28: Added a new field to JobStatus to provide user readable 
+   *             information on job failure. MAPREDUCE-1521.
    */
-  public static final long versionID = 27L;
+  public static final long versionID = 28L;
 
   /**
    * Allocate a name for the job.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 04:27:17 2011
@@ -2049,7 +2049,6 @@ public class JobTracker implements MRCon
     MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
     MAX_BLACKLISTS_PER_TRACKER = 
         conf.getInt("mapred.max.tracker.blacklists", 4);
-    
     NUM_HEARTBEATS_IN_SECOND = 
       conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
     if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
@@ -3947,9 +3946,11 @@ public class JobTracker implements MRCon
           StringUtils.stringifyException(kie));
       killJob(job);
     } catch (Throwable t) {
+      String failureInfo = "Job initialization failed:\n" +
+      StringUtils.stringifyException(t);
       // If the job initialization is failed, job state will be FAILED
-      LOG.error("Job initialization failed:\n" +
-          StringUtils.stringifyException(t));
+      LOG.error(failureInfo);
+      job.getStatus().setFailureInfo(failureInfo);
       failJob(job);
     }
 	 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java Fri Mar  4 04:27:17 2011
@@ -183,6 +183,13 @@ public interface RunningJob {
   public Counters getCounters() throws IOException;
   
   /**
+   * Get failure info for the job.
+   * @return the failure info for the job.
+   * @throws IOException
+   */
+  public String getFailureInfo() throws IOException;
+  
+  /**
    * Gets the diagnostic messages for a given task attempt.
    * @param taskid
    * @return the list of diagnostic messages for the task

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Fri Mar  4 04:27:17 2011
@@ -63,6 +63,44 @@ public class TestTaskLimits extends Test
   }
 
   /**
+   * check with a reduce limit of 10 bytes for input to reduce.
+   * This should fail since input to reduce estimate is greater
+   * than that!
+   * @return true if the job failed as expected, false otherwise
+   * @throws IOException
+   */
+  private boolean runReduceLimitCheck() throws IOException {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    boolean success = false;
+    try {
+      final int taskTrackers = 2;
+   
+      Configuration conf = new Configuration();
+      conf.setInt("mapred.jobtracker.maxtasks.per.job", -1);
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      fileSys = dfs.getFileSystem();
+      JobConf jconf = new JobConf(conf);
+      mr = new MiniMRCluster(0, 0, taskTrackers, fileSys.getUri().toString(), 1,
+                             null, null, null, jconf);
+      
+      JobConf jc = mr.createJobConf();
+      jc.setLong("mapreduce.reduce.input.limit", 10L);
+      try {
+        runPI(mr, jc);
+        success = false;
+      } catch (IOException e) {
+        success = true;
+      }
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+    return success;
+  }
+  
+  /**
    * Run the pi test with a specifix value of 
    * mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded.
    */
@@ -73,7 +111,7 @@ public class TestTaskLimits extends Test
     boolean success = false;
     try {
       final int taskTrackers = 2;
-
+   
       Configuration conf = new Configuration();
       conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
       dfs = new MiniDFSCluster(conf, 4, true, null);
@@ -114,5 +152,8 @@ public class TestTaskLimits extends Test
     status = runOneTest(-1);
     assertTrue(status == true);
     System.out.println("Job 3 succeeded as expected.");
+    status = runReduceLimitCheck();
+    assertTrue(status == true);
+    System.out.println("Success: Reduce limit as expected");
   }
 }