You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC
svn commit: r529410 [15/27] - in /lucene/hadoop/trunk: ./
src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/
src/contrib/abacus/src/java/org/apache/hadoop/abacus/
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/...
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Apr 16 14:44:35 2007
@@ -57,870 +57,869 @@
* @author Mike Cafarella
*******************************************************/
public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol {
- static long RETIRE_JOB_INTERVAL;
- static long RETIRE_JOB_CHECK_INTERVAL;
- static float TASK_ALLOC_EPSILON;
- static float PAD_FRACTION;
- static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
-
- /**
- * The maximum no. of 'completed' (successful/failed/killed)
- * jobs kept in memory per-user.
- */
- static final int MAX_COMPLETE_USER_JOBS_IN_MEMORY = 100;
-
- /**
- * Used for formatting the id numbers
- */
- private static NumberFormat idFormat = NumberFormat.getInstance();
- static {
- idFormat.setMinimumIntegerDigits(4);
- idFormat.setGroupingUsed(false);
- }
-
- private int nextJobId = 1;
-
- public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker");
-
- private static JobTracker tracker = null;
- private static boolean runTracker = true;
-
- /**
- * Start the JobTracker with given configuration.
- *
- * The conf will be modified to reflect the actual ports on which
- * the JobTracker is up and running if the user passes the port as
- * <code>zero</code>.
- *
- * @param conf configuration for the JobTracker.
- * @throws IOException
- */
- public static void startTracker(Configuration conf) throws IOException {
- if (tracker != null)
- throw new IOException("JobTracker already running.");
- runTracker = true;
- while (runTracker) {
- try {
- tracker = new JobTracker(conf);
- break;
- } catch (VersionMismatch v) {
- // Can't recover from a version mismatch. Avoid the retry loop and re-throw
- throw v;
- } catch (IOException e) {
- LOG.warn("Error starting tracker: " +
- StringUtils.stringifyException(e));
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- }
- if (runTracker) { tracker.offerService(); }
- }
-
- public static JobTracker getTracker() {
- return tracker;
- }
-
- public static void stopTracker() throws IOException {
- runTracker = false;
- if (tracker != null) {
- tracker.close();
- tracker = null;
+ static long RETIRE_JOB_INTERVAL;
+ static long RETIRE_JOB_CHECK_INTERVAL;
+ static float TASK_ALLOC_EPSILON;
+ static float PAD_FRACTION;
+ static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
+
+ /**
+ * The maximum no. of 'completed' (successful/failed/killed)
+ * jobs kept in memory per-user.
+ */
+ static final int MAX_COMPLETE_USER_JOBS_IN_MEMORY = 100;
+
+ /**
+ * Used for formatting the id numbers
+ */
+ private static NumberFormat idFormat = NumberFormat.getInstance();
+ static {
+ idFormat.setMinimumIntegerDigits(4);
+ idFormat.setGroupingUsed(false);
+ }
+
+ private int nextJobId = 1;
+
+ public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker");
+
+ private static JobTracker tracker = null;
+ private static boolean runTracker = true;
+
+ /**
+ * Start the JobTracker with given configuration.
+ *
+ * The conf will be modified to reflect the actual ports on which
+ * the JobTracker is up and running if the user passes the port as
+ * <code>zero</code>.
+ *
+ * @param conf configuration for the JobTracker.
+ * @throws IOException
+ */
+ public static void startTracker(Configuration conf) throws IOException {
+ if (tracker != null)
+ throw new IOException("JobTracker already running.");
+ runTracker = true;
+ while (runTracker) {
+ try {
+ tracker = new JobTracker(conf);
+ break;
+ } catch (VersionMismatch v) {
+ // Can't recover from a version mismatch. Avoid the retry loop and re-throw
+ throw v;
+ } catch (IOException e) {
+ LOG.warn("Error starting tracker: " +
+ StringUtils.stringifyException(e));
}
- }
-
- public long getProtocolVersion(String protocol,
- long clientVersion) throws IOException {
- if (protocol.equals(InterTrackerProtocol.class.getName())) {
- return InterTrackerProtocol.versionID;
- } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
- return JobSubmissionProtocol.versionID;
- } else {
- throw new IOException("Unknown protocol to job tracker: " + protocol);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
}
}
- /**
- * A thread to timeout tasks that have been assigned to task trackers,
- * but that haven't reported back yet.
- * Note that I included a stop() method, even though there is no place
- * where JobTrackers are cleaned up.
- * @author Owen O'Malley
+ if (runTracker) { tracker.offerService(); }
+ }
+
+ public static JobTracker getTracker() {
+ return tracker;
+ }
+
+ public static void stopTracker() throws IOException {
+ runTracker = false;
+ if (tracker != null) {
+ tracker.close();
+ tracker = null;
+ }
+ }
+
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ if (protocol.equals(InterTrackerProtocol.class.getName())) {
+ return InterTrackerProtocol.versionID;
+ } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
+ return JobSubmissionProtocol.versionID;
+ } else {
+ throw new IOException("Unknown protocol to job tracker: " + protocol);
+ }
+ }
+ /**
+ * A thread to timeout tasks that have been assigned to task trackers,
+ * but that haven't reported back yet.
+ * Note that I included a stop() method, even though there is no place
+ * where JobTrackers are cleaned up.
+ * @author Owen O'Malley
+ */
+ private class ExpireLaunchingTasks implements Runnable {
+ private volatile boolean shouldRun = true;
+ /**
+ * This is a map of the tasks that have been assigned to task trackers,
+ * but that have not yet been seen in a status report.
+ * map: task-id (String) -> time-assigned (Long)
*/
- private class ExpireLaunchingTasks implements Runnable {
- private volatile boolean shouldRun = true;
- /**
- * This is a map of the tasks that have been assigned to task trackers,
- * but that have not yet been seen in a status report.
- * map: task-id (String) -> time-assigned (Long)
- */
- private Map launchingTasks = new LinkedHashMap();
+ private Map launchingTasks = new LinkedHashMap();
- public void run() {
- while (shouldRun) {
- try {
- // Every 3 minutes check for any tasks that are overdue
- Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
- long now = System.currentTimeMillis();
- LOG.debug("Starting launching task sweep");
- synchronized (JobTracker.this) {
- synchronized (launchingTasks) {
- Iterator itr = launchingTasks.entrySet().iterator();
- while (itr.hasNext()) {
- Map.Entry pair = (Map.Entry) itr.next();
- String taskId = (String) pair.getKey();
- long age = now - ((Long) pair.getValue()).longValue();
- LOG.info(taskId + " is " + age + " ms debug.");
- if (age > TASKTRACKER_EXPIRY_INTERVAL) {
- LOG.info("Launching task " + taskId + " timed out.");
- TaskInProgress tip = null;
- tip = (TaskInProgress) taskidToTIPMap.get(taskId);
- if (tip != null) {
- JobInProgress job = tip.getJob();
- String trackerName = getAssignedTracker(taskId);
- TaskTrackerStatus trackerStatus =
- getTaskTracker(trackerName);
- // This might happen when the tasktracker has already
- // expired and this thread tries to call failedtask
- // again. expire tasktracker should have called failed
- // task!
- if (trackerStatus != null)
- job.failedTask(tip, taskId, "Error launching task",
- tip.isMapTask()? TaskStatus.Phase.MAP:
- TaskStatus.Phase.STARTING,
- trackerStatus.getHost(), trackerName,
- myMetrics);
- }
- itr.remove();
- } else {
- // the tasks are sorted by start time, so once we find
- // one that we want to keep, we are done for this cycle.
- break;
+ public void run() {
+ while (shouldRun) {
+ try {
+ // Every 3 minutes check for any tasks that are overdue
+ Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
+ long now = System.currentTimeMillis();
+ LOG.debug("Starting launching task sweep");
+ synchronized (JobTracker.this) {
+ synchronized (launchingTasks) {
+ Iterator itr = launchingTasks.entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry pair = (Map.Entry) itr.next();
+ String taskId = (String) pair.getKey();
+ long age = now - ((Long) pair.getValue()).longValue();
+ LOG.info(taskId + " is " + age + " ms debug.");
+ if (age > TASKTRACKER_EXPIRY_INTERVAL) {
+ LOG.info("Launching task " + taskId + " timed out.");
+ TaskInProgress tip = null;
+ tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+ if (tip != null) {
+ JobInProgress job = tip.getJob();
+ String trackerName = getAssignedTracker(taskId);
+ TaskTrackerStatus trackerStatus =
+ getTaskTracker(trackerName);
+ // This might happen when the tasktracker has already
+ // expired and this thread tries to call failedtask
+ // again. expire tasktracker should have called failed
+ // task!
+ if (trackerStatus != null)
+ job.failedTask(tip, taskId, "Error launching task",
+ tip.isMapTask()? TaskStatus.Phase.MAP:
+ TaskStatus.Phase.STARTING,
+ trackerStatus.getHost(), trackerName,
+ myMetrics);
}
+ itr.remove();
+ } else {
+ // the tasks are sorted by start time, so once we find
+ // one that we want to keep, we are done for this cycle.
+ break;
}
}
}
- } catch (InterruptedException ie) {
- // all done
- return;
- } catch (Exception e) {
- LOG.error("Expire Launching Task Thread got exception: " +
- StringUtils.stringifyException(e));
}
+ } catch (InterruptedException ie) {
+ // all done
+ return;
+ } catch (Exception e) {
+ LOG.error("Expire Launching Task Thread got exception: " +
+ StringUtils.stringifyException(e));
}
}
+ }
- public void addNewTask(String taskName) {
- synchronized (launchingTasks) {
- launchingTasks.put(taskName,
- new Long(System.currentTimeMillis()));
- }
+ public void addNewTask(String taskName) {
+ synchronized (launchingTasks) {
+ launchingTasks.put(taskName,
+ new Long(System.currentTimeMillis()));
}
+ }
- public void removeTask(String taskName) {
- synchronized (launchingTasks) {
- launchingTasks.remove(taskName);
- }
+ public void removeTask(String taskName) {
+ synchronized (launchingTasks) {
+ launchingTasks.remove(taskName);
}
+ }
- public void stop() {
- shouldRun = false;
- }
+ public void stop() {
+ shouldRun = false;
}
+ }
- ///////////////////////////////////////////////////////
- // Used to expire TaskTrackers that have gone down
- ///////////////////////////////////////////////////////
- class ExpireTrackers implements Runnable {
- boolean shouldRun = true;
- public ExpireTrackers() {
- }
- /**
- * The run method lives for the life of the JobTracker, and removes TaskTrackers
- * that have not checked in for some time.
- */
- public void run() {
- while (shouldRun) {
- try {
- //
- // Thread runs periodically to check whether trackers should be expired.
- // The sleep interval must be no more than half the maximum expiry time
- // for a task tracker.
- //
- Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL / 3);
-
- //
- // Loop through all expired items in the queue
- //
- // Need to lock the JobTracker here since we are
- // manipulating it's data-structures via
- // ExpireTrackers.run -> JobTracker.lostTaskTracker ->
- // JobInProgress.failedTask -> JobTracker.markCompleteTaskAttempt
- // Also need to lock JobTracker before locking 'taskTracker' &
- // 'trackerExpiryQueue' to prevent deadlock:
- // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)}
- synchronized (JobTracker.this) {
- synchronized (taskTrackers) {
- synchronized (trackerExpiryQueue) {
- long now = System.currentTimeMillis();
- TaskTrackerStatus leastRecent = null;
- while ((trackerExpiryQueue.size() > 0) &&
- ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) &&
- (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
+ ///////////////////////////////////////////////////////
+ // Used to expire TaskTrackers that have gone down
+ ///////////////////////////////////////////////////////
+ class ExpireTrackers implements Runnable {
+ boolean shouldRun = true;
+ public ExpireTrackers() {
+ }
+ /**
+ * The run method lives for the life of the JobTracker, and removes TaskTrackers
+ * that have not checked in for some time.
+ */
+ public void run() {
+ while (shouldRun) {
+ try {
+ //
+ // Thread runs periodically to check whether trackers should be expired.
+ // The sleep interval must be no more than half the maximum expiry time
+ // for a task tracker.
+ //
+ Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL / 3);
+
+ //
+ // Loop through all expired items in the queue
+ //
+ // Need to lock the JobTracker here since we are
+ // manipulating its data-structures via
+ // ExpireTrackers.run -> JobTracker.lostTaskTracker ->
+ // JobInProgress.failedTask -> JobTracker.markCompleteTaskAttempt
+ // Also need to lock JobTracker before locking 'taskTrackers' &
+ // 'trackerExpiryQueue' to prevent deadlock:
+ // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)}
+ synchronized (JobTracker.this) {
+ synchronized (taskTrackers) {
+ synchronized (trackerExpiryQueue) {
+ long now = System.currentTimeMillis();
+ TaskTrackerStatus leastRecent = null;
+ while ((trackerExpiryQueue.size() > 0) &&
+ ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) &&
+ (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
- // Remove profile from head of queue
- trackerExpiryQueue.remove(leastRecent);
- String trackerName = leastRecent.getTrackerName();
+ // Remove profile from head of queue
+ trackerExpiryQueue.remove(leastRecent);
+ String trackerName = leastRecent.getTrackerName();
- // Figure out if last-seen time should be updated, or if tracker is dead
- TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName());
- // Items might leave the taskTracker set through other means; the
- // status stored in 'taskTrackers' might be null, which means the
- // tracker has already been destroyed.
- if (newProfile != null) {
- if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
- // Remove completely
- updateTaskTrackerStatus(trackerName, null);
- lostTaskTracker(leastRecent.getTrackerName(),
- leastRecent.getHost());
- } else {
- // Update time by inserting latest profile
- trackerExpiryQueue.add(newProfile);
- }
- }
- }
+ // Figure out if last-seen time should be updated, or if tracker is dead
+ TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName());
+ // Items might leave the taskTracker set through other means; the
+ // status stored in 'taskTrackers' might be null, which means the
+ // tracker has already been destroyed.
+ if (newProfile != null) {
+ if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
+ // Remove completely
+ updateTaskTrackerStatus(trackerName, null);
+ lostTaskTracker(leastRecent.getTrackerName(),
+ leastRecent.getHost());
+ } else {
+ // Update time by inserting latest profile
+ trackerExpiryQueue.add(newProfile);
}
}
}
- } catch (Exception t) {
- LOG.error("Tracker Expiry Thread got exception: " +
- StringUtils.stringifyException(t));
}
}
+ }
+ } catch (Exception t) {
+ LOG.error("Tracker Expiry Thread got exception: " +
+ StringUtils.stringifyException(t));
}
+ }
+ }
- /**
- * Stop the tracker on next iteration
- */
- public void stopTracker() {
- shouldRun = false;
- }
+ /**
+ * Stop the tracker on next iteration
+ */
+ public void stopTracker() {
+ shouldRun = false;
}
+ }
- ///////////////////////////////////////////////////////
- // Used to remove old finished Jobs that have been around for too long
- ///////////////////////////////////////////////////////
- class RetireJobs implements Runnable {
- boolean shouldRun = true;
- public RetireJobs() {
- }
+ ///////////////////////////////////////////////////////
+ // Used to remove old finished Jobs that have been around for too long
+ ///////////////////////////////////////////////////////
+ class RetireJobs implements Runnable {
+ boolean shouldRun = true;
+ public RetireJobs() {
+ }
- /**
- * The run method lives for the life of the JobTracker,
- * and removes Jobs that are not still running, but which
- * finished a long time ago.
- */
- public void run() {
- while (shouldRun) {
- try {
- Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
- List<JobInProgress> retiredJobs = new ArrayList();
- long retireBefore = System.currentTimeMillis() -
- RETIRE_JOB_INTERVAL;
+ /**
+ * The run method lives for the life of the JobTracker,
+ * and removes Jobs that are not still running, but which
+ * finished a long time ago.
+ */
+ public void run() {
+ while (shouldRun) {
+ try {
+ Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
+ List<JobInProgress> retiredJobs = new ArrayList();
+ long retireBefore = System.currentTimeMillis() -
+ RETIRE_JOB_INTERVAL;
+ synchronized (jobsByArrival) {
+ for(JobInProgress job: jobsByArrival) {
+ if (job.getStatus().getRunState() != JobStatus.RUNNING &&
+ job.getStatus().getRunState() != JobStatus.PREP &&
+ (job.getFinishTime() < retireBefore)) {
+ retiredJobs.add(job);
+ }
+ }
+ }
+ if (!retiredJobs.isEmpty()) {
+ synchronized (JobTracker.this) {
+ synchronized (jobs) {
synchronized (jobsByArrival) {
- for(JobInProgress job: jobsByArrival) {
- if (job.getStatus().getRunState() != JobStatus.RUNNING &&
- job.getStatus().getRunState() != JobStatus.PREP &&
- (job.getFinishTime() < retireBefore)) {
- retiredJobs.add(job);
- }
- }
- }
- if (!retiredJobs.isEmpty()) {
- synchronized (JobTracker.this) {
- synchronized (jobs) {
- synchronized (jobsByArrival) {
- synchronized (jobInitQueue) {
- for (JobInProgress job: retiredJobs) {
- removeJobTasks(job);
- jobs.remove(job.getProfile().getJobId());
- jobInitQueue.remove(job);
- jobsByArrival.remove(job);
- synchronized (userToJobsMap) {
- ArrayList<JobInProgress> userJobs =
- userToJobsMap.get(job.getProfile().getUser());
- synchronized (userJobs) {
- userJobs.remove(job);
- }
- }
- LOG.info("Retired job with id: '" +
- job.getProfile().getJobId() + "'");
- }
+ synchronized (jobInitQueue) {
+ for (JobInProgress job: retiredJobs) {
+ removeJobTasks(job);
+ jobs.remove(job.getProfile().getJobId());
+ jobInitQueue.remove(job);
+ jobsByArrival.remove(job);
+ synchronized (userToJobsMap) {
+ ArrayList<JobInProgress> userJobs =
+ userToJobsMap.get(job.getProfile().getUser());
+ synchronized (userJobs) {
+ userJobs.remove(job);
}
}
+ LOG.info("Retired job with id: '" +
+ job.getProfile().getJobId() + "'");
}
}
}
- } catch (InterruptedException t) {
- shouldRun = false;
- } catch (Throwable t) {
- LOG.error("Error in retiring job:\n" +
- StringUtils.stringifyException(t));
}
}
+ }
+ } catch (InterruptedException t) {
+ shouldRun = false;
+ } catch (Throwable t) {
+ LOG.error("Error in retiring job:\n" +
+ StringUtils.stringifyException(t));
}
+ }
}
-
- /////////////////////////////////////////////////////////////////
- // Used to init new jobs that have just been created
- /////////////////////////////////////////////////////////////////
- class JobInitThread implements Runnable {
- boolean shouldRun = true;
- public JobInitThread() {
- }
- public void run() {
- JobInProgress job;
- while (shouldRun) {
- job = null;
- try {
- synchronized (jobInitQueue) {
- while (jobInitQueue.isEmpty()) {
- jobInitQueue.wait();
- }
- job = jobInitQueue.remove(0);
- }
- job.initTasks();
- } catch (InterruptedException t) {
- shouldRun = false;
- } catch (Throwable t) {
- LOG.error("Job initialization failed:\n" +
- StringUtils.stringifyException(t));
- if (job != null) {
- job.kill();
- }
+ }
+
+ /////////////////////////////////////////////////////////////////
+ // Used to init new jobs that have just been created
+ /////////////////////////////////////////////////////////////////
+ class JobInitThread implements Runnable {
+ boolean shouldRun = true;
+ public JobInitThread() {
+ }
+ public void run() {
+ JobInProgress job;
+ while (shouldRun) {
+ job = null;
+ try {
+ synchronized (jobInitQueue) {
+ while (jobInitQueue.isEmpty()) {
+ jobInitQueue.wait();
}
+ job = jobInitQueue.remove(0);
+ }
+ job.initTasks();
+ } catch (InterruptedException t) {
+ shouldRun = false;
+ } catch (Throwable t) {
+ LOG.error("Job initialization failed:\n" +
+ StringUtils.stringifyException(t));
+ if (job != null) {
+ job.kill();
}
}
+ }
}
+ }
- static class JobTrackerMetrics implements Updater {
- private MetricsRecord metricsRecord = null;
- private int numMapTasksLaunched = 0;
- private int numMapTasksCompleted = 0;
- private int numReduceTasksLaunched = 0;
- private int numReduceTasksCompleted = 0;
- private int numJobsSubmitted = 0;
- private int numJobsCompleted = 0;
+ static class JobTrackerMetrics implements Updater {
+ private MetricsRecord metricsRecord = null;
+ private int numMapTasksLaunched = 0;
+ private int numMapTasksCompleted = 0;
+ private int numReduceTasksLaunched = 0;
+ private int numReduceTasksCompleted = 0;
+ private int numJobsSubmitted = 0;
+ private int numJobsCompleted = 0;
- JobTrackerMetrics() {
- MetricsContext context = MetricsUtil.getContext("mapred");
- metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
- context.registerUpdater(this);
- }
+ JobTrackerMetrics() {
+ MetricsContext context = MetricsUtil.getContext("mapred");
+ metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+ context.registerUpdater(this);
+ }
- /**
- * Since this object is a registered updater, this method will be called
- * periodically, e.g. every 5 seconds.
- */
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
- metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
- metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
- metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
- metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
- metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
+ /**
+ * Since this object is a registered updater, this method will be called
+ * periodically, e.g. every 5 seconds.
+ */
+ public void doUpdates(MetricsContext unused) {
+ synchronized (this) {
+ metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+ metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+ metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+ metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+ metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
+ metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
- numMapTasksLaunched = 0;
- numMapTasksCompleted = 0;
- numReduceTasksLaunched = 0;
- numReduceTasksCompleted = 0;
- numJobsSubmitted = 0;
- numJobsCompleted = 0;
- }
- metricsRecord.update();
+ numMapTasksLaunched = 0;
+ numMapTasksCompleted = 0;
+ numReduceTasksLaunched = 0;
+ numReduceTasksCompleted = 0;
+ numJobsSubmitted = 0;
+ numJobsCompleted = 0;
+ }
+ metricsRecord.update();
- if (tracker != null) {
- for (JobInProgress jip : tracker.getRunningJobs()) {
- jip.updateMetrics();
- }
+ if (tracker != null) {
+ for (JobInProgress jip : tracker.getRunningJobs()) {
+ jip.updateMetrics();
}
}
+ }
- synchronized void launchMap() {
- ++numMapTasksLaunched;
- }
+ synchronized void launchMap() {
+ ++numMapTasksLaunched;
+ }
- synchronized void completeMap() {
- ++numMapTasksCompleted;
- }
+ synchronized void completeMap() {
+ ++numMapTasksCompleted;
+ }
- synchronized void launchReduce() {
- ++numReduceTasksLaunched;
- }
+ synchronized void launchReduce() {
+ ++numReduceTasksLaunched;
+ }
- synchronized void completeReduce() {
- ++numReduceTasksCompleted;
- }
+ synchronized void completeReduce() {
+ ++numReduceTasksCompleted;
+ }
- synchronized void submitJob() {
- ++numJobsSubmitted;
- }
+ synchronized void submitJob() {
+ ++numJobsSubmitted;
+ }
- synchronized void completeJob() {
- ++numJobsCompleted;
- }
+ synchronized void completeJob() {
+ ++numJobsCompleted;
}
+ }
- private JobTrackerMetrics myMetrics = null;
-
- /////////////////////////////////////////////////////////////////
- // The real JobTracker
- ////////////////////////////////////////////////////////////////
- int port;
- String localMachine;
- long startTime;
- int totalSubmissions = 0;
- Random r = new Random();
-
- private int maxCurrentTasks;
- private HostsFileReader hostsReader;
-
- //
- // Properties to maintain while running Jobs and Tasks:
- //
- // 1. Each Task is always contained in a single Job. A Job succeeds when all its
- // Tasks are complete.
- //
- // 2. Every running or successful Task is assigned to a Tracker. Idle Tasks are not.
- //
- // 3. When a Tracker fails, all of its assigned Tasks are marked as failures.
- //
- // 4. A Task might need to be reexecuted if it (or the machine it's hosted on) fails
- // before the Job is 100% complete. Sometimes an upstream Task can fail without
- // reexecution if all downstream Tasks that require its output have already obtained
- // the necessary files.
- //
-
- // All the known jobs. (jobid->JobInProgress)
- Map<String, JobInProgress> jobs = new TreeMap();
- List<JobInProgress> jobsByArrival = new ArrayList();
-
- // (user -> list of JobInProgress)
- TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap();
-
- // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
- Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();
-
- // (taskid --> trackerID)
- TreeMap taskidToTrackerMap = new TreeMap();
-
- // (trackerID->TreeSet of taskids running at that tracker)
- TreeMap trackerToTaskMap = new TreeMap();
-
- // (trackerID -> TreeSet of completed taskids running at that tracker)
- TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap();
-
- // (trackerID --> last sent HeartBeatResponse)
- Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap =
- new TreeMap();
+ private JobTrackerMetrics myMetrics = null;
- //
- // Watch and expire TaskTracker objects using these structures.
- // We can map from Name->TaskTrackerStatus, or we can expire by time.
- //
- int totalMaps = 0;
- int totalReduces = 0;
- private TreeMap taskTrackers = new TreeMap();
- List<JobInProgress> jobInitQueue = new ArrayList();
- ExpireTrackers expireTrackers = new ExpireTrackers();
- Thread expireTrackersThread = null;
- RetireJobs retireJobs = new RetireJobs();
- Thread retireJobsThread = null;
- JobInitThread initJobs = new JobInitThread();
- Thread initJobsThread = null;
- ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
- Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
- "expireLaunchingTasks");
-
- /**
- * It might seem like a bug to maintain a TreeSet of status objects,
- * which can be updated at any time. But that's not what happens! We
- * only update status objects in the taskTrackers table. Status objects
- * are never updated once they enter the expiry queue. Instead, we wait
- * for them to expire and remove them from the expiry queue. If a status
- * object has been updated in the taskTracker table, the latest status is
- * reinserted. Otherwise, we assume the tracker has expired.
- */
- TreeSet trackerExpiryQueue = new TreeSet(new Comparator() {
- public int compare(Object o1, Object o2) {
- TaskTrackerStatus p1 = (TaskTrackerStatus) o1;
- TaskTrackerStatus p2 = (TaskTrackerStatus) o2;
- if (p1.getLastSeen() < p2.getLastSeen()) {
- return -1;
- } else if (p1.getLastSeen() > p2.getLastSeen()) {
- return 1;
- } else {
- return (p1.getTrackerName().compareTo(p2.getTrackerName()));
- }
+ /////////////////////////////////////////////////////////////////
+ // The real JobTracker
+ ////////////////////////////////////////////////////////////////
+ int port;
+ String localMachine;
+ long startTime;
+ int totalSubmissions = 0;
+ Random r = new Random();
+
+ private int maxCurrentTasks;
+ private HostsFileReader hostsReader;
+
+ //
+ // Properties to maintain while running Jobs and Tasks:
+ //
+ // 1. Each Task is always contained in a single Job. A Job succeeds when all its
+ // Tasks are complete.
+ //
+ // 2. Every running or successful Task is assigned to a Tracker. Idle Tasks are not.
+ //
+ // 3. When a Tracker fails, all of its assigned Tasks are marked as failures.
+ //
+ // 4. A Task might need to be reexecuted if it (or the machine it's hosted on) fails
+ // before the Job is 100% complete. Sometimes an upstream Task can fail without
+ // reexecution if all downstream Tasks that require its output have already obtained
+ // the necessary files.
+ //
+
+ // All the known jobs. (jobid->JobInProgress)
+ Map<String, JobInProgress> jobs = new TreeMap();
+ List<JobInProgress> jobsByArrival = new ArrayList();
+
+ // (user -> list of JobInProgress)
+ TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap();
+
+ // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
+ Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();
+
+ // (taskid --> trackerID)
+ TreeMap taskidToTrackerMap = new TreeMap();
+
+ // (trackerID->TreeSet of taskids running at that tracker)
+ TreeMap trackerToTaskMap = new TreeMap();
+
+ // (trackerID -> TreeSet of completed taskids running at that tracker)
+ TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap();
+
+ // (trackerID --> last sent HeartBeatResponse)
+ Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap =
+ new TreeMap();
+
+ //
+ // Watch and expire TaskTracker objects using these structures.
+ // We can map from Name->TaskTrackerStatus, or we can expire by time.
+ //
+ int totalMaps = 0;
+ int totalReduces = 0;
+ private TreeMap taskTrackers = new TreeMap();
+ List<JobInProgress> jobInitQueue = new ArrayList();
+ ExpireTrackers expireTrackers = new ExpireTrackers();
+ Thread expireTrackersThread = null;
+ RetireJobs retireJobs = new RetireJobs();
+ Thread retireJobsThread = null;
+ JobInitThread initJobs = new JobInitThread();
+ Thread initJobsThread = null;
+ ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
+ Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
+ "expireLaunchingTasks");
+
+ /**
+ * It might seem like a bug to maintain a TreeSet of status objects,
+ * which can be updated at any time. But that's not what happens! We
+ * only update status objects in the taskTrackers table. Status objects
+ * are never updated once they enter the expiry queue. Instead, we wait
+ * for them to expire and remove them from the expiry queue. If a status
+ * object has been updated in the taskTrackers table, the latest status is
+ * reinserted. Otherwise, we assume the tracker has expired.
+ */
+ TreeSet trackerExpiryQueue = new TreeSet(new Comparator() {
+ public int compare(Object o1, Object o2) {
+ TaskTrackerStatus p1 = (TaskTrackerStatus) o1;
+ TaskTrackerStatus p2 = (TaskTrackerStatus) o2;
+ if (p1.getLastSeen() < p2.getLastSeen()) {
+ return -1;
+ } else if (p1.getLastSeen() > p2.getLastSeen()) {
+ return 1;
+ } else {
+ return (p1.getTrackerName().compareTo(p2.getTrackerName()));
}
+ }
});
- // Used to provide an HTML view on Job, Task, and TaskTracker structures
- StatusHttpServer infoServer;
- String infoBindAddress;
- int infoPort;
-
- Server interTrackerServer;
-
- // Some jobs are stored in a local system directory. We can delete
- // the files when we're done with the job.
- static final String SUBDIR = "jobTracker";
- FileSystem fs;
- Path systemDir;
- private Configuration conf;
-
- /**
- * Start the JobTracker process, listen on the indicated port
- */
- JobTracker(Configuration conf) throws IOException {
- //
- // Grab some static constants
- //
- maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
- RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
- RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
- TASK_ALLOC_EPSILON = conf.getFloat("mapred.jobtracker.taskalloc.loadbalance.epsilon", 0.2f);
- PAD_FRACTION = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad",
- 0.01f);
-
- // This is a directory of temporary submission files. We delete it
- // on startup, and can delete any files that we're done with
- this.conf = conf;
- JobConf jobConf = new JobConf(conf);
- this.systemDir = jobConf.getSystemDir();
- this.fs = FileSystem.get(conf);
- fs.delete(systemDir);
- if (!fs.mkdirs(systemDir)) {
- throw new IOException("Mkdirs failed to create " + systemDir.toString());
- }
-
- // Same with 'localDir' except it's always on the local disk.
- jobConf.deleteLocalFiles(SUBDIR);
-
- // Read the hosts/exclude files to restrict access to the jobtracker.
- this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
- conf.get("mapred.hosts.exclude", ""));
+ // Used to provide an HTML view on Job, Task, and TaskTracker structures
+ StatusHttpServer infoServer;
+ String infoBindAddress;
+ int infoPort;
+
+ Server interTrackerServer;
+
+ // Some jobs are stored in a local system directory. We can delete
+ // the files when we're done with the job.
+ static final String SUBDIR = "jobTracker";
+ FileSystem fs;
+ Path systemDir;
+ private Configuration conf;
+
+ /**
+ * Start the JobTracker process, listen on the indicated port
+ */
+ JobTracker(Configuration conf) throws IOException {
+ //
+ // Grab some static constants
+ //
+ maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
+ RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
+ RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
+ TASK_ALLOC_EPSILON = conf.getFloat("mapred.jobtracker.taskalloc.loadbalance.epsilon", 0.2f);
+ PAD_FRACTION = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad",
+ 0.01f);
+
+ // This is a directory of temporary submission files. We delete it
+ // on startup, and can delete any files that we're done with
+ this.conf = conf;
+ JobConf jobConf = new JobConf(conf);
+ this.systemDir = jobConf.getSystemDir();
+ this.fs = FileSystem.get(conf);
+ fs.delete(systemDir);
+ if (!fs.mkdirs(systemDir)) {
+ throw new IOException("Mkdirs failed to create " + systemDir.toString());
+ }
+
+ // Same with 'localDir' except it's always on the local disk.
+ jobConf.deleteLocalFiles(SUBDIR);
+
+ // Read the hosts/exclude files to restrict access to the jobtracker.
+ this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
+ conf.get("mapred.hosts.exclude", ""));
- // Set ports, start RPC servers, etc.
- InetSocketAddress addr = getAddress(conf);
- this.localMachine = addr.getHostName();
- this.port = addr.getPort();
- this.interTrackerServer = RPC.getServer(this,addr.getHostName(), addr.getPort(), 10, false, conf);
- this.interTrackerServer.start();
- Properties p = System.getProperties();
- for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
- String key = (String) it.next();
- String val = (String) p.getProperty(key);
- LOG.info("Property '" + key + "' is " + val);
- }
-
- this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
- this.infoBindAddress = conf.get("mapred.job.tracker.info.bindAddress","0.0.0.0");
- this.infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
- this.infoServer.start();
-
- this.startTime = System.currentTimeMillis();
-
- myMetrics = new JobTrackerMetrics();
- this.expireTrackersThread = new Thread(this.expireTrackers,
- "expireTrackers");
- this.expireTrackersThread.start();
- this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
- this.retireJobsThread.start();
- this.initJobsThread = new Thread(this.initJobs, "initJobs");
- this.initJobsThread.start();
- expireLaunchingTaskThread.start();
+ // Set ports, start RPC servers, etc.
+ InetSocketAddress addr = getAddress(conf);
+ this.localMachine = addr.getHostName();
+ this.port = addr.getPort();
+ this.interTrackerServer = RPC.getServer(this,addr.getHostName(), addr.getPort(), 10, false, conf);
+ this.interTrackerServer.start();
+ Properties p = System.getProperties();
+ for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
+ String key = (String) it.next();
+ String val = (String) p.getProperty(key);
+ LOG.info("Property '" + key + "' is " + val);
+ }
+
+ this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
+ this.infoBindAddress = conf.get("mapred.job.tracker.info.bindAddress","0.0.0.0");
+ this.infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
+ this.infoServer.start();
+
+ this.startTime = System.currentTimeMillis();
+
+ myMetrics = new JobTrackerMetrics();
+ this.expireTrackersThread = new Thread(this.expireTrackers,
+ "expireTrackers");
+ this.expireTrackersThread.start();
+ this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
+ this.retireJobsThread.start();
+ this.initJobsThread = new Thread(this.initJobs, "initJobs");
+ this.initJobsThread.start();
+ expireLaunchingTaskThread.start();
- // The rpc/web-server ports can be ephemeral ports...
- // ... ensure we have the correct info
- this.port = interTrackerServer.getListenerAddress().getPort();
- this.conf.set("mapred.job.tracker", new String(this.localMachine + ":" + this.port));
- LOG.info("JobTracker up at: " + this.port);
- this.infoPort = this.infoServer.getPort();
- this.conf.set("mapred.job.tracker.info.port", this.infoPort);
- LOG.info("JobTracker webserver: " + this.infoServer.getPort());
- }
-
- public static InetSocketAddress getAddress(Configuration conf) {
- String jobTrackerStr =
- conf.get("mapred.job.tracker", "localhost:8012");
- int colon = jobTrackerStr.indexOf(":");
- if (colon < 0) {
- throw new RuntimeException("Bad mapred.job.tracker: "+jobTrackerStr);
- }
- String jobTrackerName = jobTrackerStr.substring(0, colon);
- int jobTrackerPort = Integer.parseInt(jobTrackerStr.substring(colon+1));
- return new InetSocketAddress(jobTrackerName, jobTrackerPort);
+ // The rpc/web-server ports can be ephemeral ports...
+ // ... ensure we have the correct info
+ this.port = interTrackerServer.getListenerAddress().getPort();
+ this.conf.set("mapred.job.tracker", new String(this.localMachine + ":" + this.port));
+ LOG.info("JobTracker up at: " + this.port);
+ this.infoPort = this.infoServer.getPort();
+ this.conf.set("mapred.job.tracker.info.port", this.infoPort);
+ LOG.info("JobTracker webserver: " + this.infoServer.getPort());
+ }
+
+ public static InetSocketAddress getAddress(Configuration conf) {
+ String jobTrackerStr =
+ conf.get("mapred.job.tracker", "localhost:8012");
+ int colon = jobTrackerStr.indexOf(":");
+ if (colon < 0) {
+ throw new RuntimeException("Bad mapred.job.tracker: "+jobTrackerStr);
+ }
+ String jobTrackerName = jobTrackerStr.substring(0, colon);
+ int jobTrackerPort = Integer.parseInt(jobTrackerStr.substring(colon+1));
+ return new InetSocketAddress(jobTrackerName, jobTrackerPort);
+ }
+
+
+ /**
+ * Run forever
+ */
+ public void offerService() {
+ try {
+ this.interTrackerServer.join();
+ } catch (InterruptedException ie) {
+ }
+ LOG.info("Stopped interTrackerServer");
+ }
+
+ void close() throws IOException {
+ if (this.infoServer != null) {
+ LOG.info("Stopping infoServer");
+ try {
+ this.infoServer.stop();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
}
-
-
- /**
- * Run forever
- */
- public void offerService() {
- try {
- this.interTrackerServer.join();
- } catch (InterruptedException ie) {
- }
- LOG.info("Stopped interTrackerServer");
+ if (this.interTrackerServer != null) {
+ LOG.info("Stopping interTrackerServer");
+ this.interTrackerServer.stop();
+ }
+ if (this.expireTrackers != null) {
+ LOG.info("Stopping expireTrackers");
+ this.expireTrackers.stopTracker();
+ try {
+ this.expireTrackersThread.interrupt();
+ this.expireTrackersThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
}
-
- void close() throws IOException {
- if (this.infoServer != null) {
- LOG.info("Stopping infoServer");
- try {
- this.infoServer.stop();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- if (this.interTrackerServer != null) {
- LOG.info("Stopping interTrackerServer");
- this.interTrackerServer.stop();
- }
- if (this.expireTrackers != null) {
- LOG.info("Stopping expireTrackers");
- this.expireTrackers.stopTracker();
- try {
- this.expireTrackersThread.interrupt();
- this.expireTrackersThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- if (this.retireJobs != null) {
- LOG.info("Stopping retirer");
- this.retireJobsThread.interrupt();
- try {
- this.retireJobsThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- if (this.initJobs != null) {
- LOG.info("Stopping initer");
- this.initJobsThread.interrupt();
- try {
- this.initJobsThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- if (this.expireLaunchingTaskThread != null) {
- LOG.info("Stopping expireLaunchingTasks");
- this.expireLaunchingTasks.stop();
- try {
- this.expireLaunchingTaskThread.interrupt();
- this.expireLaunchingTaskThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- LOG.info("stopped all jobtracker services");
- return;
+ if (this.retireJobs != null) {
+ LOG.info("Stopping retirer");
+ this.retireJobsThread.interrupt();
+ try {
+ this.retireJobsThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
}
-
- ///////////////////////////////////////////////////////
- // Maintain lookup tables; called by JobInProgress
- // and TaskInProgress
- ///////////////////////////////////////////////////////
- void createTaskEntry(String taskid, String taskTracker, TaskInProgress tip) {
- LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'");
-
- // taskid --> tracker
- taskidToTrackerMap.put(taskid, taskTracker);
-
- // tracker --> taskid
- TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);
- if (taskset == null) {
- taskset = new TreeSet();
- trackerToTaskMap.put(taskTracker, taskset);
- }
- taskset.add(taskid);
-
- // taskid --> TIP
- taskidToTIPMap.put(taskid, tip);
+ if (this.initJobs != null) {
+ LOG.info("Stopping initer");
+ this.initJobsThread.interrupt();
+ try {
+ this.initJobsThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ }
+ if (this.expireLaunchingTaskThread != null) {
+ LOG.info("Stopping expireLaunchingTasks");
+ this.expireLaunchingTasks.stop();
+ try {
+ this.expireLaunchingTaskThread.interrupt();
+ this.expireLaunchingTaskThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
}
+ LOG.info("stopped all jobtracker services");
+ return;
+ }
- void removeTaskEntry(String taskid) {
- // taskid --> tracker
- String tracker = (String) taskidToTrackerMap.remove(taskid);
+ ///////////////////////////////////////////////////////
+ // Maintain lookup tables; called by JobInProgress
+ // and TaskInProgress
+ ///////////////////////////////////////////////////////
+ void createTaskEntry(String taskid, String taskTracker, TaskInProgress tip) {
+ LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'");
- // tracker --> taskid
- if (tracker != null) {
- TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
- if (trackerSet != null) {
- trackerSet.remove(taskid);
- }
- }
+ // taskid --> tracker
+ taskidToTrackerMap.put(taskid, taskTracker);
- // taskid --> TIP
- taskidToTIPMap.remove(taskid);
-
- LOG.debug("Removing task '" + taskid + "'");
- }
-
- /**
- * Mark a 'task' for removal later.
- * This function assumes that the JobTracker is locked on entry.
- *
- * @param taskTracker the tasktracker at which the 'task' was running
- * @param taskid completed (success/failure/killed) task
- */
- void markCompletedTaskAttempt(String taskTracker, String taskid) {
- // tracker --> taskid
- TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker);
- if (taskset == null) {
- taskset = new TreeSet();
- trackerToMarkedTasksMap.put(taskTracker, taskset);
- }
- taskset.add(taskid);
-
- LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'");
+ // tracker --> taskid
+ TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);
+ if (taskset == null) {
+ taskset = new TreeSet();
+ trackerToTaskMap.put(taskTracker, taskset);
}
+ taskset.add(taskid);
- /**
- * Mark all 'non-running' jobs of the job for pruning.
- * This function assumes that the JobTracker is locked on entry.
- *
- * @param job the completed job
- */
- void markCompletedJob(JobInProgress job) {
- for (TaskInProgress tip : job.getMapTasks()) {
- for (TaskStatus taskStatus : tip.getTaskStatuses()) {
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
- markCompletedTaskAttempt(taskStatus.getTaskTracker(),
- taskStatus.getTaskId());
- }
- }
- }
- for (TaskInProgress tip : job.getReduceTasks()) {
- for (TaskStatus taskStatus : tip.getTaskStatuses()) {
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
- markCompletedTaskAttempt(taskStatus.getTaskTracker(),
- taskStatus.getTaskId());
- }
- }
- }
- }
+ // taskid --> TIP
+ taskidToTIPMap.put(taskid, tip);
+ }
- /**
- * Remove all 'marked' tasks running on a given {@link TaskTracker}
- * from the {@link JobTracker}'s data-structures.
- * This function assumes that the JobTracker is locked on entry.
- *
- * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
- */
- private void removeMarkedTasks(String taskTracker) {
- // Purge all the 'marked' tasks which were running at taskTracker
- TreeSet<String> markedTaskSet =
- (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker);
- if (markedTaskSet != null) {
- for (String taskid : markedTaskSet) {
- removeTaskEntry(taskid);
- LOG.info("Removed completed task '" + taskid + "' from '" +
- taskTracker + "'");
- }
- // Clear
- trackerToMarkedTasksMap.remove(taskTracker);
+ void removeTaskEntry(String taskid) {
+ // taskid --> tracker
+ String tracker = (String) taskidToTrackerMap.remove(taskid);
+
+ // tracker --> taskid
+ if (tracker != null) {
+ TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
+ if (trackerSet != null) {
+ trackerSet.remove(taskid);
}
}
+
+ // taskid --> TIP
+ taskidToTIPMap.remove(taskid);
+
+ LOG.debug("Removing task '" + taskid + "'");
+ }
- /**
- * Call {@link #removeTaskEntry(String)} for each of the
- * job's tasks.
- * When the JobTracker is retiring the long-completed
- * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
- * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs
- * has been reached, we can afford to nuke all it's tasks; a little
- * unsafe, but practically feasible.
- *
- * @param job the job about to be 'retired'
- */
- synchronized private void removeJobTasks(JobInProgress job) {
- for (TaskInProgress tip : job.getMapTasks()) {
- for (TaskStatus taskStatus : tip.getTaskStatuses()) {
- removeTaskEntry(taskStatus.getTaskId());
- }
- }
- for (TaskInProgress tip : job.getReduceTasks()) {
- for (TaskStatus taskStatus : tip.getTaskStatuses()) {
- removeTaskEntry(taskStatus.getTaskId());
- }
- }
+ /**
+ * Mark a 'task' for removal later.
+ * This function assumes that the JobTracker is locked on entry.
+ *
+ * @param taskTracker the tasktracker at which the 'task' was running
+ * @param taskid completed (success/failure/killed) task
+ */
+ void markCompletedTaskAttempt(String taskTracker, String taskid) {
+ // tracker --> taskid
+ TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker);
+ if (taskset == null) {
+ taskset = new TreeSet();
+ trackerToMarkedTasksMap.put(taskTracker, taskset);
}
-
- /**
- * Safe clean-up all data structures at the end of the
- * job (success/failure/killed).
- * Here we also ensure that for a given user we maintain
- * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs
- * on the JobTracker.
- *
- * @param job completed job.
- */
- synchronized void finalizeJob(JobInProgress job) {
- // Mark the 'non-running' tasks for pruning
- markCompletedJob(job);
+ taskset.add(taskid);
- // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
- // in memory; information about the purged jobs is available via
- // JobHistory.
- synchronized (jobs) {
- synchronized (jobsByArrival) {
- synchronized (jobInitQueue) {
- String jobUser = job.getProfile().getUser();
- synchronized (userToJobsMap) {
- ArrayList<JobInProgress> userJobs =
- userToJobsMap.get(jobUser);
- synchronized (userJobs) {
- while (userJobs.size() >
- MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
- JobInProgress rjob = userJobs.get(0);
+ LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'");
+ }
+
+ /**
+ * Mark all 'non-running' tasks of the job for pruning.
+ * This function assumes that the JobTracker is locked on entry.
+ *
+ * @param job the completed job
+ */
+ void markCompletedJob(JobInProgress job) {
+ for (TaskInProgress tip : job.getMapTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ markCompletedTaskAttempt(taskStatus.getTaskTracker(),
+ taskStatus.getTaskId());
+ }
+ }
+ }
+ for (TaskInProgress tip : job.getReduceTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ markCompletedTaskAttempt(taskStatus.getTaskTracker(),
+ taskStatus.getTaskId());
+ }
+ }
+ }
+ }
+
+ /**
+ * Remove all 'marked' tasks running on a given {@link TaskTracker}
+ * from the {@link JobTracker}'s data-structures.
+ * This function assumes that the JobTracker is locked on entry.
+ *
+ * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
+ */
+ private void removeMarkedTasks(String taskTracker) {
+ // Purge all the 'marked' tasks which were running at taskTracker
+ TreeSet<String> markedTaskSet =
+ (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker);
+ if (markedTaskSet != null) {
+ for (String taskid : markedTaskSet) {
+ removeTaskEntry(taskid);
+ LOG.info("Removed completed task '" + taskid + "' from '" +
+ taskTracker + "'");
+ }
+ // Clear
+ trackerToMarkedTasksMap.remove(taskTracker);
+ }
+ }
+
+ /**
+ * Call {@link #removeTaskEntry(String)} for each of the
+ * job's tasks.
+ * When the JobTracker is retiring the long-completed
+ * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
+ * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs
+ * has been reached, we can afford to nuke all its tasks; a little
+ * unsafe, but practically feasible.
+ *
+ * @param job the job about to be 'retired'
+ */
+ synchronized private void removeJobTasks(JobInProgress job) {
+ for (TaskInProgress tip : job.getMapTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ removeTaskEntry(taskStatus.getTaskId());
+ }
+ }
+ for (TaskInProgress tip : job.getReduceTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ removeTaskEntry(taskStatus.getTaskId());
+ }
+ }
+ }
+
+ /**
+ * Safely clean up all data structures at the end of the
+ * job (success/failure/killed).
+ * Here we also ensure that for a given user we maintain
+ * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs
+ * on the JobTracker.
+ *
+ * @param job completed job.
+ */
+ synchronized void finalizeJob(JobInProgress job) {
+ // Mark the 'non-running' tasks for pruning
+ markCompletedJob(job);
+
+ // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
+ // in memory; information about the purged jobs is available via
+ // JobHistory.
+ synchronized (jobs) {
+ synchronized (jobsByArrival) {
+ synchronized (jobInitQueue) {
+ String jobUser = job.getProfile().getUser();
+ synchronized (userToJobsMap) {
+ ArrayList<JobInProgress> userJobs =
+ userToJobsMap.get(jobUser);
+ synchronized (userJobs) {
+ while (userJobs.size() >
+ MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+ JobInProgress rjob = userJobs.get(0);
- // Do not delete 'current'
- // finished job just yet.
- if (rjob == job) {
- break;
- }
+ // Do not delete 'current'
+ // finished job just yet.
+ if (rjob == job) {
+ break;
+ }
- // Cleanup all datastructures
- int rjobRunState =
- rjob.getStatus().getRunState();
- if (rjobRunState == JobStatus.SUCCEEDED ||
- rjobRunState == JobStatus.FAILED) {
- // Ok, this call to removeTaskEntries
- // is dangerous is some very very obscure
- // cases; e.g. when rjob completed, hit
- // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
- // limit and yet some task (taskid)
- // wasn't complete!
- removeJobTasks(rjob);
+ // Cleanup all datastructures
+ int rjobRunState =
+ rjob.getStatus().getRunState();
+ if (rjobRunState == JobStatus.SUCCEEDED ||
+ rjobRunState == JobStatus.FAILED) {
+ // Ok, this call to removeJobTasks
+ // is dangerous in some very very obscure
+ // cases; e.g. when rjob completed, hit
+ // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
+ // limit and yet some task (taskid)
+ // wasn't complete!
+ removeJobTasks(rjob);
- userJobs.remove(0);
- jobs.remove(rjob.getProfile().getJobId());
- jobInitQueue.remove(rjob);
- jobsByArrival.remove(rjob);
+ userJobs.remove(0);
+ jobs.remove(rjob.getProfile().getJobId());
+ jobInitQueue.remove(rjob);
+ jobsByArrival.remove(rjob);
- LOG.info("Retired job with id: '" +
- rjob.getProfile().getJobId() + "'");
- } else {
- // Do not remove jobs that aren't complete.
- // Stop here, and let the next pass take
- // care of purging jobs.
- break;
- }
+ LOG.info("Retired job with id: '" +
+ rjob.getProfile().getJobId() + "'");
+ } else {
+ // Do not remove jobs that aren't complete.
+ // Stop here, and let the next pass take
+ // care of purging jobs.
+ break;
}
}
}
@@ -928,773 +927,774 @@
}
}
}
+ }
- ///////////////////////////////////////////////////////
- // Accessors for objects that want info on jobs, tasks,
- // trackers, etc.
- ///////////////////////////////////////////////////////
- public int getTotalSubmissions() {
- return totalSubmissions;
- }
- public String getJobTrackerMachine() {
- return localMachine;
- }
- public int getTrackerPort() {
- return port;
- }
- public int getInfoPort() {
- return infoPort;
- }
- public long getStartTime() {
- return startTime;
- }
- public Vector runningJobs() {
- Vector v = new Vector();
- for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
- JobInProgress jip = (JobInProgress) it.next();
- JobStatus status = jip.getStatus();
- if (status.getRunState() == JobStatus.RUNNING) {
- v.add(jip);
- }
- }
- return v;
- }
- /**
- * Version that is called from a timer thread, and therefore needs to be
- * careful to synchronize.
- */
- public synchronized List<JobInProgress> getRunningJobs() {
- synchronized (jobs) {
- return (List<JobInProgress>) runningJobs();
- }
- }
- public Vector failedJobs() {
- Vector v = new Vector();
- for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
- JobInProgress jip = (JobInProgress) it.next();
- JobStatus status = jip.getStatus();
- if (status.getRunState() == JobStatus.FAILED) {
- v.add(jip);
- }
- }
- return v;
- }
- public Vector completedJobs() {
- Vector v = new Vector();
- for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
- JobInProgress jip = (JobInProgress) it.next();
- JobStatus status = jip.getStatus();
- if (status.getRunState() == JobStatus.SUCCEEDED) {
- v.add(jip);
- }
- }
- return v;
- }
- public Collection taskTrackers() {
- synchronized (taskTrackers) {
- return taskTrackers.values();
- }
- }
- public TaskTrackerStatus getTaskTracker(String trackerID) {
- synchronized (taskTrackers) {
- return (TaskTrackerStatus) taskTrackers.get(trackerID);
- }
- }
-
- ////////////////////////////////////////////////////
- // InterTrackerProtocol
- ////////////////////////////////////////////////////
-
- /**
- * The periodic heartbeat mechanism between the {@link TaskTracker} and
- * the {@link JobTracker}.
- *
- * The {@link JobTracker} processes the status information sent by the
- * {@link TaskTracker} and responds with instructions to start/stop
- * tasks or jobs, and also 'reset' instructions during contingencies.
- */
- public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
- boolean initialContact, boolean acceptNewTasks, short responseId)
+ ///////////////////////////////////////////////////////
+ // Accessors for objects that want info on jobs, tasks,
+ // trackers, etc.
+ ///////////////////////////////////////////////////////
+ public int getTotalSubmissions() {
+ return totalSubmissions;
+ }
+ public String getJobTrackerMachine() {
+ return localMachine;
+ }
+ public int getTrackerPort() {
+ return port;
+ }
+ public int getInfoPort() {
+ return infoPort;
+ }
+ public long getStartTime() {
+ return startTime;
+ }
+ public Vector runningJobs() {
+ Vector v = new Vector();
+ for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
+ JobInProgress jip = (JobInProgress) it.next();
+ JobStatus status = jip.getStatus();
+ if (status.getRunState() == JobStatus.RUNNING) {
+ v.add(jip);
+ }
+ }
+ return v;
+ }
+ /**
+ * Version that is called from a timer thread, and therefore needs to be
+ * careful to synchronize.
+ */
+ public synchronized List<JobInProgress> getRunningJobs() {
+ synchronized (jobs) {
+ return (List<JobInProgress>) runningJobs();
+ }
+ }
+ public Vector failedJobs() {
+ Vector v = new Vector();
+ for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
+ JobInProgress jip = (JobInProgress) it.next();
+ JobStatus status = jip.getStatus();
+ if (status.getRunState() == JobStatus.FAILED) {
+ v.add(jip);
+ }
+ }
+ return v;
+ }
+ public Vector completedJobs() {
+ Vector v = new Vector();
+ for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
+ JobInProgress jip = (JobInProgress) it.next();
+ JobStatus status = jip.getStatus();
+ if (status.getRunState() == JobStatus.SUCCEEDED) {
+ v.add(jip);
+ }
+ }
+ return v;
+ }
+ public Collection taskTrackers() {
+ synchronized (taskTrackers) {
+ return taskTrackers.values();
+ }
+ }
+ public TaskTrackerStatus getTaskTracker(String trackerID) {
+ synchronized (taskTrackers) {
+ return (TaskTrackerStatus) taskTrackers.get(trackerID);
+ }
+ }
+
+ ////////////////////////////////////////////////////
+ // InterTrackerProtocol
+ ////////////////////////////////////////////////////
+
+ /**
+ * The periodic heartbeat mechanism between the {@link TaskTracker} and
+ * the {@link JobTracker}.
+ *
+ * The {@link JobTracker} processes the status information sent by the
+ * {@link TaskTracker} and responds with instructions to start/stop
+ * tasks or jobs, and also 'reset' instructions during contingencies.
+ */
+ public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
+ boolean initialContact, boolean acceptNewTasks, short responseId)
throws IOException {
- LOG.debug("Got heartbeat from: " + status.getTrackerName() +
+ LOG.debug("Got heartbeat from: " + status.getTrackerName() +
" (initialContact: " + initialContact +
" acceptNewTasks: " + acceptNewTasks + ")" +
" with responseId: " + responseId);
- // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
- if (!acceptTaskTracker(status)) {
- throw new DisallowedTaskTrackerException(status);
- }
-
- // First check if the last heartbeat response got through
- String trackerName = status.getTrackerName();
- HeartbeatResponse prevHeartbeatResponse =
- trackerToHeartbeatResponseMap.get(trackerName);
-
- if (initialContact != true) {
- // If this isn't the 'initial contact' from the tasktracker,
- // there is something seriously wrong if the JobTracker has
- // no record of the 'previous heartbeat'; if so, ask the
- // tasktracker to re-initialize itself.
- if (prevHeartbeatResponse == null) {
- LOG.warn("Serious problem, cannot find record of 'previous' " +
- "heartbeat for '" + trackerName +
- "'; reinitializing the tasktracker");
- return new HeartbeatResponse(responseId,
- new TaskTrackerAction[] {new ReinitTrackerAction()});
+ // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
+ if (!acceptTaskTracker(status)) {
+ throw new DisallowedTaskTrackerException(status);
+ }
+
+ // First check if the last heartbeat response got through
+ String trackerName = status.getTrackerName();
+ HeartbeatResponse prevHeartbeatResponse =
+ trackerToHeartbeatResponseMap.get(trackerName);
+
+ if (initialContact != true) {
+ // If this isn't the 'initial contact' from the tasktracker,
+ // there is something seriously wrong if the JobTracker has
+ // no record of the 'previous heartbeat'; if so, ask the
+ // tasktracker to re-initialize itself.
+ if (prevHeartbeatResponse == null) {
+ LOG.warn("Serious problem, cannot find record of 'previous' " +
+ "heartbeat for '" + trackerName +
+ "'; reinitializing the tasktracker");
+ return new HeartbeatResponse(responseId,
+ new TaskTrackerAction[] {new ReinitTrackerAction()});
- }
+ }
- // It is completely safe to ignore a 'duplicate' from a tracker
- // since we are guaranteed that the tracker sends the same
- // 'heartbeat' when rpcs are lost.
- // {@see TaskTracker.transmitHeartbeat()}
- if (prevHeartbeatResponse.getResponseId() != responseId) {
- LOG.info("Ignoring 'duplicate' heartbeat from '" +
- trackerName + "'");
- return prevHeartbeatResponse;
- }
- }
+ // It is completely safe to ignore a 'duplicate' from a tracker
+ // since we are guaranteed that the tracker sends the same
+ // 'heartbeat' when rpcs are lost.
+ // {@see TaskTracker.transmitHeartbeat()}
+ if (prevHeartbeatResponse.getResponseId() != responseId) {
+ LOG.info("Ignoring 'duplicate' heartbeat from '" +
+ trackerName + "'");
+ return prevHeartbeatResponse;
+ }
+ }
- // Process this heartbeat
- short newResponseId = (short)(responseId + 1);
- if (!processHeartbeat(status, initialContact)) {
- if (prevHeartbeatResponse != null) {
- trackerToHeartbeatResponseMap.remove(trackerName);
- }
+ // Process this heartbeat
+ short newResponseId = (short)(responseId + 1);
+ if (!processHeartbeat(status, initialContact)) {
+ if (prevHeartbeatResponse != null) {
+ trackerToHeartbeatResponseMap.remove(trackerName);
+ }
- return new HeartbeatResponse(newResponseId,
- new TaskTrackerAction[] {new ReinitTrackerAction()});
- }
+ return new HeartbeatResponse(newResponseId,
+ new TaskTrackerAction[] {new ReinitTrackerAction()});
+ }
- // Initialize the response to be sent for the heartbeat
- HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
- List<TaskTrackerAction> actions = new ArrayList();
+ // Initialize the response to be sent for the heartbeat
+ HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
+ List<TaskTrackerAction> actions = new ArrayList();
- // Check for new tasks to be executed on the tasktracker
- if (acceptNewTasks) {
- Task task = getNewTaskForTaskTracker(trackerName);
- if (task != null) {
- LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskId());
- actions.add(new LaunchTaskAction(task));
- }
- }
+ // Check for new tasks to be executed on the tasktracker
+ if (acceptNewTasks) {
+ Task task = getNewTaskForTaskTracker(trackerName);
+ if (task != null) {
+ LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskId());
+ actions.add(new LaunchTaskAction(task));
+ }
+ }
- // Check for tasks to be killed
- List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
- if (killTasksList != null) {
- actions.addAll(killTasksList);
- }
+ // Check for tasks to be killed
+ List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
+ if (killTasksList != null) {
+ actions.addAll(killTasksList);
+ }
- response.setActions(
- actions.toArray(new TaskTrackerAction[actions.size()]));
+ response.setActions(
+ actions.toArray(new TaskTrackerAction[actions.size()]));
- // Update the trackerToHeartbeatResponseMap
- trackerToHeartbeatResponseMap.put(trackerName, response);
+ // Update the trackerToHeartbeatResponseMap
+ trackerToHeartbeatResponseMap.put(trackerName, response);
- // Done processing the hearbeat, now remove 'marked' tasks
- removeMarkedTasks(trackerName);
+ // Done processing the heartbeat, now remove 'marked' tasks
+ removeMarkedTasks(trackerName);
- return response;
- }
+ return response;
+ }
- /**
- * Return if the specified tasktracker is in the hosts list,
- * if one was configured. If none was configured, then this
- * returns true.
- */
- private boolean inHostsList(TaskTrackerStatus status) {
- Set<String> hostsList = hostsReader.getHosts();
- return (hostsList.isEmpty() || hostsList.contains(status.getHost()));
- }
-
- /**
- * Return if the specified tasktracker is in the exclude list.
- */
- private boolean inExcludedHostsList(TaskTrackerStatus status) {
- Set<String> excludeList = hostsReader.getExcludedHosts();
- return excludeList.contains(status.getHost());
- }
+ /**
+ * Returns true if the tracker's host is in the hosts list
+ * (hostsReader.getHosts()). An empty list accepts every host.
+ * @param status tracker status whose getHost() value is checked
+ */
+ private boolean inHostsList(TaskTrackerStatus status) {
+ Set<String> hostsList = hostsReader.getHosts();
+ return (hostsList.isEmpty() || hostsList.contains(status.getHost()));
+ }
+
+ /**
+ * Returns true if status.getHost() is in hostsReader.getExcludedHosts().
+ */
+ private boolean inExcludedHostsList(TaskTrackerStatus status) {
+ Set<String> excludeList = hostsReader.getExcludedHosts();
+ return excludeList.contains(status.getHost());
+ }
+
+ /**
+ * Returns true only if the tasktracker passes inHostsList(status)
+ * and is not matched by inExcludedHostsList(status).
+ */
+ private boolean acceptTaskTracker(TaskTrackerStatus status) {
+ return (inHostsList(status) && !inExcludedHostsList(status));
+ }
+
+ /**
+ * Replace the recorded status for a tracker and adjust totalMaps/totalReduces.
+ * A null status removes the tracker; taskTrackers must be locked on entry.
+ * @author Owen O'Malley
+ * @param trackerName The name of the tracker
+ * @param status The new status, or null to drop the tracker's record
+ * @return true if a previous status was recorded for trackerName
+ */
+ private boolean updateTaskTrackerStatus(String trackerName,
+ TaskTrackerStatus status) {
+ TaskTrackerStatus oldStatus =
+ (TaskTrackerStatus) taskTrackers.get(trackerName);
+ if (oldStatus != null) {
+ totalMaps -= oldStatus.countMapTasks();
+ totalReduces -= oldStatus.countReduceTasks();
+ if (status == null) {
+ taskTrackers.remove(trackerName);
+ }
+ }
+ if (status != null) {
+ totalMaps += status.countMapTasks();
+ totalReduces += status.countReduceTasks();
+ taskTrackers.put(trackerName, status);
+ }
+ return oldStatus != null;
+ }
+
+ /**
+ * Process incoming heartbeat messages from the task trackers.
+ */
+ private synchronized boolean processHeartbeat(
+ TaskTrackerStatus trackerStatus, boolean initialContact) {
+ String trackerName = trackerStatus.getTrackerName();
+ trackerStatus.setLastSeen(System.currentTimeMillis());
+
+ synchronized (taskTrackers) {
+ synchronized (trackerExpiryQueue) {
+ boolean seenBefore = updateTaskTrackerStatus(trackerName,
+ trackerStatus);
+ if (initialContact) {
+ // If it's first contact, then clear out
+ // any state hanging around
+ if (seenBefore) {
+ lostTaskTracker(trackerName, trackerStatus.getHost());
+ }
+ } else {
+ // If not first contact, there should be some record of the tracker
+ if (!seenBefore) {
+ LOG.warn("Status from unknown Tracker : " + trackerName);
+ taskTrackers.remove(trackerName);
+ return false;
+ }
+ }
- /**
- * Returns true if the tasktracker is in the hosts list and
- * not in the exclude list.
- */
- private boolean acceptTaskTracker(TaskTrackerStatus status) {
- return (inHostsList(status) && !inExcludedHostsList(status));
- }
-
- /**
- * Update the last recorded status for the given task tracker.
- * It assumes that the taskTrackers are locked on entry.
- * @author Owen O'Malley
- * @param trackerName The name of the tracker
- * @param status The new status for the task tracker
- * @return Was an old status found?
- */
- private boolean updateTaskTrackerStatus(String trackerName,
- TaskTrackerStatus status) {
- TaskTrackerStatus oldStatus =
- (TaskTrackerStatus) taskTrackers.get(trackerName);
- if (oldStatus != null) {
- totalMaps -= oldStatus.countMapTasks();
- totalReduces -= oldStatus.countReduceTasks();
- if (status == null) {
- taskTrackers.remove(trackerName);
+ if (initialContact) {
+ trackerExpiryQueue.add(trackerStatus);
}
}
- if (status != null) {
- totalMaps += status.countMapTasks();
- totalReduces += status.countReduceTasks();
- taskTrackers.put(trackerName, status);
- }
- return oldStatus != null;
}
-
- /**
- * Process incoming heartbeat messages from the task trackers.
- */
- private synchronized boolean processHeartbeat(
- TaskTrackerStatus trackerStatus, boolean initialContact) {
- String trackerName = trackerStatus.getTrackerName();
- trackerStatus.setLastSeen(System.currentTimeMillis());
-
- synchronized (taskTrackers) {
- synchronized (trackerExpiryQueue) {
- boolean seenBefore = updateTaskTrackerStatus(trackerName,
- trackerStatus);
- if (initialContact) {
- // If it's first contact, then clear out
- // any state hanging around
- if (seenBefore) {
- lostTaskTracker(trackerName, trackerStatus.getHost());
- }
- } else {
- // If not first contact, there should be some record of the tracker
- if (!seenBefore) {
- LOG.warn("Status from unknown Tracker : " + trackerName);
- taskTrackers.remove(trackerName);
- return false;
- }
- }
- if (initialContact) {
- trackerExpiryQueue.add(trackerStatus);
- }
- }
- }
+ updateTaskStatuses(trackerStatus);
- updateTaskStatuses(trackerStatus);
+ return true;
+ }
- return true;
- }
-
- /**
- * Returns a task we'd like the TaskTracker to execute right now.
- *
- * Eventually this function should compute load on the various TaskTrackers,
- * and incorporate knowledge of DFS file placement. But for right now, it
- * just grabs a single item out of the pending task list and hands it back.
- */
- private synchronized Task getNewTaskForTaskTracker(String taskTracker
- ) throws IOException {
- //
- // Compute average map and reduce task numbers across pool
- //
- int remainingReduceLoad = 0;
- int remainingMapLoad = 0;
- int numTaskTrackers;
- TaskTrackerStatus tts;
+ /**
+ * Returns a task we'd like the TaskTracker to execute right now.
+ *
+ * Eventually this function should compute load on the various TaskTrackers,
+ * and incorporate knowledge of DFS file placement. But for right now, it
+ * just grabs a single item out of the pending task list and hands it back.
+ */
+ private synchronized Task getNewTaskForTaskTracker(String taskTracker
+ ) throws IOException {
+ //
+ // Compute average map and reduce task numbers across pool
+ //
+ int remainingReduceLoad = 0;
+ int remainingMapLoad = 0;
+ int numTaskTrackers;
+ TaskTrackerStatus tts;
- synchronized (taskTrackers) {
- numTaskTrackers = taskTrackers.size();
- tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);
- }
- if (tts == null) {
- LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
- return null;
- }
- int totalCapacity = numTaskTrackers * maxCurrentTasks;
-
- synchronized(jobsByArrival){
- for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
- JobInProgress job = (JobInProgress) it.next();
- if (job.getStatus().getRunState() == JobStatus.RUNNING) {
- int totalMapTasks = job.desiredMaps();
- int totalReduceTasks = job.desiredReduces();
- remainingMapLoad += (totalMapTasks - job.finishedMaps());
- remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
- }
- }
- }
-
- // find out the maximum number of maps or reduces that we are willing
- // to run on any node.
- int maxMapLoad = 0;
- int maxReduceLoad = 0;
- if (numTaskTrackers > 0) {
- maxMapLoad = Math.min(maxCurrentTasks,
- (int) Math.ceil((double) remainingMapLoad /
- numTaskTrackers));
- maxReduceLoad = Math.min(maxCurrentTasks,
- (int) Math.ceil((double) remainingReduceLoad
- / numTaskTrackers));
- }
+ synchronized (taskTrackers) {
+ numTaskTrackers = taskTrackers.size();
+ tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);
+ }
+ if (tts == null) {
+ LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
+ return null;
+ }
+ int totalCapacity = numTaskTrackers * maxCurrentTasks;
+
+ synchronized(jobsByArrival){
+ for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
+ JobInProgress job = (JobInProgress) it.next();
+ if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+ int totalMapTasks = job.desiredMaps();
+ int totalReduceTasks = job.desiredReduces();
+ remainingMapLoad += (totalMapTasks - job.finishedMaps());
+ remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
+ }
+ }
+ }
+
+ // find out the maximum number of maps or reduces that we are willing
+ // to run on any node.
+ int maxMapLoad = 0;
+ int maxReduceLoad = 0;
[... 897 lines stripped ...]