You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:30:45 UTC
svn commit: r1077575 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java
mapred/org/apache/hadoop/mapred/JobHistory.java
mapred/org/apache/hadoop/mapred/JobInProgress.java
Author: omalley
Date: Fri Mar 4 04:30:45 2011
New Revision: 1077575
URL: http://svn.apache.org/viewvc?rev=1077575&view=rev
Log:
commit 2eacb066e6fb867db147ea2d488a2b78d9a7817e
Author: Chris Douglas <cd...@apache.org>
Date: Tue Jul 20 23:53:23 2010 -0700
MAPREDUCE-339 from https://issues.apache.org/jira/secure/attachment/12450028/M339-0y20s.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-339. Greedily schedule failed tasks to cause early job failure.
+ (cdouglas)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java?rev=1077575&r1=1077574&r2=1077575&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java Fri Mar 4 04:30:45 2011
@@ -38,7 +38,6 @@ public interface RefreshUserMappingsProt
/**
* Refresh user to group mappings.
- * @param conf
* @throws IOException
*/
public void refreshUserToGroupsMappings() throws IOException;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1077575&r1=1077574&r2=1077575&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar 4 04:30:45 2011
@@ -486,8 +486,6 @@ public class JobHistory {
* @param conf Jobconf of the job tracker.
* @param hostname jobtracker's hostname
* @param jobTrackerStartTime jobtracker's start time
- * @return true if intialized properly
- * false otherwise
*/
public static void init(JobTracker jobTracker, JobConf conf,
String hostname, long jobTrackerStartTime) throws IOException {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077575&r1=1077574&r2=1077575&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 04:30:45 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
@@ -31,6 +32,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Vector;
@@ -79,8 +81,8 @@ public class JobInProgress {
/**
* Used when the a kill is issued to a job which is initializing.
*/
+ @SuppressWarnings("serial")
static class KillInterruptedException extends InterruptedException {
- private static final long serialVersionUID = 1L;
public KillInterruptedException(String msg) {
super(msg);
}
@@ -122,8 +124,8 @@ public class JobInProgress {
int speculativeMapTasks = 0;
int speculativeReduceTasks = 0;
- int mapFailuresPercent = 0;
- int reduceFailuresPercent = 0;
+ final int mapFailuresPercent;
+ final int reduceFailuresPercent;
int failedMapTIPs = 0;
int failedReduceTIPs = 0;
private volatile boolean launchedCleanup = false;
@@ -142,14 +144,17 @@ public class JobInProgress {
// Map of NetworkTopology Node to set of running TIPs
Map<Node, Set<TaskInProgress>> runningMapCache;
- // A list of non-local non-running maps
- List<TaskInProgress> nonLocalMaps;
+ // A list of non-local, non-running maps
+ final List<TaskInProgress> nonLocalMaps;
+
+ // Set of failed, non-running maps sorted by #failures
+ final SortedSet<TaskInProgress> failedMaps;
// A set of non-local running maps
Set<TaskInProgress> nonLocalRunningMaps;
// A list of non-running reduce TIPs
- List<TaskInProgress> nonRunningReduces;
+ Set<TaskInProgress> nonRunningReduces;
// A set of running reduce TIPs
Set<TaskInProgress> runningReduces;
@@ -160,6 +165,16 @@ public class JobInProgress {
// A list of cleanup tasks for the reduce task attempts, to be launched
List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
+ // keep failedMaps, nonRunningReduces ordered by failure count to bias
+ // scheduling toward failing tasks
+ private static final Comparator<TaskInProgress> failComparator =
+ new Comparator<TaskInProgress>() {
+ @Override
+ public int compare(TaskInProgress t1, TaskInProgress t2) {
+ return t2.numTaskFailures() - t1.numTaskFailures();
+ }
+ };
+
private final int maxLevel;
/**
@@ -223,7 +238,7 @@ public class JobInProgress {
private final int restartCount;
private JobConf conf;
- AtomicBoolean tasksInited = new AtomicBoolean(false);
+ volatile boolean tasksInited = false;
private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
private LocalFileSystem localFs;
@@ -315,9 +330,10 @@ public class JobInProgress {
hasSpeculativeMaps = conf.getMapSpeculativeExecution();
hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
this.nonLocalMaps = new LinkedList<TaskInProgress>();
+ this.failedMaps = new TreeSet<TaskInProgress>(failComparator);
this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
- this.nonRunningReduces = new LinkedList<TaskInProgress>();
+ this.nonRunningReduces = new TreeSet<TaskInProgress>(failComparator);
this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.resourceEstimator = new ResourceEstimator(this);
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
@@ -327,6 +343,8 @@ public class JobInProgress {
this.memoryPerMap = conf.getMemoryForMapTask();
this.memoryPerReduce = conf.getMemoryForReduceTask();
this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+ this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+ this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
(numMapTasks + numReduceTasks + 10);
@@ -427,9 +445,10 @@ public class JobInProgress {
this.maxLevel = jobtracker.getNumTaskCacheLevels();
this.anyCacheLevel = this.maxLevel+1;
this.nonLocalMaps = new LinkedList<TaskInProgress>();
+ this.failedMaps = new TreeSet<TaskInProgress>(failComparator);
this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
- this.nonRunningReduces = new LinkedList<TaskInProgress>();
+ this.nonRunningReduces = new TreeSet<TaskInProgress>(failComparator);
this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.resourceEstimator = new ResourceEstimator(this);
this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit",
@@ -541,7 +560,7 @@ public class JobInProgress {
* <code>false</code> otherwise
*/
public boolean inited() {
- return tasksInited.get();
+ return tasksInited;
}
/**
@@ -612,7 +631,7 @@ public class JobInProgress {
*/
public synchronized void initTasks()
throws IOException, KillInterruptedException {
- if (tasksInited.get() || isComplete()) {
+ if (tasksInited || isComplete()) {
return;
}
synchronized(jobInitKillStatus){
@@ -740,7 +759,7 @@ public class JobInProgress {
}
}
- tasksInited.set(true);
+ tasksInited = true;
JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
numMapTasks, numReduceTasks);
@@ -748,7 +767,7 @@ public class JobInProgress {
LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
+ " map tasks and " + numReduceTasks + " reduce tasks.");
}
-
+
TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
throws IOException {
TaskSplitMetaInfo[] allTaskSplitMetaInfo =
@@ -1267,7 +1286,7 @@ public class JobInProgress {
public Task obtainTaskCleanupTask(TaskTrackerStatus tts,
boolean isMapSlot)
throws IOException {
- if (!tasksInited.get()) {
+ if (!tasksInited) {
return null;
}
synchronized (this) {
@@ -1303,7 +1322,7 @@ public class JobInProgress {
int clusterSize,
int numUniqueHosts)
throws IOException {
- if (!tasksInited.get()) {
+ if (!tasksInited) {
LOG.info("Cannot create task split for " + profile.getJobID());
try { throw new IOException("state = " + status.getRunState()); }
catch (IOException ioe) {ioe.printStackTrace();}
@@ -1329,7 +1348,7 @@ public class JobInProgress {
int clusterSize,
int numUniqueHosts)
throws IOException {
- if (!tasksInited.get()) {
+ if (!tasksInited) {
LOG.info("Cannot create task split for " + profile.getJobID());
try { throw new IOException("state = " + status.getRunState()); }
catch (IOException ioe) {ioe.printStackTrace();}
@@ -1370,13 +1389,12 @@ public class JobInProgress {
/**
* Check if we can schedule an off-switch task for this job.
+ * @param numTaskTrackers TaskTrackers in the cluster.
* @param numTaskTrackers number of tasktrackers
- *
- * We check the number of missed opportunities for the job.
- * If it has 'waited' long enough we go ahead and schedule.
- *
* @return <code>true</code> if we can schedule off-switch,
* <code>false</code> otherwise
+ * We check the number of missed opportunities for the job.
+ * If it has 'waited' long enough we go ahead and schedule.
*/
public boolean scheduleOffSwitch(int numTaskTrackers) {
long missedTaskTrackers = getNumSchedulingOpportunities();
@@ -1395,7 +1413,7 @@ public class JobInProgress {
int numUniqueHosts,
boolean isMapSlot
) throws IOException {
- if(!tasksInited.get()) {
+ if(!tasksInited) {
return null;
}
@@ -1487,7 +1505,7 @@ public class JobInProgress {
int numUniqueHosts,
boolean isMapSlot
) throws IOException {
- if(!tasksInited.get()) {
+ if(!tasksInited) {
return null;
}
@@ -1537,7 +1555,7 @@ public class JobInProgress {
* @return true/false
*/
private synchronized boolean canLaunchSetupTask() {
- return (tasksInited.get() && status.getRunState() == JobStatus.PREP &&
+ return (tasksInited && status.getRunState() == JobStatus.PREP &&
!launchedSetup && !jobKilled && !jobFailed);
}
@@ -2002,37 +2020,14 @@ public class JobInProgress {
* @param tip the tip that needs to be failed
*/
private synchronized void failMap(TaskInProgress tip) {
- if (nonRunningMapCache == null) {
- LOG.warn("Non-running cache for maps missing!! "
- + "Job details are missing.");
- return;
- }
-
- // 1. Its added everywhere since other nodes (having this split local)
- // might have removed this tip from their local cache
- // 2. Give high priority to failed tip - fail early
-
- String[] splitLocations = tip.getSplitLocations();
-
- // Add the TIP in the front of the list for non-local non-running maps
- if (splitLocations == null || splitLocations.length == 0) {
- nonLocalMaps.add(0, tip);
+ if (failedMaps == null) {
+ LOG.warn("Failed cache for maps is missing! Job details are missing.");
return;
}
- for(String host: splitLocations) {
- Node node = jobtracker.getNode(host);
-
- for (int j = 0; j < maxLevel; ++j) {
- List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
- if (hostMaps == null) {
- hostMaps = new LinkedList<TaskInProgress>();
- nonRunningMapCache.put(node, hostMaps);
- }
- hostMaps.add(0, tip);
- node = node.getParent();
- }
- }
+ // Ignore locality for subsequent scheduling on this TIP. Always schedule
+ // it ahead of other tasks.
+ failedMaps.add(tip);
}
/**
@@ -2045,7 +2040,7 @@ public class JobInProgress {
+ "Job details are missing.");
return;
}
- nonRunningReduces.add(0, tip);
+ nonRunningReduces.add(tip);
}
/**
@@ -2189,24 +2184,33 @@ public class JobInProgress {
}
- // For scheduling a map task, we have two caches and a list (optional)
- // I) one for non-running task
- // II) one for running task (this is for handling speculation)
- // III) a list of TIPs that have empty locations (e.g., dummy splits),
- // the list is empty if all TIPs have associated locations
+ // When scheduling a map task:
+ // 0) Schedule a failed task without considering locality
+ // 1) Schedule non-running tasks
+ // 2) Schedule speculative tasks
+ // 3) Schedule tasks with no location information
// First a look up is done on the non-running cache and on a miss, a look
// up is done on the running cache. The order for lookup within the cache:
// 1. from local node to root [bottom up]
// 2. breadth wise for all the parent nodes at max level
-
- // We fall to linear scan of the list (III above) if we have misses in the
+ // We fall back to a linear scan of the list ((3) above) if we have misses in the
// above caches
+ // 0) Schedule the task with the most failures, unless failure was on this
+ // machine
+ tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
+ if (tip != null) {
+ // Add to the running list
+ scheduleMap(tip);
+ LOG.info("Choosing a failed task " + tip.getTIPId());
+ return tip.getIdWithinJob();
+ }
+
Node node = jobtracker.getNode(tts.getHost());
//
- // I) Non-running TIP :
+ // 1) Non-running TIP :
//
// 1. check from local node to the root [bottom up cache lookup]
@@ -2216,10 +2220,10 @@ public class JobInProgress {
Node key = node;
int level = 0;
// maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
- // called to schedule any task (local, rack-local, off-switch or speculative)
- // tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is
- // (i.e. -1) if findNewMapTask is to only schedule off-switch/speculative
- // tasks
+ // called to schedule any task (local, rack-local, off-switch or
+ // speculative), or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
+ // findNewMapTask is to only schedule off-switch/speculative
+ // tasks
int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
for (level = 0;level < maxLevelToSchedule; ++level) {
List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
@@ -2295,7 +2299,7 @@ public class JobInProgress {
}
//
- // II) Running TIP :
+ // 2) Running TIP :
//
if (hasSpeculativeMaps) {
@@ -2702,7 +2706,7 @@ public class JobInProgress {
* @param jobTerminationState job termination state
*/
private synchronized void terminate(int jobTerminationState) {
- if(!tasksInited.get()) {
+ if(!tasksInited) {
//init could not be done, we just terminate directly.
terminateJob(jobTerminationState);
return;
@@ -3088,6 +3092,7 @@ public class JobInProgress {
cleanUpMetrics();
// free up the memory used by the data structures
+ this.failedMaps.clear();
this.nonRunningMapCache = null;
this.runningMapCache = null;
this.nonRunningReduces = null;
@@ -3184,9 +3189,7 @@ public class JobInProgress {
float failureRate = (float)fetchFailures / runningReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures
- boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT)
- ? true
- : false;
+ boolean isMapFaulty = failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT;
if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
&& isMapFaulty) {
LOG.info("Too many fetch-failures for output of task: " + mapTaskId