You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink itself was not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:25:11 UTC
svn commit: r1076949 [2/3] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
contrib/fairscheduler/src/java/org/apa...
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:25:10 2011
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -45,6 +46,8 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/*************************************************************
* JobInProgress maintains all the info for keeping
@@ -52,6 +55,8 @@ import org.apache.hadoop.util.StringUtil
* and its latest JobStatus, plus a set of tables for
* doing bookkeeping of its Tasks.
* ***********************************************************
+ *
+ * This is NOT a public interface!
*/
class JobInProgress {
/**
@@ -77,6 +82,8 @@ class JobInProgress {
TaskInProgress setup[] = new TaskInProgress[0];
int numMapTasks = 0;
int numReduceTasks = 0;
+ int numSlotsPerMap = 1;
+ int numSlotsPerReduce = 1;
// Counters to track currently running/finished/failed Map/Reduce task-attempts
int runningMapTasks = 0;
@@ -194,7 +201,9 @@ class JobInProgress {
TOTAL_LAUNCHED_REDUCES,
OTHER_LOCAL_MAPS,
DATA_LOCAL_MAPS,
- RACK_LOCAL_MAPS
+ RACK_LOCAL_MAPS,
+ FALLOW_SLOTS_MILLIS_MAPS,
+ FALLOW_SLOTS_MILLIS_REDUCES
}
private Counters jobCounters = new Counters();
@@ -210,6 +219,36 @@ class JobInProgress {
private Object schedulingInfo;
+ private static class FallowSlotInfo { // one slot reservation on a tracker; its size and start time feed the FALLOW_SLOTS_MILLIS_* counters
+ long timestamp; // System.currentTimeMillis() when the reservation was created or last resized
+ int numSlots; // number of slots currently held by the reservation
+
+ public FallowSlotInfo(long timestamp, int numSlots) {
+ this.timestamp = timestamp;
+ this.numSlots = numSlots;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public int getNumSlots() {
+ return numSlots;
+ }
+
+ public void setNumSlots(int numSlots) {
+ this.numSlots = numSlots;
+ }
+ }
+
+ private Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps = // trackers holding map-slot reservations for this job
+ new HashMap<TaskTracker, FallowSlotInfo>();
+ private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = // trackers holding reservations booked as non-MAP (reduce) slots
+ new HashMap<TaskTracker, FallowSlotInfo>();
/**
* Create an almost empty JobInProgress, which can be used only for tests
@@ -391,6 +430,41 @@ class JobInProgress {
}
/**
+ * Get the number of slots required to run a single map task-attempt.
+ * @return the number of slots required to run a single map task-attempt
+ */
+ synchronized int getNumSlotsPerMap() {
+ return numSlotsPerMap;
+ }
+
+ /**
+ * Set the number of slots required to run a single map task-attempt.
+ * Typically set by schedulers which support high-ram jobs; map TaskInProgress objects receive the value in their constructor.
+ * @param numSlotsPerMap the number of slots required to run a single map task-attempt
+ */
+ synchronized void setNumSlotsPerMap(int numSlotsPerMap) {
+ this.numSlotsPerMap = numSlotsPerMap;
+ }
+
+ /**
+ * Get the number of slots required to run a single reduce task-attempt.
+ * @return the number of slots required to run a single reduce task-attempt
+ */
+ synchronized int getNumSlotsPerReduce() {
+ return numSlotsPerReduce;
+ }
+
+ /**
+ * Set the number of slots required to run a single reduce task-attempt.
+ * Typically set by schedulers which support high-ram jobs; reduce TaskInProgress objects receive the value in their constructor.
+ * @param numSlotsPerReduce the number of slots required to run a single reduce
+ * task-attempt
+ */
+ synchronized void setNumSlotsPerReduce(int numSlotsPerReduce) {
+ this.numSlotsPerReduce = numSlotsPerReduce;
+ }
+
+ /**
* Construct the splits, etc. This is invoked from an async
* thread so that split-computation doesn't block anyone.
*/
@@ -449,7 +523,7 @@ class JobInProgress {
inputLength += splits[i].getDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
- jobtracker, conf, this, i);
+ jobtracker, conf, this, i, numSlotsPerMap);
}
LOG.info("Input size for job " + jobId + " = " + inputLength
+ ". Number of splits = " + splits.length);
@@ -467,7 +541,7 @@ class JobInProgress {
for (int i = 0; i < numReduceTasks; i++) {
reduces[i] = new TaskInProgress(jobId, jobFile,
numMapTasks, i,
- jobtracker, conf, this);
+ jobtracker, conf, this, numSlotsPerReduce);
nonRunningReduces.add(reduces[i]);
}
@@ -486,12 +560,12 @@ class JobInProgress {
// split.
JobClient.RawSplit emptySplit = new JobClient.RawSplit();
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
- jobtracker, conf, this, numMapTasks);
+ jobtracker, conf, this, numMapTasks, 1);
cleanup[0].setJobCleanupTask();
// cleanup reduce tip.
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
- numReduceTasks, jobtracker, conf, this);
+ numReduceTasks, jobtracker, conf, this, 1);
cleanup[1].setJobCleanupTask();
// create two setup tips, one map and one reduce.
@@ -500,12 +574,12 @@ class JobInProgress {
// setup map tip. This map doesn't use any split. Just assign an empty
// split.
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
- jobtracker, conf, this, numMapTasks + 1 );
+ jobtracker, conf, this, numMapTasks + 1, 1);
setup[0].setJobSetupTask();
// setup reduce tip.
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
- numReduceTasks + 1, jobtracker, conf, this);
+ numReduceTasks + 1, jobtracker, conf, this, 1);
setup[1].setJobSetupTask();
synchronized(jobInitKillStatus){
@@ -564,6 +638,15 @@ class JobInProgress {
return numReduceTasks - runningReduceTasks - failedReduceTIPs -
finishedReduceTasks + speculativeReduceTasks;
}
+ public synchronized int getNumSlotsPerTask(TaskType taskType) {
+ if (taskType == TaskType.MAP) {
+ return numSlotsPerMap;
+ } else if (taskType == TaskType.REDUCE) {
+ return numSlotsPerReduce;
+ } else {
+ return 1;
+ }
+ }
public JobPriority getPriority() {
return this.priority;
}
@@ -778,8 +861,10 @@ class JobInProgress {
if (change) {
TaskStatus.State state = status.getRunState();
// get the TaskTrackerStatus where the task ran
- TaskTrackerStatus ttStatus =
+ TaskTracker taskTracker =
this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
+ TaskTrackerStatus ttStatus =
+ (taskTracker == null) ? null : taskTracker.getStatus();
String httpTaskLogLocation = null;
if (null != ttStatus){
@@ -841,7 +926,7 @@ class JobInProgress {
}
// Tell the job to fail the relevant task
- failedTask(tip, taskid, status, ttStatus,
+ failedTask(tip, taskid, status, taskTracker,
wasRunning, wasComplete);
// Did the task failure lead to tip failure?
@@ -1374,9 +1459,9 @@ class JobInProgress {
* to the blacklist iff too many trackers in the cluster i.e.
* (clusterSize * CLUSTER_BLACKLIST_PERCENT) haven't turned 'flaky' already.
*
- * @param trackerName task-tracker on which a task failed
+ * @param taskTracker task-tracker on which a task failed
*/
- void addTrackerTaskFailure(String trackerName) {
+ void addTrackerTaskFailure(String trackerName, TaskTracker taskTracker) {
if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) {
String trackerHostName = convertTrackerNameToHostName(trackerName);
@@ -1389,11 +1474,78 @@ class JobInProgress {
// Check if this tasktracker has turned 'flaky'
if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
++flakyTaskTrackers;
+
+ // Cancel reservations if appropriate
+ if (taskTracker != null) {
+ taskTracker.unreserveSlots(TaskType.MAP, this);
+ taskTracker.unreserveSlots(TaskType.REDUCE, this);
+ }
LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
}
}
}
+
+ public synchronized void reserveTaskTracker(TaskTracker taskTracker, // record, or resize, this job's reservation of numSlots slots on taskTracker
+ TaskType type, int numSlots) {
+ Map<TaskTracker, FallowSlotInfo> map =
+ (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces; // every non-MAP type is booked in the reduce map
+
+ long now = System.currentTimeMillis();
+
+ FallowSlotInfo info = map.get(taskTracker);
+ if (info == null) {
+ info = new FallowSlotInfo(now, numSlots); // first reservation on this tracker: metering starts now
+ } else {
+ // On a size change, charge the slot-time accrued at the old size; an unchanged size keeps accruing from the old timestamp
+ if (info.getNumSlots() != numSlots) {
+ Enum<Counter> counter =
+ (type == TaskType.MAP) ?
+ Counter.FALLOW_SLOTS_MILLIS_MAPS :
+ Counter.FALLOW_SLOTS_MILLIS_REDUCES;
+ long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots(); // slot-milliseconds held at the old size
+ jobCounters.incrCounter(counter, fallowSlotMillis);
+
+ // Restart metering at the new size
+ info.setTimestamp(now);
+ info.setNumSlots(numSlots);
+ }
+ }
+ map.put(taskTracker, info);
+ }
+
+ public synchronized void unreserveTaskTracker(TaskTracker taskTracker, // drop this job's reservation on taskTracker and charge its slot-time to the job counters
+ TaskType type) {
+ Map<TaskTracker, FallowSlotInfo> map =
+ (type == TaskType.MAP) ? trackersReservedForMaps :
+ trackersReservedForReduces;
+
+ FallowSlotInfo info = map.get(taskTracker);
+ if (info == null) {
+ LOG.warn("Cannot find information about fallow slots for " +
+ taskTracker.getTrackerName());
+ return; // nothing reserved: counters and map are left unchanged
+ }
+ long now = System.currentTimeMillis();
+
+ Enum<Counter> counter =
+ (type == TaskType.MAP) ?
+ Counter.FALLOW_SLOTS_MILLIS_MAPS :
+ Counter.FALLOW_SLOTS_MILLIS_REDUCES;
+ long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots(); // slot-milliseconds since the reservation was created or last resized
+ jobCounters.incrCounter(counter, fallowSlotMillis);
+
+ map.remove(taskTracker); // NOTE(review): structural change to the map; callers must not be iterating its live keySet when this runs
+ }
+
+ public int getNumReservedTaskTrackersForMaps() { // number of trackers on which this job holds map-slot reservations
+ return trackersReservedForMaps.size(); // NOTE(review): read without the job monitor that reserve/unreserveTaskTracker hold while mutating this HashMap; may see a stale size
+ }
+
+ public int getNumReservedTaskTrackersForReduces() { // number of trackers on which this job holds reduce-slot reservations
+ return trackersReservedForReduces.size(); // NOTE(review): same unsynchronized read as the map variant
+ }
+
private int getTrackerTaskFailures(String trackerName) {
String trackerHostName = convertTrackerNameToHostName(trackerName);
Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
@@ -2008,7 +2160,7 @@ class JobInProgress {
// Update jobhistory
TaskTrackerStatus ttStatus =
- this.jobtracker.getTaskTracker(status.getTaskTracker());
+ this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
String taskType = getTaskType(tip);
if (status.getIsMap()){
@@ -2121,6 +2273,7 @@ class JobInProgress {
this.status.setReduceProgress(1.0f);
}
this.finishTime = System.currentTimeMillis();
+ cancelReservedSlots();
LOG.info("Job " + this.status.getJobID() +
" has completed successfully.");
JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime,
@@ -2204,9 +2357,20 @@ class JobInProgress {
for (int i = 0; i < reduces.length; i++) {
reduces[i].kill();
}
+
+ // Clear out reserved tasktrackers
+ cancelReservedSlots();
}
}
+ private void cancelReservedSlots() { // release every slot reservation this job still holds on any tracker
+ for (TaskTracker tt : new ArrayList<TaskTracker>(trackersReservedForMaps.keySet())) { // iterate a snapshot: unreserveSlots() is expected to call unreserveTaskTracker(), which removes entries from this map
+ tt.unreserveSlots(TaskType.MAP, this);
+ }
+ for (TaskTracker tt : new ArrayList<TaskTracker>(trackersReservedForReduces.keySet())) { // snapshot for the same reason
+ tt.unreserveSlots(TaskType.REDUCE, this);
+ }
+ }
private void clearUncleanTasks() {
TaskAttemptID taskid = null;
TaskInProgress tip = null;
@@ -2264,7 +2428,7 @@ class JobInProgress {
*/
private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
TaskStatus status,
- TaskTrackerStatus taskTrackerStatus,
+ TaskTracker taskTracker,
boolean wasRunning, boolean wasComplete) {
final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
// check if the TIP is already failed
@@ -2324,6 +2488,8 @@ class JobInProgress {
String taskTrackerName = taskStatus.getTaskTracker();
String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
int taskTrackerPort = -1;
+ TaskTrackerStatus taskTrackerStatus =
+ (taskTracker == null) ? null : taskTracker.getStatus();
if (taskTrackerStatus != null) {
taskTrackerPort = taskTrackerStatus.getHttpPort();
}
@@ -2369,7 +2535,7 @@ class JobInProgress {
// Note down that a task has failed on this tasktracker
//
if (status.getRunState() == TaskStatus.State.FAILED) {
- addTrackerTaskFailure(taskTrackerName);
+ addTrackerTaskFailure(taskTrackerName, taskTracker);
}
//
@@ -2460,6 +2626,9 @@ class JobInProgress {
TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(),
taskid,
0.0f,
+ tip.isMapTask() ?
+ numSlotsPerMap :
+ numSlotsPerReduce,
state,
reason,
reason,
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties Fri Mar 4 03:25:10 2011
@@ -1,12 +1,14 @@
# ResourceBundle properties file for job-level counters
-CounterGroupName= Job Counters
+CounterGroupName= Job Counters
-NUM_FAILED_MAPS.name= Failed map tasks
-NUM_FAILED_REDUCES.name= Failed reduce tasks
-TOTAL_LAUNCHED_MAPS.name= Launched map tasks
-TOTAL_LAUNCHED_REDUCES.name= Launched reduce tasks
-OTHER_LOCAL_MAPS.name= Other local map tasks
-DATA_LOCAL_MAPS.name= Data-local map tasks
-RACK_LOCAL_MAPS.name= Rack-local map tasks
+NUM_FAILED_MAPS.name= Failed map tasks
+NUM_FAILED_REDUCES.name= Failed reduce tasks
+TOTAL_LAUNCHED_MAPS.name= Launched map tasks
+TOTAL_LAUNCHED_REDUCES.name= Launched reduce tasks
+OTHER_LOCAL_MAPS.name= Other local map tasks
+DATA_LOCAL_MAPS.name= Data-local map tasks
+RACK_LOCAL_MAPS.name= Rack-local map tasks
+FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms)
+FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms)
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Fri Mar 4 03:25:10 2011
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/**
* A {@link TaskScheduler} that keeps jobs in a queue in priority order (FIFO
@@ -77,9 +78,9 @@ class JobQueueTaskScheduler extends Task
}
@Override
- public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+ public synchronized List<Task> assignTasks(TaskTracker taskTracker)
throws IOException {
-
+ TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
final int numTaskTrackers = clusterStatus.getTaskTrackers();
final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
@@ -91,10 +92,10 @@ class JobQueueTaskScheduler extends Task
//
// Get map + reduce counts for the current tracker.
//
- final int trackerMapCapacity = taskTracker.getMaxMapTasks();
- final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
- final int trackerRunningMaps = taskTracker.countMapTasks();
- final int trackerRunningReduces = taskTracker.countReduceTasks();
+ final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
+ final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
+ final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
+ final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();
// Assigned tasks
List<Task> assignedTasks = new ArrayList<Task>();
@@ -167,7 +168,7 @@ class JobQueueTaskScheduler extends Task
// Try to schedule a node-local or rack-local Map task
t =
- job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
+ job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
@@ -186,7 +187,7 @@ class JobQueueTaskScheduler extends Task
// Try to schedule a node-local or rack-local Map task
t =
- job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
+ job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
@@ -224,7 +225,7 @@ class JobQueueTaskScheduler extends Task
}
Task t =
- job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+ job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts()
);
if (t != null) {
@@ -243,7 +244,7 @@ class JobQueueTaskScheduler extends Task
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
+ LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
"[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
(trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:25:10 2011
@@ -93,6 +93,9 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
/*******************************************************
* JobTracker is the central location for submitting and
* tracking MR jobs in a network environment.
@@ -278,7 +281,8 @@ public class JobTracker implements MRCon
JobInProgress job = tip.getJob();
String trackerName = getAssignedTracker(taskId);
TaskTrackerStatus trackerStatus =
- getTaskTracker(trackerName);
+ getTaskTrackerStatus(trackerName);
+
// This might happen when the tasktracker has already
// expired and this thread tries to call failedtask
// again. expire tasktracker should have called failed
@@ -359,22 +363,25 @@ public class JobTracker implements MRCon
long now = System.currentTimeMillis();
TaskTrackerStatus leastRecent = null;
while ((trackerExpiryQueue.size() > 0) &&
- ((leastRecent = trackerExpiryQueue.first()) != null) &&
- (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
+ (leastRecent = trackerExpiryQueue.first()) != null &&
+ ((now - leastRecent.getLastSeen()) > TASKTRACKER_EXPIRY_INTERVAL)) {
+
// Remove profile from head of queue
trackerExpiryQueue.remove(leastRecent);
String trackerName = leastRecent.getTrackerName();
// Figure out if last-seen time should be updated, or if tracker is dead
- TaskTrackerStatus newProfile = taskTrackers.get(leastRecent.getTrackerName());
+ TaskTracker current = getTaskTracker(trackerName);
+ TaskTrackerStatus newProfile =
+ (current == null ) ? null : current.getStatus();
// Items might leave the taskTracker set through other means; the
// status stored in 'taskTrackers' might be null, which means the
// tracker has already been destroyed.
if (newProfile != null) {
- if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
+ if ((now - newProfile.getLastSeen()) > TASKTRACKER_EXPIRY_INTERVAL) {
// Remove completely after marking the tasks as 'KILLED'
- lostTaskTracker(leastRecent.getTrackerName());
+ lostTaskTracker(current);
// tracker is lost, and if it is blacklisted, remove
// it from the count of blacklisted trackers in the cluster
if (isBlacklisted(trackerName)) {
@@ -384,7 +391,7 @@ public class JobTracker implements MRCon
// remove the mapping from the hosts list
String hostname = newProfile.getHost();
- hostnameToTrackerName.get(hostname).remove(trackerName);
+ hostnameToTaskTracker.get(hostname).remove(trackerName);
} else {
// Update time by inserting latest profile
trackerExpiryQueue.add(newProfile);
@@ -638,9 +645,9 @@ public class JobTracker implements MRCon
synchronized (taskTrackers) {
// remove the capacity of trackers on this host
for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
- int mapSlots = status.getMaxMapTasks();
+ int mapSlots = status.getMaxMapSlots();
totalMapTaskCapacity -= mapSlots;
- int reduceSlots = status.getMaxReduceTasks();
+ int reduceSlots = status.getMaxReduceSlots();
totalReduceTaskCapacity -= reduceSlots;
getInstrumentation().addBlackListedMapSlots(
mapSlots);
@@ -657,9 +664,9 @@ public class JobTracker implements MRCon
int numTrackersOnHost = 0;
// add the capacity of trackers on the host
for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
- int mapSlots = status.getMaxMapTasks();
+ int mapSlots = status.getMaxMapSlots();
totalMapTaskCapacity += mapSlots;
- int reduceSlots = status.getMaxReduceTasks();
+ int reduceSlots = status.getMaxReduceSlots();
totalReduceTaskCapacity += reduceSlots;
numTrackersOnHost++;
getInstrumentation().decBlackListedMapSlots(mapSlots);
@@ -707,7 +714,8 @@ public class JobTracker implements MRCon
private List<TaskTrackerStatus> getStatusesOnHost(String hostName) {
List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
synchronized (taskTrackers) {
- for (TaskTrackerStatus status : taskTrackers.values()) {
+ for (TaskTracker tt : taskTrackers.values()) {
+ TaskTrackerStatus status = tt.getStatus();
if (hostName.equals(status.getHost())) {
statuses.add(status);
}
@@ -1041,14 +1049,14 @@ public class JobTracker implements MRCon
// II. Create the (appropriate) task status
if (type.equals(Values.MAP.name())) {
taskStatus =
- new MapTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING,
- "", "", trackerName, TaskStatus.Phase.MAP,
- new Counters());
+ new MapTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.MAP),
+ TaskStatus.State.RUNNING, "", "", trackerName,
+ TaskStatus.Phase.MAP, new Counters());
} else {
taskStatus =
- new ReduceTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING,
- "", "", trackerName, TaskStatus.Phase.REDUCE,
- new Counters());
+ new ReduceTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.REDUCE),
+ TaskStatus.State.RUNNING, "", "", trackerName,
+ TaskStatus.Phase.REDUCE, new Counters());
}
// Set the start time
@@ -1067,10 +1075,13 @@ public class JobTracker implements MRCon
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
// IV. Register a new tracker
- boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
+ TaskTracker taskTracker = getTaskTracker(trackerName);
+ boolean isTrackerRegistered = (taskTracker != null);
if (!isTrackerRegistered) {
markTracker(trackerName); // add the tracker to recovery-manager
- addNewTracker(ttStatus);
+ taskTracker = new TaskTracker(trackerName);
+ taskTracker.setStatus(ttStatus);
+ addNewTracker(taskTracker);
}
// V. Update the tracker status
@@ -1441,17 +1452,17 @@ public class JobTracker implements MRCon
long now = System.currentTimeMillis();
int size = trackerExpiryQueue.size();
for (int i = 0; i < size ; ++i) {
- // Get the first status
- TaskTrackerStatus status = trackerExpiryQueue.first();
+ // Get the first tasktracker
+ TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
// Remove it
- trackerExpiryQueue.remove(status);
+ trackerExpiryQueue.remove(taskTracker);
// Set the new time
- status.setLastSeen(now);
+ taskTracker.setLastSeen(now);
// Add back to get the sorted list
- trackerExpiryQueue.add(status);
+ trackerExpiryQueue.add(taskTracker);
}
}
@@ -1516,11 +1527,10 @@ public class JobTracker implements MRCon
// All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
new TreeMap<TaskAttemptID, TaskInProgress>();
- // (hostname --> Set(trackername))
// This is used to keep track of all trackers running on one host. While
// decommissioning the host, all the trackers on the host will be lost.
- Map<String, Set<String>> hostnameToTrackerName =
- Collections.synchronizedMap(new TreeMap<String, Set<String>>());
+ Map<String, Set<TaskTracker>> hostnameToTaskTracker =
+ Collections.synchronizedMap(new TreeMap<String, Set<TaskTracker>>());
// (taskid --> trackerID)
@@ -1557,8 +1567,8 @@ public class JobTracker implements MRCon
//
int totalMaps = 0;
int totalReduces = 0;
- private HashMap<String, TaskTrackerStatus> taskTrackers =
- new HashMap<String, TaskTrackerStatus>();
+ private HashMap<String, TaskTracker> taskTrackers =
+ new HashMap<String, TaskTracker>();
Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
ExpireTrackers expireTrackers = new ExpireTrackers();
Thread expireTrackersThread = null;
@@ -1573,7 +1583,7 @@ public class JobTracker implements MRCon
RecoveryManager recoveryManager;
/**
- * It might seem like a bug to maintain a TreeSet of status objects,
+ * It might seem like a bug to maintain a TreeSet of tasktracker objects,
* which can be updated at any time. But that's not what happens! We
* only update status objects in the taskTrackers table. Status objects
* are never updated once they enter the expiry queue. Instead, we wait
@@ -2382,9 +2392,15 @@ public class JobTracker implements MRCon
* @return {@link Collection} of {@link TaskTrackerStatus}
*/
public Collection<TaskTrackerStatus> taskTrackers() {
+ Collection<TaskTrackerStatus> ttStatuses;
synchronized (taskTrackers) {
- return taskTrackers.values();
+ ttStatuses =
+ new ArrayList<TaskTrackerStatus>(taskTrackers.values().size());
+ for (TaskTracker tt : taskTrackers.values()) {
+ ttStatuses.add(tt.getStatus());
+ }
}
+ return ttStatuses;
}
/**
@@ -2396,7 +2412,8 @@ public class JobTracker implements MRCon
Collection<TaskTrackerStatus> activeTrackers =
new ArrayList<TaskTrackerStatus>();
synchronized (taskTrackers) {
- for (TaskTrackerStatus status : taskTrackers.values()) {
+ for ( TaskTracker tt : taskTrackers.values()) {
+ TaskTrackerStatus status = tt.getStatus();
if (!faultyTrackers.isBlacklisted(status.getHost())) {
activeTrackers.add(status);
}
@@ -2417,7 +2434,8 @@ public class JobTracker implements MRCon
List<String> blacklistedTrackers =
new ArrayList<String>();
synchronized (taskTrackers) {
- for (TaskTrackerStatus status : taskTrackers.values()) {
+ for (TaskTracker tt : taskTrackers.values()) {
+ TaskTrackerStatus status = tt.getStatus();
if (!faultyTrackers.isBlacklisted(status.getHost())) {
activeTrackers.add(status.getTrackerName());
} else {
@@ -2440,7 +2458,8 @@ public class JobTracker implements MRCon
Collection<TaskTrackerStatus> blacklistedTrackers =
new ArrayList<TaskTrackerStatus>();
synchronized (taskTrackers) {
- for (TaskTrackerStatus status : taskTrackers.values()) {
+ for (TaskTracker tt : taskTrackers.values()) {
+ TaskTrackerStatus status = tt.getStatus();
if (faultyTrackers.isBlacklisted(status.getHost())) {
blacklistedTrackers.add(status);
}
@@ -2470,14 +2489,22 @@ public class JobTracker implements MRCon
* @return true if blacklisted, false otherwise
*/
public boolean isBlacklisted(String trackerID) {
- TaskTrackerStatus status = getTaskTracker(trackerID);
+ TaskTrackerStatus status = getTaskTrackerStatus(trackerID);
if (status != null) {
return faultyTrackers.isBlacklisted(status.getHost());
}
return false;
}
- public TaskTrackerStatus getTaskTracker(String trackerID) {
+ public TaskTrackerStatus getTaskTrackerStatus(String trackerID) {
+ final TaskTracker tracker = getTaskTracker(trackerID); // lookup synchronized on taskTrackers
+ if (tracker == null) {
+ return null; // no such tracker in taskTrackers
+ }
+ return tracker.getStatus();
+ }
+
+ public TaskTracker getTaskTracker(String trackerID) {
synchronized (taskTrackers) {
return taskTrackers.get(trackerID);
}
@@ -2491,7 +2518,8 @@ public class JobTracker implements MRCon
*
* @param status Task Tracker's status
*/
- private void addNewTracker(TaskTrackerStatus status) {
+ private void addNewTracker(TaskTracker taskTracker) {
+ TaskTrackerStatus status = taskTracker.getStatus();
trackerExpiryQueue.add(status);
// Register the tracker if its not registered
@@ -2502,14 +2530,14 @@ public class JobTracker implements MRCon
}
// add it to the set of tracker per host
- Set<String> trackers = hostnameToTrackerName.get(hostname);
+ Set<TaskTracker> trackers = hostnameToTaskTracker.get(hostname);
if (trackers == null) {
- trackers = Collections.synchronizedSet(new HashSet<String>());
- hostnameToTrackerName.put(hostname, trackers);
+ trackers = Collections.synchronizedSet(new HashSet<TaskTracker>());
+ hostnameToTaskTracker.put(hostname, trackers);
}
LOG.info("Adding tracker " + status.getTrackerName() + " to host "
+ hostname);
- trackers.add(status.getTrackerName());
+ trackers.add(taskTracker);
}
public Node resolveAndAddToTopology(String name) {
@@ -2621,11 +2649,13 @@ public class JobTracker implements MRCon
boolean acceptNewTasks,
short responseId)
throws IOException {
- LOG.debug("Got heartbeat from: " + status.getTrackerName() +
- " (restarted: " + restarted +
- " initialContact: " + initialContact +
- " acceptNewTasks: " + acceptNewTasks + ")" +
- " with responseId: " + responseId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got heartbeat from: " + status.getTrackerName() +
+ " (restarted: " + restarted +
+ " initialContact: " + initialContact +
+ " acceptNewTasks: " + acceptNewTasks + ")" +
+ " with responseId: " + responseId);
+ }
// Make sure heartbeat is from a tasktracker allowed by the jobtracker.
if (!acceptTaskTracker(status)) {
@@ -2701,13 +2731,13 @@ public class JobTracker implements MRCon
// Check for new tasks to be executed on the tasktracker
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
- TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
+ TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null ) {
- tasks = taskScheduler.assignTasks(taskTrackerStatus);
+ tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
}
if (tasks != null) {
for (Task task : tasks) {
@@ -2807,14 +2837,15 @@ public class JobTracker implements MRCon
*/
private boolean updateTaskTrackerStatus(String trackerName,
TaskTrackerStatus status) {
- TaskTrackerStatus oldStatus = taskTrackers.get(trackerName);
+ TaskTracker tt = getTaskTracker(trackerName);
+ TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus();
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
- int mapSlots = oldStatus.getMaxMapTasks();
+ int mapSlots = oldStatus.getMaxMapSlots();
totalMapTaskCapacity -= mapSlots;
- int reduceSlots = oldStatus.getMaxReduceTasks();
+ int reduceSlots = oldStatus.getMaxReduceSlots();
totalReduceTaskCapacity -= reduceSlots;
}
if (status == null) {
@@ -2834,16 +2865,56 @@ public class JobTracker implements MRCon
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
if (!faultyTrackers.isBlacklisted(status.getHost())) {
- int mapSlots = status.getMaxMapTasks();
+ int mapSlots = status.getMaxMapSlots();
totalMapTaskCapacity += mapSlots;
- int reduceSlots = status.getMaxReduceTasks();
+ int reduceSlots = status.getMaxReduceSlots();
totalReduceTaskCapacity += reduceSlots;
}
boolean alreadyPresent = false;
- if (taskTrackers.containsKey(trackerName)) {
+ TaskTracker taskTracker = taskTrackers.get(trackerName);
+ if (taskTracker != null) {
alreadyPresent = true;
+ } else {
+ taskTracker = new TaskTracker(trackerName);
+ }
+
+ taskTracker.setStatus(status);
+ taskTrackers.put(trackerName, taskTracker);
+
+ if (LOG.isDebugEnabled()) {
+ int runningMaps = 0, runningReduces = 0;
+ int commitPendingMaps = 0, commitPendingReduces = 0;
+ int unassignedMaps = 0, unassignedReduces = 0;
+ int miscMaps = 0, miscReduces = 0;
+ List<TaskStatus> taskReports = status.getTaskReports();
+ for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+ TaskStatus ts = (TaskStatus) it.next();
+ boolean isMap = ts.getIsMap();
+ TaskStatus.State state = ts.getRunState();
+ if (state == TaskStatus.State.RUNNING) {
+ if (isMap) { ++runningMaps; }
+ else { ++runningReduces; }
+ } else if (state == TaskStatus.State.UNASSIGNED) {
+ if (isMap) { ++unassignedMaps; }
+ else { ++unassignedReduces; }
+ } else if (state == TaskStatus.State.COMMIT_PENDING) {
+ if (isMap) { ++commitPendingMaps; }
+ else { ++commitPendingReduces; }
+ } else {
+ if (isMap) { ++miscMaps; }
+ else { ++miscReduces; }
+ }
+ }
+ LOG.debug(trackerName + ": Status -" +
+ " running(m) = " + runningMaps +
+ " unassigned(m) = " + unassignedMaps +
+ " commit_pending(m) = " + commitPendingMaps +
+ " misc(m) = " + miscMaps +
+ " running(r) = " + runningReduces +
+ " unassigned(r) = " + unassignedReduces +
+ " commit_pending(r) = " + commitPendingReduces +
+ " misc(r) = " + miscReduces);
}
- taskTrackers.put(trackerName, status);
if (!alreadyPresent) {
Integer numTaskTrackersInHost =
@@ -2872,11 +2943,12 @@ public class JobTracker implements MRCon
synchronized (trackerExpiryQueue) {
boolean seenBefore = updateTaskTrackerStatus(trackerName,
trackerStatus);
+ TaskTracker taskTracker = getTaskTracker(trackerName);
if (initialContact) {
// If it's first contact, then clear out
// any state hanging around
if (seenBefore) {
- lostTaskTracker(trackerName);
+ lostTaskTracker(taskTracker);
}
} else {
// If not first contact, there should be some record of the tracker
@@ -2893,7 +2965,7 @@ public class JobTracker implements MRCon
if (isBlacklisted(trackerName)) {
faultyTrackers.numBlacklistedTrackers += 1;
}
- addNewTracker(trackerStatus);
+ addNewTracker(taskTracker);
}
}
}
@@ -3012,8 +3084,8 @@ public class JobTracker implements MRCon
// returns cleanup tasks first, then setup tasks.
private synchronized List<Task> getSetupAndCleanupTasks(
TaskTrackerStatus taskTracker) throws IOException {
- int maxMapTasks = taskTracker.getMaxMapTasks();
- int maxReduceTasks = taskTracker.getMaxReduceTasks();
+ int maxMapTasks = taskTracker.getMaxMapSlots();
+ int maxReduceTasks = taskTracker.getMaxReduceSlots();
int numMaps = taskTracker.countMapTasks();
int numReduces = taskTracker.countReduceTasks();
int numTaskTrackers = getClusterStatus().getTaskTrackers();
@@ -3810,7 +3882,8 @@ public class JobTracker implements MRCon
* already been updated. Just process the contained tasks and any
* jobs that might be affected.
*/
- void lostTaskTracker(String trackerName) {
+ void lostTaskTracker(TaskTracker taskTracker) {
+ String trackerName = taskTracker.getTrackerName();
LOG.info("Lost tracker '" + trackerName + "'");
// remove the tracker from the local structures
@@ -3868,10 +3941,14 @@ public class JobTracker implements MRCon
// Penalize this tracker for each of the jobs which
// had any tasks running on it when it was 'lost'
+ // Also, remove any reserved slots on this tasktracker
for (JobInProgress job : jobsWithFailures) {
- job.addTrackerTaskFailure(trackerName);
+ job.addTrackerTaskFailure(trackerName, taskTracker);
}
-
+
+ // Cleanup
+ taskTracker.lost();
+
// Purge 'marked' tasks, needs to be done
// here to prevent hanging references!
removeMarkedTasks(trackerName);
@@ -3901,9 +3978,9 @@ public class JobTracker implements MRCon
hostsReader.refresh();
Set<String> excludeSet = new HashSet<String>();
- for(Map.Entry<String, TaskTrackerStatus> eSet : taskTrackers.entrySet()) {
+ for(Map.Entry<String, TaskTracker> eSet : taskTrackers.entrySet()) {
String trackerName = eSet.getKey();
- TaskTrackerStatus status = eSet.getValue();
+ TaskTrackerStatus status = eSet.getValue().getStatus();
// Check if not include i.e not in host list or in hosts list but excluded
if (!inHostsList(status) || inExcludedHostsList(status)) {
excludeSet.add(status.getHost()); // add to rejected trackers
@@ -3921,12 +3998,13 @@ public class JobTracker implements MRCon
synchronized (trackerExpiryQueue) {
for (String host : hosts) {
LOG.info("Decommissioning host " + host);
- Set<String> trackers = hostnameToTrackerName.remove(host);
+ Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
if (trackers != null) {
- for (String tracker : trackers) {
- LOG.info("Losing tracker " + tracker + " on host " + host);
+ for (TaskTracker tracker : trackers) {
+ LOG.info("Decommission: Losing tracker " + tracker +
+ " on host " + host);
lostTaskTracker(tracker); // lose the tracker
- updateTaskTrackerStatus(tracker, null);
+ updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
}
}
LOG.info("Host " + host + " is ready for decommissioning");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java Fri Mar 4 03:25:10 2011
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/**
* A {@link TaskScheduler} that limits the maximum number of tasks
@@ -69,9 +70,9 @@ class LimitTasksPerJobTaskScheduler exte
}
@Override
- public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+ public synchronized List<Task> assignTasks(TaskTracker taskTracker)
throws IOException {
-
+ TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
final int numTaskTrackers =
taskTrackerManager.getClusterStatus().getTaskTrackers();
Collection<JobInProgress> jobQueue =
@@ -79,10 +80,10 @@ class LimitTasksPerJobTaskScheduler exte
Task task;
/* Stats about the current taskTracker */
- final int mapTasksNumber = taskTracker.countMapTasks();
- final int reduceTasksNumber = taskTracker.countReduceTasks();
- final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
- final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
+ final int mapTasksNumber = taskTrackerStatus.countMapTasks();
+ final int reduceTasksNumber = taskTrackerStatus.countReduceTasks();
+ final int maximumMapTasksNumber = taskTrackerStatus.getMaxMapSlots();
+ final int maximumReduceTasksNumber = taskTrackerStatus.getMaxReduceSlots();
/*
* Statistics about the whole cluster. Most are approximate because of
@@ -141,11 +142,11 @@ class LimitTasksPerJobTaskScheduler exte
continue;
}
if (step == 0 || step == 2) {
- task = job.obtainNewMapTask(taskTracker, numTaskTrackers,
+ task = job.obtainNewMapTask(taskTrackerStatus, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
}
else {
- task = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+ task = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
}
if (task != null) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 4 03:25:10 2011
@@ -166,7 +166,8 @@ class LocalJobRunner implements JobSubmi
MapTask map = new MapTask(file.toString(),
mapId, i,
rawSplits[i].getClassName(),
- rawSplits[i].getBytes(), job.getUser());
+ rawSplits[i].getBytes(), 1,
+ job.getUser());
JobConf localConf = new JobConf(job);
map.setJobFile(localFile.toString());
map.localizeConfiguration(localConf);
@@ -205,7 +206,8 @@ class LocalJobRunner implements JobSubmi
}
if (!this.isInterrupted()) {
ReduceTask reduce = new ReduceTask(file.toString(),
- reduceId, 0, mapIds.size(), job.getUser());
+ reduceId, 0, mapIds.size(),
+ 1, job.getUser());
JobConf localConf = new JobConf(job);
reduce.setJobFile(localFile.toString());
reduce.localizeConfiguration(localConf);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar 4 03:25:10 2011
@@ -90,8 +90,8 @@ class MapTask extends Task {
public MapTask(String jobFile, TaskAttemptID taskId,
int partition, String splitClass, BytesWritable split,
- String username) {
- super(jobFile, taskId, partition, username);
+ int numSlotsRequired, String username) {
+ super(jobFile, taskId, partition, numSlotsRequired, username);
this.splitClass = splitClass;
this.split = split;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java Fri Mar 4 03:25:10 2011
@@ -23,10 +23,10 @@ class MapTaskStatus extends TaskStatus {
public MapTaskStatus() {}
- public MapTaskStatus(TaskAttemptID taskid, float progress,
+ public MapTaskStatus(TaskAttemptID taskid, float progress, int numSlots,
State runState, String diagnosticInfo, String stateString,
String taskTracker, Phase phase, Counters counters) {
- super(taskid, progress, runState, diagnosticInfo, stateString,
+ super(taskid, progress, numSlots, runState, diagnosticInfo, stateString,
taskTracker, phase, counters);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 4 03:25:10 2011
@@ -150,8 +150,9 @@ class ReduceTask extends Task {
}
public ReduceTask(String jobFile, TaskAttemptID taskId,
- int partition, int numMaps, String username) {
- super(jobFile, taskId, partition, username);
+ int partition, int numMaps, int numSlotsRequired,
+ String username) {
+ super(jobFile, taskId, partition, numSlotsRequired, username);
this.numMaps = numMaps;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java Fri Mar 4 03:25:10 2011
@@ -34,11 +34,11 @@ class ReduceTaskStatus extends TaskStatu
public ReduceTaskStatus() {}
- public ReduceTaskStatus(TaskAttemptID taskid, float progress, State runState,
- String diagnosticInfo, String stateString, String taskTracker,
- Phase phase, Counters counters) {
- super(taskid, progress, runState, diagnosticInfo, stateString, taskTracker,
- phase, counters);
+ public ReduceTaskStatus(TaskAttemptID taskid, float progress, int numSlots,
+ State runState, String diagnosticInfo, String stateString,
+ String taskTracker, Phase phase, Counters counters) {
+ super(taskid, progress, numSlots, runState, diagnosticInfo, stateString,
+ taskTracker, phase, counters);
}
@Override
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 03:25:10 2011
@@ -52,8 +52,12 @@ import org.apache.hadoop.util.Progressab
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-/** Base class for tasks. */
-abstract class Task implements Writable, Configurable {
+/**
+ * Base class for tasks.
+ *
+ * This is NOT a public interface.
+ */
+abstract public class Task implements Writable, Configurable {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
@@ -140,6 +144,7 @@ abstract class Task implements Writable,
protected final Counters.Counter spilledRecordsCounter;
private String pidFile = "";
protected TaskUmbilicalProtocol umbilical;
+ private int numSlotsRequired;
////////////////////////////////////////////
// Constructors
@@ -151,14 +156,16 @@ abstract class Task implements Writable,
spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
- public Task(String jobFile, TaskAttemptID taskId, int partition, String username) {
+ public Task(String jobFile, TaskAttemptID taskId, int partition,
+ int numSlotsRequired, String username) {
this.username = username;
this.jobFile = jobFile;
this.taskId = taskId;
this.partition = partition;
+ this.numSlotsRequired = numSlotsRequired;
this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId,
- 0.0f,
+ 0.0f, numSlotsRequired,
TaskStatus.State.UNASSIGNED,
"", "", "",
isMapTask() ?
@@ -175,6 +182,10 @@ abstract class Task implements Writable,
public void setJobFile(String jobFile) { this.jobFile = jobFile; }
public String getJobFile() { return jobFile; }
public TaskAttemptID getTaskID() { return taskId; }
+ public int getNumSlotsRequired() {
+ return numSlotsRequired;
+ }
+
Counters getCounters() { return counters; }
public void setPidFile(String pidFile) {
this.pidFile = pidFile;
@@ -201,14 +212,14 @@ abstract class Task implements Writable,
/**
* Return current phase of the task.
* needs to be synchronized as communication thread sends the phase every second
- * @return
+ * @return the curent phase of the task
*/
public synchronized TaskStatus.Phase getPhase(){
return this.taskStatus.getPhase();
}
/**
* Set current phase of the task.
- * @param p
+ * @param phase task phase
*/
protected synchronized void setPhase(TaskStatus.Phase phase){
this.taskStatus.setPhase(phase);
@@ -331,6 +342,7 @@ abstract class Task implements Writable,
Text.writeString(out, jobFile);
taskId.write(out);
out.writeInt(partition);
+ out.writeInt(numSlotsRequired);
taskStatus.write(out);
skipRanges.write(out);
out.writeBoolean(skipping);
@@ -346,6 +358,7 @@ abstract class Task implements Writable,
jobFile = Text.readString(in);
taskId = TaskAttemptID.read(in);
partition = in.readInt();
+ numSlotsRequired = in.readInt();
taskStatus.readFields(in);
this.mapOutputFile.setJobId(taskId.getJobID());
skipRanges.readFields(in);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar 4 03:25:10 2011
@@ -67,6 +67,7 @@ class TaskInProgress {
private JobTracker jobtracker;
private TaskID id;
private JobInProgress job;
+ private final int numSlotsRequired;
// Status of the TIP
private int successEventNumber = -1;
@@ -132,7 +133,8 @@ class TaskInProgress {
public TaskInProgress(JobID jobid, String jobFile,
RawSplit rawSplit,
JobTracker jobtracker, JobConf conf,
- JobInProgress job, int partition) {
+ JobInProgress job, int partition,
+ int numSlotsRequired) {
this.jobFile = jobFile;
this.rawSplit = rawSplit;
this.jobtracker = jobtracker;
@@ -140,6 +142,7 @@ class TaskInProgress {
this.conf = conf;
this.partition = partition;
this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
+ this.numSlotsRequired = numSlotsRequired;
setMaxTaskAttempts();
init(jobid);
}
@@ -150,7 +153,7 @@ class TaskInProgress {
public TaskInProgress(JobID jobid, String jobFile,
int numMaps,
int partition, JobTracker jobtracker, JobConf conf,
- JobInProgress job) {
+ JobInProgress job, int numSlotsRequired) {
this.jobFile = jobFile;
this.numMaps = numMaps;
this.partition = partition;
@@ -158,6 +161,7 @@ class TaskInProgress {
this.job = job;
this.conf = conf;
this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
+ this.numSlotsRequired = numSlotsRequired;
setMaxTaskAttempts();
init(jobid);
}
@@ -523,7 +527,9 @@ class TaskInProgress {
newState != TaskStatus.State.UNASSIGNED) &&
(oldState == newState)) {
LOG.warn("Recieved duplicate status update of '" + newState +
- "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
+ "' for '" + taskid + "' of TIP '" + getTIPId() + "'" +
+ "oldTT=" + oldStatus.getTaskTracker() +
+ " while newTT=" + status.getTaskTracker());
return false;
}
@@ -929,9 +935,10 @@ class TaskInProgress {
split = new BytesWritable();
}
t = new MapTask(jobFile, taskid, partition, splitClass, split,
- job.getUser());
+ numSlotsRequired, job.getUser());
} else {
- t = new ReduceTask(jobFile, taskid, partition, numMaps, job.getUser());
+ t = new ReduceTask(jobFile, taskid, partition, numMaps,
+ numSlotsRequired, job.getUser());
}
if (jobCleanup) {
t.setJobCleanupTask();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java Fri Mar 4 03:25:10 2011
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/**
* Used by a {@link JobTracker} to schedule {@link Task}s on
@@ -36,7 +37,7 @@ import org.apache.hadoop.conf.Configurat
* between the job being added (when
* {@link JobInProgressListener#jobAdded(JobInProgress)} is called)
* and tasks for that job being assigned (by
- * {@link #assignTasks(TaskTrackerStatus)}).
+ * {@link #assignTasks(TaskTracker)}).
* @see EagerTaskInitializationListener
*/
abstract class TaskScheduler implements Configurable {
@@ -80,8 +81,8 @@ abstract class TaskScheduler implements
* @param taskTracker The TaskTracker for which we're looking for tasks.
* @return A list of tasks to run on that TaskTracker, possibly empty.
*/
- public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)
- throws IOException;
+ public abstract List<Task> assignTasks(TaskTracker taskTracker)
+ throws IOException;
/**
* Returns a collection of jobs in an order which is specific to
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Mar 4 03:25:10 2011
@@ -49,6 +49,7 @@ abstract class TaskStatus implements Wri
private String diagnosticInfo;
private String stateString;
private String taskTracker;
+ private int numSlots;
private long startTime;
private long finishTime;
@@ -61,14 +62,16 @@ abstract class TaskStatus implements Wri
public TaskStatus() {
taskid = new TaskAttemptID();
+ numSlots = 0;
}
- public TaskStatus(TaskAttemptID taskid, float progress,
+ public TaskStatus(TaskAttemptID taskid, float progress, int numSlots,
State runState, String diagnosticInfo,
String stateString, String taskTracker,
Phase phase, Counters counters) {
this.taskid = taskid;
this.progress = progress;
+ this.numSlots = numSlots;
this.runState = runState;
this.diagnosticInfo = diagnosticInfo;
this.stateString = stateString;
@@ -80,6 +83,10 @@ abstract class TaskStatus implements Wri
public TaskAttemptID getTaskID() { return taskid; }
public abstract boolean getIsMap();
+ public int getNumSlots() {
+ return numSlots;
+ }
+
public float getProgress() { return progress; }
public void setProgress(float progress) { this.progress = progress; }
public State getRunState() { return runState; }
@@ -358,6 +365,7 @@ abstract class TaskStatus implements Wri
public void write(DataOutput out) throws IOException {
taskid.write(out);
out.writeFloat(progress);
+ out.writeInt(numSlots);
WritableUtils.writeEnum(out, runState);
Text.writeString(out, diagnosticInfo);
Text.writeString(out, stateString);
@@ -375,6 +383,7 @@ abstract class TaskStatus implements Wri
public void readFields(DataInput in) throws IOException {
this.taskid.readFields(in);
this.progress = in.readFloat();
+ this.numSlots = in.readInt();
this.runState = WritableUtils.readEnum(in, State.class);
this.diagnosticInfo = Text.readString(in);
this.stateString = Text.readString(in);
@@ -394,24 +403,27 @@ abstract class TaskStatus implements Wri
// Factory-like methods to create/read/write appropriate TaskStatus objects
//////////////////////////////////////////////////////////////////////////////
- static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, float progress,
+ static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId,
+ float progress, int numSlots,
State runState, String diagnosticInfo,
String stateString, String taskTracker,
Phase phase, Counters counters)
throws IOException {
boolean isMap = in.readBoolean();
- return createTaskStatus(isMap, taskId, progress, runState, diagnosticInfo,
- stateString, taskTracker, phase, counters);
+ return createTaskStatus(isMap, taskId, progress, numSlots, runState,
+ diagnosticInfo, stateString, taskTracker, phase,
+ counters);
}
- static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, float progress,
- State runState, String diagnosticInfo,
- String stateString, String taskTracker,
- Phase phase, Counters counters) {
- return (isMap) ? new MapTaskStatus(taskId, progress, runState,
+ static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId,
+ float progress, int numSlots,
+ State runState, String diagnosticInfo,
+ String stateString, String taskTracker,
+ Phase phase, Counters counters) {
+ return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState,
diagnosticInfo, stateString, taskTracker,
phase, counters) :
- new ReduceTaskStatus(taskId, progress, runState,
+ new ReduceTaskStatus(taskId, progress, numSlots, runState,
diagnosticInfo, stateString,
taskTracker, phase, counters);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:25:10 2011
@@ -67,6 +67,7 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.pipes.Submitter;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -201,8 +202,8 @@ public class TaskTracker
private static final String OUTPUT = "output";
private JobConf originalConf;
private JobConf fConf;
- private int maxCurrentMapTasks;
- private int maxCurrentReduceTasks;
+ private int maxMapSlots;
+ private int maxReduceSlots;
private int failures;
private MapEventsFetcherThread mapEventsFetcher;
int workerThreads;
@@ -501,8 +502,8 @@ public class TaskTracker
}
// RPC initialization
- int max = maxCurrentMapTasks > maxCurrentReduceTasks ?
- maxCurrentMapTasks : maxCurrentReduceTasks;
+ int max = maxMapSlots > maxReduceSlots ?
+ maxMapSlots : maxReduceSlots;
//set the num handlers to max*2 since canCommit may wait for the duration
//of a heartbeat RPC
this.taskReportServer =
@@ -540,8 +541,8 @@ public class TaskTracker
this.indexCache = new IndexCache(this.fConf);
- mapLauncher = new TaskLauncher(maxCurrentMapTasks);
- reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
+ mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
+ reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
mapLauncher.start();
reduceLauncher.start();
Class<? extends TaskController> taskControllerClass
@@ -919,9 +920,9 @@ public class TaskTracker
*/
public TaskTracker(JobConf conf) throws IOException {
originalConf = conf;
- maxCurrentMapTasks = conf.getInt(
+ maxMapSlots = conf.getInt(
"mapred.tasktracker.map.tasks.maximum", 2);
- maxCurrentReduceTasks = conf.getInt(
+ maxReduceSlots = conf.getInt(
"mapred.tasktracker.reduce.tasks.maximum", 2);
this.jobTrackAddr = JobTracker.getAddress(conf);
String infoAddr =
@@ -1195,8 +1196,8 @@ public class TaskTracker
cloneAndResetRunningTaskStatuses(
sendCounters),
failures,
- maxCurrentMapTasks,
- maxCurrentReduceTasks);
+ maxMapSlots,
+ maxReduceSlots);
}
} else {
LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
@@ -1209,9 +1210,10 @@ public class TaskTracker
boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
- askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
- status.countReduceTasks() < maxCurrentReduceTasks) &&
- acceptNewTasks;
+ askForNewTask =
+ ((status.countOccupiedMapSlots() < maxMapSlots ||
+ status.countOccupiedReduceSlots() < maxReduceSlots) &&
+ acceptNewTasks);
localMinSpaceStart = minSpaceStart;
}
if (askForNewTask) {
@@ -1585,12 +1587,12 @@ public class TaskTracker
private final int maxSlots;
private List<TaskInProgress> tasksToLaunch;
- public TaskLauncher(int numSlots) {
+ public TaskLauncher(TaskType taskType, int numSlots) {
this.maxSlots = numSlots;
this.numFreeSlots = new IntWritable(numSlots);
this.tasksToLaunch = new LinkedList<TaskInProgress>();
setDaemon(true);
- setName("TaskLauncher for task");
+ setName("TaskLauncher for " + taskType + " tasks");
}
public void addToTaskQueue(LaunchTaskAction action) {
@@ -1605,9 +1607,9 @@ public class TaskTracker
tasksToLaunch.clear();
}
- public void addFreeSlot() {
+ public void addFreeSlots(int numSlots) {
synchronized (numFreeSlots) {
- numFreeSlots.set(numFreeSlots.get() + 1);
+ numFreeSlots.set(numFreeSlots.get() + numSlots);
assert (numFreeSlots.get() <= maxSlots);
LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
numFreeSlots.notifyAll();
@@ -1618,22 +1620,29 @@ public class TaskTracker
while (!Thread.interrupted()) {
try {
TaskInProgress tip;
+ Task task;
synchronized (tasksToLaunch) {
while (tasksToLaunch.isEmpty()) {
tasksToLaunch.wait();
}
//get the TIP
tip = tasksToLaunch.remove(0);
- LOG.info("Trying to launch : " + tip.getTask().getTaskID());
+ task = tip.getTask();
+ LOG.info("Trying to launch : " + tip.getTask().getTaskID() +
+ " which needs " + task.getNumSlotsRequired() + " slots");
}
- //wait for a slot to run
+ //wait for free slots to run
synchronized (numFreeSlots) {
- while (numFreeSlots.get() == 0) {
+ while (numFreeSlots.get() < task.getNumSlotsRequired()) {
+ LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() +
+ " to launch " + task.getTaskID() + ", currently we have " +
+ numFreeSlots.get() + " free slots");
numFreeSlots.wait();
}
LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
- " and trying to launch "+tip.getTask().getTaskID());
- numFreeSlots.set(numFreeSlots.get() - 1);
+ " and trying to launch "+tip.getTask().getTaskID() +
+ " which needs " + task.getNumSlotsRequired() + " slots");
+ numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());
assert (numFreeSlots.get() >= 0);
}
synchronized (tip) {
@@ -1642,7 +1651,7 @@ public class TaskTracker
tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
//got killed externally while still in the launcher queue
- addFreeSlot();
+ addFreeSlots(task.getNumSlotsRequired());
continue;
}
tip.slotTaken = true;
@@ -1809,6 +1818,7 @@ public class TaskTracker
localJobConf = null;
taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
0.0f,
+ task.getNumSlotsRequired(),
task.getState(),
diagnosticInfo.toString(),
"initializing",
@@ -2372,7 +2382,7 @@ public class TaskTracker
private synchronized void releaseSlot() {
if (slotTaken) {
if (launcher != null) {
- launcher.addFreeSlot();
+ launcher.addFreeSlots(task.getNumSlotsRequired());
}
slotTaken = false;
}
@@ -3014,11 +3024,11 @@ public class TaskTracker
}
int getMaxCurrentMapTasks() {
- return maxCurrentMapTasks;
+ return maxMapSlots; // now reports configured map *slots*; the accessor name was left unchanged
}
int getMaxCurrentReduceTasks() {
- return maxCurrentReduceTasks;
+ return maxReduceSlots; // now reports configured reduce *slots*; the accessor name was left unchanged
}
/**
@@ -3111,7 +3121,7 @@ public class TaskTracker
JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
JobConf.DISABLED_MEMORY_LIMIT);
totalMemoryAllottedForTasks =
- maxCurrentMapTasks * mapSlotMemorySizeOnTT + maxCurrentReduceTasks
+ maxMapSlots * mapSlotMemorySizeOnTT + maxReduceSlots
* reduceSlotSizeMemoryOnTT;
if (totalMemoryAllottedForTasks < 0) {
//adding check for the old keys which might be used by the administrator
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Mar 4 03:25:10 2011
@@ -17,7 +17,10 @@
*/
package org.apache.hadoop.mapred;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.TaskStatus.State;
import java.io.*;
import java.util.*;
@@ -28,9 +31,11 @@ import java.util.*;
* of the most recent TaskTrackerStatus objects for each
* unique TaskTracker it knows about.
*
+ * This is NOT a public interface!
**************************************************/
-class TaskTrackerStatus implements Writable {
-
+public class TaskTrackerStatus implements Writable {
+ public static final Log LOG = LogFactory.getLog(TaskTrackerStatus.class);
+
static { // register a ctor
WritableFactories.setFactory
(TaskTrackerStatus.class,
@@ -247,19 +252,27 @@ class TaskTrackerStatus implements Writa
public List<TaskStatus> getTaskReports() {
return taskReports;
}
-
+
/**
- * Return the current MapTask count
+ * Is the given task considered as 'running', and hence counted towards running tasks and occupied slots?
+ * @param taskStatus the status report of the task to check
+ * @return true if the task is RUNNING or UNASSIGNED, or is in its task-cleanup phase
+ */
+ private boolean isTaskRunning(TaskStatus taskStatus) {
+ State state = taskStatus.getRunState();
+ return (state == State.RUNNING || state == State.UNASSIGNED ||
+ taskStatus.inTaskCleanupPhase());
+ }
+
+ /**
+ * Get the number of running map tasks.
+ * @return the number of running map tasks
*/
public int countMapTasks() {
int mapCount = 0;
- for (Iterator it = taskReports.iterator(); it.hasNext();) {
+ for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
TaskStatus ts = (TaskStatus) it.next();
- TaskStatus.State state = ts.getRunState();
- if (ts.getIsMap() &&
- ((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED) ||
- ts.inTaskCleanupPhase())) {
+ if (ts.getIsMap() && isTaskRunning(ts)) {
mapCount++;
}
}
@@ -267,17 +280,37 @@ class TaskTrackerStatus implements Writa
}
/**
- * Return the current ReduceTask count
+ * Get the number of occupied map slots, i.e. the slots held by running map tasks.
+ * @return the number of occupied map slots
+ */
+ public int countOccupiedMapSlots() {
+ int mapSlotsCount = 0;
+ // A task may hold more than one slot, so sum getNumSlots() rather than counting tasks.
+ for (TaskStatus ts : taskReports) {
+ if (ts.getIsMap() && isTaskRunning(ts)) {
+ mapSlotsCount += ts.getNumSlots();
+ }
+ }
+ return mapSlotsCount;
+ }
+
+ /**
+ * Compute the free map slots on this node (maximum minus occupied; may be negative).
+ */
+ public int getAvailableMapSlots() {
+ int occupiedSlots = countOccupiedMapSlots();
+ return getMaxMapSlots() - occupiedSlots;
+ }
+
+ /**
+ * Get the number of running reduce tasks.
+ * @return the number of running reduce tasks
*/
public int countReduceTasks() {
int reduceCount = 0;
- for (Iterator it = taskReports.iterator(); it.hasNext();) {
+ for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
TaskStatus ts = (TaskStatus) it.next();
- TaskStatus.State state = ts.getRunState();
- if ((!ts.getIsMap()) &&
- ((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED) ||
- ts.inTaskCleanupPhase())) {
+ if ((!ts.getIsMap()) && isTaskRunning(ts)) {
reduceCount++;
}
}
@@ -285,6 +318,30 @@ class TaskTrackerStatus implements Writa
}
/**
+ * Get the number of occupied reduce slots, i.e. the slots held by running reduce tasks.
+ * @return the number of occupied reduce slots
+ */
+ public int countOccupiedReduceSlots() {
+ int reduceSlotsCount = 0;
+ // A task may hold more than one slot, so sum getNumSlots() rather than counting tasks.
+ for (TaskStatus ts : taskReports) {
+ if (!ts.getIsMap() && isTaskRunning(ts)) {
+ reduceSlotsCount += ts.getNumSlots();
+ }
+ }
+ return reduceSlotsCount;
+ }
+
+ /**
+ * Compute the free reduce slots on this node (maximum minus occupied; may be negative).
+ */
+ public int getAvailableReduceSlots() {
+ int occupiedSlots = countOccupiedReduceSlots();
+ return getMaxReduceSlots() - occupiedSlots;
+ }
+
+
+ /**
*/
public long getLastSeen() {
return lastSeen;
@@ -296,15 +353,18 @@ class TaskTrackerStatus implements Writa
}
/**
- * Get the maximum concurrent tasks for this node. (This applies
- * per type of task - a node with maxTasks==1 will run up to 1 map
- * and 1 reduce concurrently).
- * @return maximum tasks this node supports
+ * Get the maximum map slots for this node.
+ * @return the maximum map slots for this node (stored in the {@code maxMapTasks} field)
*/
- public int getMaxMapTasks() {
+ public int getMaxMapSlots() {
return maxMapTasks;
}
- public int getMaxReduceTasks() {
+
+ /**
+ * Get the maximum reduce slots for this node.
+ * @return the maximum reduce slots for this node (stored in the {@code maxReduceTasks} field)
+ */
+ public int getMaxReduceSlots() {
return maxReduceTasks;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Mar 4 03:25:10 2011
@@ -54,6 +54,7 @@ interface TaskUmbilicalProtocol extends
* Version 14 changed the getTask method signature for HADOOP-4232
* Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
* Version 16 Added fatalError for child to communicate fatal errors to TT
+ * Version 17 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
* */
- public static final long versionID = 16L;
+ public static final long versionID = 17L;
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java?rev=1076949&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java Fri Mar 4 03:25:10 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+/** Task types: map, reduce, job-setup, job-cleanup and task-cleanup. */
+public enum TaskType {
+ MAP, REDUCE,
+ JOB_SETUP, JOB_CLEANUP,
+ TASK_CLEANUP;
+}