You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:27:18 UTC
svn commit: r1077542 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 04:27:17 2011
New Revision: 1077542
URL: http://svn.apache.org/viewvc?rev=1077542&view=rev
Log:
commit 260b70d6f19f2f97b6cd91572b0961c5544f9397
Author: Mahadev Konar <ma...@cdev6022.inktomisearch.com>
Date: Tue Jul 13 20:49:04 2010 +0000
MAPREDUCE-1521. Protection against incorrectly configured reduces. From https://issues.apache.org/jira/secure/attachment/12449129/MAPREDUCE-1521-0.20-yahoo.patch (mahadev)
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1521. Protection against incorrectly configured reduces
+ (mahadev)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 04:27:17 2011
@@ -365,6 +365,14 @@
</property>
<property>
+ <name>mapreduce.reduce.input.limit</name>
+ <value>-1</value>
+ <description>The limit on the input size of the reduce. If the estimated
+ input size of the reduce is greater than this value, job is failed. A
+ value of -1 means that there is no limit set. </description>
+</property>
+
+<property>
<name>mapred.job.tracker.retiredjobs.cache.size</name>
<value>1000</value>
<description>The number of retired job status to keep in the cache.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar 4 04:27:17 2011
@@ -397,6 +397,15 @@ public class JobClient extends Configure
public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
return jobSubmitClient.getTaskDiagnostics(id);
}
+
+ @Override
+ public String getFailureInfo() throws IOException {
+ //assuming that this is just being called after
+ //we realized the job failed. SO we try avoiding
+ //a rpc by not calling updateStatus
+ ensureFreshStatus();
+ return status.getFailureInfo();
+ }
}
private JobSubmissionProtocol jobSubmitClient;
@@ -1192,6 +1201,7 @@ public class JobClient extends Configure
RunningJob rj = jc.submitJob(job);
try {
if (!jc.monitorAndPrintJob(job, rj)) {
+ LOG.info("Job Failed: " + rj.getFailureInfo());
throw new IOException("Job failed!");
}
} catch (InterruptedException ie) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 04:27:17 2011
@@ -112,7 +112,8 @@ public class JobInProgress {
int finishedReduceTasks = 0;
int failedMapTasks = 0;
int failedReduceTasks = 0;
-
+ private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L;
+ long reduce_input_limit = -1L;
private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
int completedMapsForReduceSlowstart = 0;
@@ -417,6 +418,12 @@ public class JobInProgress {
this.jobMetrics.setTag("jobId", jobId.toString());
hasSpeculativeMaps = conf.getMapSpeculativeExecution();
hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+ // a limit on the input size of the reduce.
+ // we check to see if the estimated input size of
+ // of each reduce is less than this value. If not
+ // we fail the job. A value of -1 just means there is no
+ // limit set.
+ reduce_input_limit = -1L;
this.maxLevel = jobtracker.getNumTaskCacheLevels();
this.anyCacheLevel = this.maxLevel+1;
this.nonLocalMaps = new LinkedList<TaskInProgress>();
@@ -425,7 +432,8 @@ public class JobInProgress {
this.nonRunningReduces = new LinkedList<TaskInProgress>();
this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.resourceEstimator = new ResourceEstimator(this);
-
+ this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit",
+ DEFAULT_REDUCE_INPUT_LIMIT);
// register job's tokens for renewal
DelegationTokenRenewal.registerDelegationTokensForRenewal(
jobInfo.getJobID(), ts, jobtracker.getConf());
@@ -1539,6 +1547,25 @@ public class JobInProgress {
return null;
}
+ /** check to see if we have any misbehaving reducers. If the expected output
+ * for reducers is huge then we just fail the job and error out. The estimated
+ * size is divided by 2 since the resource estimator returns the amount of disk
+ * space the that the reduce will use (which is 2 times the input, space for merge + reduce
+ * input). **/
+ long estimatedReduceInputSize = resourceEstimator.getEstimatedReduceInputSize()/2;
+ if (((estimatedReduceInputSize) >
+ reduce_input_limit) && (reduce_input_limit > 0L)) {
+ // make sure jobtracker lock is held
+ LOG.info("Exceeded limit for reduce input size: Estimated:" +
+ estimatedReduceInputSize + " Limit: " +
+ reduce_input_limit + " Failing Job " + jobId);
+ status.setFailureInfo("Job Exceeded Reduce Input limit "
+ + " Limit: " + reduce_input_limit +
+ " Estimated: " + estimatedReduceInputSize);
+ jobtracker.failJob(this);
+ return null;
+ }
+
// Ensure we have sufficient map outputs ready to shuffle before
// scheduling reduces
if (!scheduleReduces()) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Mar 4 04:27:17 2011
@@ -84,7 +84,8 @@ public class JobStatus implements Writab
private String user;
private JobPriority priority;
private String schedulingInfo="NA";
-
+ private String failureInfo = "NA";
+
/**
*/
public JobStatus() {
@@ -278,8 +279,24 @@ public class JobStatus implements Writab
public synchronized String getSchedulingInfo() {
return schedulingInfo;
}
+
+ /**
+ * gets any available info on the reason of failure of the job.
+ * @return diagnostic information on why a job might have failed.
+ */
+ public synchronized String getFailureInfo() {
+ return this.failureInfo;
+ }
/**
+ * set the reason for failuire of this job
+ * @param failureInfo the reason for failure of this job.
+ */
+ public synchronized void setFailureInfo(String failureInfo) {
+ this.failureInfo = failureInfo;
+ }
+
+ /**
* Used to set the scheduling information associated to a particular Job.
*
* @param schedulingInfo Scheduling information of the job
@@ -343,6 +360,7 @@ public class JobStatus implements Writab
WritableUtils.writeEnum(out, entry.getKey());
entry.getValue().write(out);
}
+ Text.writeString(out, failureInfo);
}
public synchronized void readFields(DataInput in) throws IOException {
@@ -365,6 +383,7 @@ public class JobStatus implements Writab
acl.readFields(in);
this.jobACLs.put(aclType, acl);
}
+ this.failureInfo = Text.readString(in);
}
// A utility to convert new job runstates to the old ones.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Mar 4 04:27:17 2011
@@ -80,8 +80,10 @@ interface JobSubmissionProtocol extends
* Version 26: Added the method getQueueAdmins(queueName) as part of
* MAPREDUCE-1664.
* Version 27: Added queue state to JobQueueInfo as part of HADOOP-5913.
+ * Version 28: Added a new field to JobStatus to provide user readable
+ * information on job failure. MAPREDUCE-1521.
*/
- public static final long versionID = 27L;
+ public static final long versionID = 28L;
/**
* Allocate a name for the job.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 04:27:17 2011
@@ -2049,7 +2049,6 @@ public class JobTracker implements MRCon
MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
MAX_BLACKLISTS_PER_TRACKER =
conf.getInt("mapred.max.tracker.blacklists", 4);
-
NUM_HEARTBEATS_IN_SECOND =
conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
@@ -3947,9 +3946,11 @@ public class JobTracker implements MRCon
StringUtils.stringifyException(kie));
killJob(job);
} catch (Throwable t) {
+ String failureInfo = "Job initialization failed:\n" +
+ StringUtils.stringifyException(t);
// If the job initialization is failed, job state will be FAILED
- LOG.error("Job initialization failed:\n" +
- StringUtils.stringifyException(t));
+ LOG.error(failureInfo);
+ job.getStatus().setFailureInfo(failureInfo);
failJob(job);
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java Fri Mar 4 04:27:17 2011
@@ -183,6 +183,13 @@ public interface RunningJob {
public Counters getCounters() throws IOException;
/**
+ * Get failure info for the job.
+ * @return the failure info for the job.
+ * @throws IOException
+ */
+ public String getFailureInfo() throws IOException;
+
+ /**
* Gets the diagnostic messages for a given task attempt.
* @param taskid
* @return the list of diagnostic messages for the task
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java?rev=1077542&r1=1077541&r2=1077542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Fri Mar 4 04:27:17 2011
@@ -63,6 +63,44 @@ public class TestTaskLimits extends Test
}
/**
+ * check with a reduce limit of 10 bytes for input to reduce.
+ * This should fail since input to reduce estimate is greater
+ * than that!
+ * @return true on failing the job, false
+ * @throws IOException
+ */
+ private boolean runReduceLimitCheck() throws IOException {
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ FileSystem fileSys = null;
+ boolean success = false;
+ try {
+ final int taskTrackers = 2;
+
+ Configuration conf = new Configuration();
+ conf.setInt("mapred.jobtracker.maxtasks.per.job", -1);
+ dfs = new MiniDFSCluster(conf, 4, true, null);
+ fileSys = dfs.getFileSystem();
+ JobConf jconf = new JobConf(conf);
+ mr = new MiniMRCluster(0, 0, taskTrackers, fileSys.getUri().toString(), 1,
+ null, null, null, jconf);
+
+ JobConf jc = mr.createJobConf();
+ jc.setLong("mapreduce.reduce.input.limit", 10L);
+ try {
+ runPI(mr, jc);
+ success = false;
+ } catch (IOException e) {
+ success = true;
+ }
+ } finally {
+ if (dfs != null) { dfs.shutdown(); }
+ if (mr != null) { mr.shutdown(); }
+ }
+ return success;
+ }
+
+ /**
* Run the pi test with a specifix value of
* mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded.
*/
@@ -73,7 +111,7 @@ public class TestTaskLimits extends Test
boolean success = false;
try {
final int taskTrackers = 2;
-
+
Configuration conf = new Configuration();
conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
dfs = new MiniDFSCluster(conf, 4, true, null);
@@ -114,5 +152,8 @@ public class TestTaskLimits extends Test
status = runOneTest(-1);
assertTrue(status == true);
System.out.println("Job 3 succeeded as expected.");
+ status = runReduceLimitCheck();
+ assertTrue(status == true);
+ System.out.println("Success: Reduce limit as expected");
}
}