You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [18/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java Sat Nov 28 20:26:01 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
@@ -64,17 +65,20 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
-import org.apache.hadoop.mapred.JobHistory.Keys;
-import org.apache.hadoop.mapred.JobHistory.Listener;
-import org.apache.hadoop.mapred.JobHistory.Values;
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -96,9 +100,8 @@
import org.apache.hadoop.util.Service;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
-
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
/*******************************************************
* JobTracker is the central location for submitting and
@@ -107,12 +110,11 @@
*******************************************************/
public class JobTracker extends Service
implements MRConstants, InterTrackerProtocol,
- JobSubmissionProtocol, TaskTrackerManager,
- RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol {
+ ClientProtocol, TaskTrackerManager,
+ RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JTConfig {
static{
- Configuration.addDefaultResource("mapred-default.xml");
- Configuration.addDefaultResource("mapred-site.xml");
+ ConfigUtil.loadResources();
}
private long tasktrackerExpiryInterval;
@@ -129,17 +131,26 @@
// The maximum number of blacklists for a tracker after which the
// tracker could be blacklisted across all jobs
private int MAX_BLACKLISTS_PER_TRACKER = 4;
+
// Approximate number of heartbeats that could arrive JobTracker
// in a second
- private int NUM_HEARTBEATS_IN_SECOND = 100;
+ private int NUM_HEARTBEATS_IN_SECOND;
+ private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100;
+ private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1;
+
+ // Scaling factor for heartbeats, used for testing only
+ private float HEARTBEATS_SCALING_FACTOR;
+ private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
+ private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+
public static enum State { INITIALIZING, RUNNING }
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
private DNSToSwitchMapping dnsToSwitchMapping;
- private NetworkTopology clusterMap = new NetworkTopology();
+ NetworkTopology clusterMap = new NetworkTopology();
private int numTaskCacheLevels; // the max level to which we cache tasks
private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
- private TaskScheduler taskScheduler;
+ TaskScheduler taskScheduler;
private final List<JobInProgressListener> jobInProgressListeners =
new CopyOnWriteArrayList<JobInProgressListener>();
@@ -155,6 +166,8 @@
static final Clock DEFAULT_CLOCK = new Clock();
+ private JobHistory jobHistory;
+
/**
* A client tried to submit a job before the Job Tracker was ready.
*/
@@ -181,6 +194,11 @@
}
/**
+ * Return the JT's job history handle.
+ * @return the jobhistory handle
+ */
+ JobHistory getJobHistory() { return jobHistory; }
+ /**
* Start the JobTracker with given configuration.
*
* The conf will be modified to reflect the actual ports on which
@@ -245,8 +263,8 @@
long clientVersion) throws IOException {
if (protocol.equals(InterTrackerProtocol.class.getName())) {
return InterTrackerProtocol.versionID;
- } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
- return JobSubmissionProtocol.versionID;
+ } else if (protocol.equals(ClientProtocol.class.getName())){
+ return ClientProtocol.versionID;
} else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
return RefreshAuthorizationPolicyProtocol.versionID;
} else if (protocol.equals(AdminOperationsProtocol.class.getName())){
@@ -408,14 +426,7 @@
if ((now - newProfile.getLastSeen()) >
tasktrackerExpiryInterval) {
// Remove completely after marking the tasks as 'KILLED'
- lostTaskTracker(current);
- // tracker is lost, and if it is blacklisted, remove
- // it from the count of blacklisted trackers in the cluster
- if (isBlacklisted(trackerName)) {
- faultyTrackers.numBlacklistedTrackers -= 1;
- }
- updateTaskTrackerStatus(trackerName, null);
- statistics.taskTrackerRemoved(trackerName);
+ removeTracker(current);
// remove the mapping from the hosts list
String hostname = newProfile.getHost();
hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -430,7 +441,20 @@
}
}
- synchronized void retireJob(JobID jobid, String historyFile) {
+ private void removeTracker(TaskTracker tracker) {
+ lostTaskTracker(tracker);
+ String trackerName = tracker.getStatus().getTrackerName();
+ // tracker is lost, and if it is blacklisted, remove
+ // it from the count of blacklisted trackers in the cluster
+ if (isBlacklisted(trackerName)) {
+ faultyTrackers.decrBlackListedTrackers(1);
+ }
+ updateTaskTrackerStatus(trackerName, null);
+ statistics.taskTrackerRemoved(trackerName);
+ getInstrumentation().decTrackers(1);
+ }
+
+ public synchronized void retireJob(JobID jobid, String historyFile) {
synchronized (jobs) {
JobInProgress job = jobs.get(jobid);
if (job != null) {
@@ -450,13 +474,13 @@
}
status.setTrackingUrl(trackingUrl);
// clean up job files from the local disk
- JobHistory.JobInfo.cleanupJob(job.getProfile().getJobID());
+ job.cleanupLocalizedJobConf(job.getProfile().getJobID());
//this configuration is primarily for testing
//test cases can set this to false to validate job data structures on
//job completion
boolean retireJob =
- conf.getBoolean("mapred.job.tracker.retire.jobs", true);
+ conf.getBoolean(JT_RETIREJOBS, true);
if (retireJob) {
//purge the job from memory
@@ -637,7 +661,16 @@
}
}
-
+ private void incrBlackListedTrackers(int count) {
+ numBlacklistedTrackers += count;
+ getInstrumentation().addBlackListedTrackers(count);
+ }
+
+ private void decrBlackListedTrackers(int count) {
+ numBlacklistedTrackers -= count;
+ getInstrumentation().decBlackListedTrackers(count);
+ }
+
private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
FaultInfo fi = getFaultInfo(hostName, true);
boolean blackListed = fi.isBlacklisted();
@@ -796,7 +829,7 @@
getInstrumentation().addBlackListedReduceSlots(
reduceSlots);
}
- numBlacklistedTrackers += uniqueHostsMap.remove(hostName);
+ incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
}
}
@@ -816,7 +849,7 @@
}
uniqueHostsMap.put(hostName,
numTrackersOnHost);
- numBlacklistedTrackers -= numTrackersOnHost;
+ decrBlackListedTrackers(numTrackersOnHost);
}
}
@@ -934,172 +967,11 @@
// Used to recover the jobs upon restart
///////////////////////////////////////////////////////
class RecoveryManager {
- Set<JobID> jobsToRecover; // set of jobs to be recovered
-
- private int totalEventsRecovered = 0;
+ private Set<JobID> jobsToRecover; // set of jobs to be recovered
+ private int recovered;
private int restartCount = 0;
private boolean shouldRecover = false;
- Set<String> recoveredTrackers =
- Collections.synchronizedSet(new HashSet<String>());
-
- /** A custom listener that replays the events in the order in which the
- * events (task attempts) occurred.
- */
- class JobRecoveryListener implements Listener {
- // The owner job
- private JobInProgress jip;
-
- private JobHistory.JobInfo job; // current job's info object
-
- // Maintain the count of the (attempt) events recovered
- private int numEventsRecovered = 0;
-
- // Maintains open transactions
- private Map<String, String> hangingAttempts =
- new HashMap<String, String>();
-
- // Whether there are any updates for this job
- private boolean hasUpdates = false;
-
- public JobRecoveryListener(JobInProgress jip) {
- this.jip = jip;
- this.job = new JobHistory.JobInfo(jip.getJobID().toString());
- }
-
- /**
- * Process a task. Note that a task might commit a previously pending
- * transaction.
- */
- private void processTask(String taskId, JobHistory.Task task) {
- // Any TASK info commits the previous transaction
- boolean hasHanging = hangingAttempts.remove(taskId) != null;
- if (hasHanging) {
- numEventsRecovered += 2;
- }
-
- TaskID id = TaskID.forName(taskId);
- TaskInProgress tip = getTip(id);
-
- updateTip(tip, task);
- }
-
- /**
- * Adds a task-attempt in the listener
- */
- private void processTaskAttempt(String taskAttemptId,
- JobHistory.TaskAttempt attempt) {
- TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
-
- // Check if the transaction for this attempt can be committed
- String taskStatus = attempt.get(Keys.TASK_STATUS);
-
- if (taskStatus.length() > 0) {
- // This means this is an update event
- if (taskStatus.equals(Values.SUCCESS.name())) {
- // Mark this attempt as hanging
- hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
- addSuccessfulAttempt(jip, id, attempt);
- } else {
- addUnsuccessfulAttempt(jip, id, attempt);
- numEventsRecovered += 2;
- }
- } else {
- createTaskAttempt(jip, id, attempt);
- }
- }
-
- public void handle(JobHistory.RecordTypes recType, Map<Keys,
- String> values) throws IOException {
- if (recType == JobHistory.RecordTypes.Job) {
- // Update the meta-level job information
- job.handle(values);
-
- // Forcefully init the job as we have some updates for it
- checkAndInit();
- } else if (recType.equals(JobHistory.RecordTypes.Task)) {
- String taskId = values.get(Keys.TASKID);
-
- // Create a task
- JobHistory.Task task = new JobHistory.Task();
- task.handle(values);
-
- // Ignore if its a cleanup task
- if (isCleanup(task)) {
- return;
- }
-
- // Process the task i.e update the tip state
- processTask(taskId, task);
- } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
- String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-
- // Create a task attempt
- JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
- attempt.handle(values);
-
- // Ignore if its a cleanup task
- if (isCleanup(attempt)) {
- return;
- }
-
- // Process the attempt i.e update the attempt state via job
- processTaskAttempt(attemptId, attempt);
- } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
- String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-
- // Create a task attempt
- JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
- attempt.handle(values);
-
- // Ignore if its a cleanup task
- if (isCleanup(attempt)) {
- return;
- }
-
- // Process the attempt i.e update the job state via job
- processTaskAttempt(attemptId, attempt);
- }
- }
-
- // Check if the task is of type CLEANUP
- private boolean isCleanup(JobHistory.Task task) {
- String taskType = task.get(Keys.TASK_TYPE);
- return Values.CLEANUP.name().equals(taskType);
- }
-
- // Init the job if its ready for init. Also make sure that the scheduler
- // is updated
- private void checkAndInit() throws IOException {
- String jobStatus = this.job.get(Keys.JOB_STATUS);
- if (Values.PREP.name().equals(jobStatus)) {
- hasUpdates = true;
- LOG.info("Calling init from RM for job " + jip.getJobID().toString());
- initJob(jip);
- if (!jip.inited()) {
- throw new IOException("Failed to initialize job " + jip.getJobID());
- }
- }
- }
-
- void close() {
- if (hasUpdates) {
- // Apply the final (job-level) updates
- JobStatusChangeEvent event = updateJob(jip, job);
-
- synchronized (JobTracker.this) {
- // Update the job listeners
- updateJobInProgressListeners(event);
- }
- }
- }
-
- public int getNumEventsRecovered() {
- return numEventsRecovered;
- }
-
- }
-
public RecoveryManager() {
jobsToRecover = new TreeSet<JobID>();
}
@@ -1108,6 +980,10 @@
return jobsToRecover.contains(id);
}
+ int getRecovered() {
+ return recovered;
+ }
+
void addJobForRecovery(JobID id) {
jobsToRecover.add(id);
}
@@ -1116,18 +992,6 @@
return shouldRecover;
}
- public boolean shouldSchedule() {
- return recoveredTrackers.isEmpty();
- }
-
- private void markTracker(String trackerName) {
- recoveredTrackers.add(trackerName);
- }
-
- void unMarkTracker(String trackerName) {
- recoveredTrackers.remove(trackerName);
- }
-
Set<JobID> getJobsToRecover() {
return jobsToRecover;
}
@@ -1163,229 +1027,8 @@
}
}
}
-
- private JobStatusChangeEvent updateJob(JobInProgress jip,
- JobHistory.JobInfo job) {
- // Change the job priority
- String jobpriority = job.get(Keys.JOB_PRIORITY);
- JobPriority priority = JobPriority.valueOf(jobpriority);
- // It's important to update this via the jobtracker's api as it will
- // take care of updating the event listeners too
- setJobPriority(jip.getJobID(), priority);
-
- // Save the previous job status
- JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-
- // Set the start/launch time only if there are recovered tasks
- // Increment the job's restart count
- jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME),
- job.getLong(JobHistory.Keys.LAUNCH_TIME));
-
- // Save the new job status
- JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-
- return new JobStatusChangeEvent(jip, EventType.START_TIME_CHANGED, oldStatus,
- newStatus);
- }
-
- private void updateTip(TaskInProgress tip, JobHistory.Task task) {
- long startTime = task.getLong(Keys.START_TIME);
- if (startTime != 0) {
- tip.setExecStartTime(startTime);
- }
-
- long finishTime = task.getLong(Keys.FINISH_TIME);
- // For failed tasks finish-time will be missing
- if (finishTime != 0) {
- tip.setExecFinishTime(finishTime);
- }
-
- String cause = task.get(Keys.TASK_ATTEMPT_ID);
- if (cause.length() > 0) {
- // This means that the this is a FAILED events
- TaskAttemptID id = TaskAttemptID.forName(cause);
- TaskStatus status = tip.getTaskStatus(id);
- synchronized (JobTracker.this) {
- // This will add the tip failed event in the new log
- tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(),
- status.getPhase(), status.getRunState(),
- status.getTaskTracker());
- }
- }
- }
-
- private void createTaskAttempt(JobInProgress job,
- TaskAttemptID attemptId,
- JobHistory.TaskAttempt attempt) {
- TaskID id = attemptId.getTaskID();
- String type = attempt.get(Keys.TASK_TYPE);
- TaskInProgress tip = job.getTaskInProgress(id);
-
- // I. Get the required info
- TaskStatus taskStatus = null;
- String trackerName = attempt.get(Keys.TRACKER_NAME);
- String trackerHostName =
- JobInProgress.convertTrackerNameToHostName(trackerName);
- // recover the port information.
- int port = 0; // default to 0
- String hport = attempt.get(Keys.HTTP_PORT);
- if (hport != null && hport.length() > 0) {
- port = attempt.getInt(Keys.HTTP_PORT);
- }
-
- long attemptStartTime = attempt.getLong(Keys.START_TIME);
-
- // II. Create the (appropriate) task status
- if (type.equals(Values.MAP.name())) {
- taskStatus =
- new MapTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.MAP),
- TaskStatus.State.RUNNING, "", "", trackerName,
- TaskStatus.Phase.MAP, new Counters());
- } else {
- taskStatus =
- new ReduceTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.REDUCE),
- TaskStatus.State.RUNNING, "", "", trackerName,
- TaskStatus.Phase.REDUCE, new Counters());
- }
-
- // Set the start time
- taskStatus.setStartTime(attemptStartTime);
-
- List<TaskStatus> ttStatusList = new ArrayList<TaskStatus>();
- ttStatusList.add(taskStatus);
-
- // III. Create the dummy tasktracker status
- TaskTrackerStatus ttStatus =
- new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList,
- 0 , 0, 0);
- ttStatus.setLastSeen(clock.getTime());
-
- synchronized (JobTracker.this) {
- synchronized (taskTrackers) {
- synchronized (trackerExpiryQueue) {
- // IV. Register a new tracker
- TaskTracker taskTracker = getTaskTracker(trackerName);
- boolean isTrackerRegistered = (taskTracker != null);
- if (!isTrackerRegistered) {
- markTracker(trackerName); // add the tracker to recovery-manager
- taskTracker = new TaskTracker(trackerName);
- taskTracker.setStatus(ttStatus);
- addNewTracker(taskTracker);
- }
-
- // V. Update the tracker status
- // This will update the meta info of the jobtracker and also add the
- // tracker status if missing i.e register it
- updateTaskTrackerStatus(trackerName, ttStatus);
- }
- }
- // Register the attempt with job and tip, under JobTracker lock.
- // Since, as of today they are atomic through heartbeat.
- // VI. Register the attempt
- // a) In the job
- job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
- // b) In the tip
- tip.updateStatus(taskStatus);
- }
-
- // VII. Make an entry in the launched tasks
- expireLaunchingTasks.addNewTask(attemptId);
- }
-
- private void addSuccessfulAttempt(JobInProgress job,
- TaskAttemptID attemptId,
- JobHistory.TaskAttempt attempt) {
- // I. Get the required info
- TaskID taskId = attemptId.getTaskID();
- String type = attempt.get(Keys.TASK_TYPE);
-
- TaskInProgress tip = job.getTaskInProgress(taskId);
- long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
- // Get the task status and the tracker name and make a copy of it
- TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
- taskStatus.setFinishTime(attemptFinishTime);
-
- String stateString = attempt.get(Keys.STATE_STRING);
-
- // Update the basic values
- taskStatus.setStateString(stateString);
- taskStatus.setProgress(1.0f);
- taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-
- // Set the shuffle/sort finished times
- if (type.equals(Values.REDUCE.name())) {
- long shuffleTime =
- Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED));
- long sortTime =
- Long.parseLong(attempt.get(Keys.SORT_FINISHED));
- taskStatus.setShuffleFinishTime(shuffleTime);
- taskStatus.setSortFinishTime(sortTime);
- }
- else if (type.equals(Values.MAP.name())) {
- taskStatus.setMapFinishTime(
- Long.parseLong(attempt.get(Keys.MAP_FINISHED)));
- }
-
- // Add the counters
- String counterString = attempt.get(Keys.COUNTERS);
- Counters counter = null;
- //TODO Check if an exception should be thrown
- try {
- counter = Counters.fromEscapedCompactString(counterString);
- } catch (ParseException pe) {
- counter = new Counters(); // Set it to empty counter
- }
- taskStatus.setCounters(counter);
-
- synchronized (JobTracker.this) {
- // II. Replay the status
- job.updateTaskStatus(tip, taskStatus);
- }
-
- // III. Prevent the task from expiry
- expireLaunchingTasks.removeTask(attemptId);
- }
-
- private void addUnsuccessfulAttempt(JobInProgress job,
- TaskAttemptID attemptId,
- JobHistory.TaskAttempt attempt) {
- // I. Get the required info
- TaskID taskId = attemptId.getTaskID();
- TaskInProgress tip = job.getTaskInProgress(taskId);
- long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
- TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
- taskStatus.setFinishTime(attemptFinishTime);
-
- // Reset the progress
- taskStatus.setProgress(0.0f);
-
- String stateString = attempt.get(Keys.STATE_STRING);
- taskStatus.setStateString(stateString);
-
- boolean hasFailed =
- attempt.get(Keys.TASK_STATUS).equals(Values.FAILED.name());
- // Set the state failed/killed
- if (hasFailed) {
- taskStatus.setRunState(TaskStatus.State.FAILED);
- } else {
- taskStatus.setRunState(TaskStatus.State.KILLED);
- }
-
- // Get/Set the error msg
- String diagInfo = attempt.get(Keys.ERROR);
- taskStatus.setDiagnosticInfo(diagInfo); // diag info
-
- synchronized (JobTracker.this) {
- // II. Update the task status
- job.updateTaskStatus(tip, taskStatus);
- }
- // III. Prevent the task from expiry
- expireLaunchingTasks.removeTask(attemptId);
- }
-
+
Path getRestartCountFile() {
return new Path(getSystemDir(), "jobtracker.info");
}
@@ -1484,166 +1127,25 @@
return;
}
- LOG.info("Starting the recovery process with restart count : "
- + restartCount);
-
- // I. Init the jobs and cache the recovered job history filenames
- Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
- Iterator<JobID> idIter = jobsToRecover.iterator();
- while (idIter.hasNext()) {
- JobID id = idIter.next();
- LOG.info("Trying to recover details of job " + id);
- try {
- // 1. Create the job object
- JobInProgress job =
- new JobInProgress(id, JobTracker.this, conf, restartCount);
-
- // 2. Check if the user has appropriate access
- // Get the user group info for the job's owner
- UserGroupInformation ugi =
- UserGroupInformation.readFrom(job.getJobConf());
- LOG.info("Submitting job " + id + " on behalf of user "
- + ugi.getUserName() + " in groups : "
- + StringUtils.arrayToString(ugi.getGroupNames()));
-
- // check the access
- try {
- checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
- } catch (Throwable t) {
- LOG.warn("Access denied for user " + ugi.getUserName()
- + " in groups : ["
- + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
- throw t;
- }
-
- // 3. Get the log file and the file path
- String logFileName =
- JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
- if (logFileName != null) {
- Path jobHistoryFilePath =
- JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
- // 4. Recover the history file. This involved
- // - deleting file.recover if file exists
- // - renaming file.recover to file if file doesnt exist
- // This makes sure that the (master) file exists
- JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
- jobHistoryFilePath);
-
- // 5. Cache the history file name as it costs one dfs access
- jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
- } else {
- LOG.info("No history file found for job " + id);
- idIter.remove(); // remove from recovery list
- }
-
- // 6. Sumbit the job to the jobtracker
- addJob(id, job);
- } catch (Throwable t) {
- LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
- idIter.remove();
- continue;
- }
- }
- long now = clock.getTime();
- LOG.info("Took a total of "
- + StringUtils.formatTime(now
- - recoveryProcessStartTime)
- + " for recovering filenames of all the jobs from history.");
-
-
- // II. Recover each job
- idIter = jobsToRecover.iterator();
- while (idIter.hasNext()) {
- JobID id = idIter.next();
- JobInProgress pJob = getJob(id);
-
- // 1. Get the required info
- // Get the recovered history file
- Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
- String logFileName = jobHistoryFilePath.getName();
-
- FileSystem fs;
- try {
- fs = jobHistoryFilePath.getFileSystem(conf);
- } catch (IOException ioe) {
- LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
- ioe);
- continue;
- }
-
- // 2. Parse the history file
- // Note that this also involves job update
- JobRecoveryListener listener = new JobRecoveryListener(pJob);
+ LOG.info("Starting the recovery process for " + jobsToRecover.size() +
+ " jobs ...");
+ for (JobID jobId : jobsToRecover) {
+ LOG.info("Submitting job "+ jobId);
try {
- JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(),
- listener, fs);
- } catch (Throwable t) {
- LOG.info("Error reading history file of job " + pJob.getJobID()
- + ". Ignoring the error and continuing.", t);
- }
-
- // 3. Close the listener
- listener.close();
-
- // 4. Update the recovery metric
- totalEventsRecovered += listener.getNumEventsRecovered();
-
- // 5. Cleanup history
- // Delete the master log file as an indication that the new file
- // should be used in future
- try {
- synchronized (pJob) {
- JobHistory.JobInfo.checkpointRecovery(logFileName,
- pJob.getJobConf());
- }
- } catch (Throwable t) {
- LOG.warn("Failed to delete log file (" + logFileName + ") for job "
- + id + ". Continuing.", t);
- }
-
- if (pJob.isComplete()) {
- idIter.remove(); // no need to keep this job info as its successful
+ submitJob(jobId, restartCount);
+ recovered++;
+ } catch (Exception e) {
+ LOG.warn("Could not recover job " + jobId, e);
}
}
-
- long recoveryProcessEndTime = clock.getTime();
- LOG.info("Took a total of "
- + StringUtils.formatTime(recoveryProcessEndTime
- - now)
- + " for parsing and recovering all the jobs from history.");
-
- recoveryDuration = recoveryProcessEndTime - recoveryProcessStartTime;
- LOG.info("Took a total of " + StringUtils.formatTime(recoveryDuration)
- + " for the whole recovery process.");
+ recoveryDuration = clock.getTime() - recoveryProcessStartTime;
hasRecovered = true;
- // III. Finalize the recovery
- synchronized (trackerExpiryQueue) {
- // Make sure that the tracker statuses in the expiry-tracker queue
- // are updated
- int size = trackerExpiryQueue.size();
- for (int i = 0; i < size ; ++i) {
- // Get the first tasktracker
- TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
-
- // Remove it
- trackerExpiryQueue.remove(taskTracker);
-
- // Set the new time
- taskTracker.setLastSeen(recoveryProcessEndTime);
-
- // Add back to get the sorted list
- trackerExpiryQueue.add(taskTracker);
- }
- }
-
- LOG.info("Restoration done. Recovery complete!");
- }
-
- int totalEventsRecovered() {
- return totalEventsRecovered;
+ LOG.info("Recovery done! Recoverd " + recovered +" of "+
+ jobsToRecover.size() + " jobs.");
+ LOG.info("Recovery Duration (ms):" + recoveryDuration);
}
+
}
private JobTrackerInstrumentation myInstrumentation;
@@ -1653,15 +1155,14 @@
////////////////////////////////////////////////////////////////
int port;
String localMachine;
- private String trackerIdentifier;
+ private final String trackerIdentifier;
long startTime;
int totalSubmissions = 0;
private int totalMapTaskCapacity;
private int totalReduceTaskCapacity;
- private HostsFileReader hostsReader;
+ private final HostsFileReader hostsReader;
// JobTracker recovery variables
- private volatile boolean hasRestarted = false;
private volatile boolean hasRecovered = false;
private volatile long recoveryDuration;
@@ -1734,6 +1235,10 @@
//
int totalMaps = 0;
int totalReduces = 0;
+ private int occupiedMapSlots = 0;
+ private int occupiedReduceSlots = 0;
+ private int reservedMapSlots = 0;
+ private int reservedReduceSlots = 0;
private HashMap<String, TaskTracker> taskTrackers =
new HashMap<String, TaskTracker>();
Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -1745,7 +1250,7 @@
Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
"expireLaunchingTasks");
- CompletedJobStatusStore completedJobStatusStore = null;
+ CompletedJobStatusStore completedJobStatusStore;
Thread completedJobsStoreThread = null;
RecoveryManager recoveryManager;
@@ -1784,7 +1289,7 @@
static final String SUBDIR = "jobTracker";
FileSystem fs = null;
Path systemDir = null;
- private JobConf conf;
+ JobConf conf;
private UserGroupInformation mrOwner;
private String supergroup;
@@ -1793,7 +1298,7 @@
long memSizeForMapSlotOnJT;
long memSizeForReduceSlotOnJT;
- private QueueManager queueManager;
+ private final QueueManager queueManager;
JobTracker(JobConf conf)
throws IOException,InterruptedException, LoginException {
@@ -1815,7 +1320,7 @@
throws IOException, InterruptedException, LoginException {
clock = newClock;
mrOwner = UnixUserGroupInformation.login(conf);
- supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+ supergroup = conf.get(JT_SUPERGROUP, "supergroup");
LOG.info("Starting jobtracker with owner as " + mrOwner.getUserName()
+ " and supergroup as " + supergroup);
this.conf = conf;
@@ -1825,19 +1330,28 @@
// Grab some static constants
//
tasktrackerExpiryInterval =
- conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
- retiredJobsCacheSize =
- conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
+ conf.getLong(JT_TRACKER_EXPIRY_INTERVAL, 10 * 60 * 1000);
+ retiredJobsCacheSize = conf.getInt(JT_RETIREJOB_CACHE_SIZE, 1000);
MAX_BLACKLISTS_PER_TRACKER =
- conf.getInt("mapred.max.tracker.blacklists", 4);
+ conf.getInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 4);
+
NUM_HEARTBEATS_IN_SECOND =
- conf.getInt("mapred.heartbeats.in.second", 100);
+ conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
+ if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
+ NUM_HEARTBEATS_IN_SECOND = DEFAULT_NUM_HEARTBEATS_IN_SECOND;
+ }
+
+ HEARTBEATS_SCALING_FACTOR =
+ conf.getFloat(JT_HEARTBEATS_SCALING_FACTOR,
+ DEFAULT_HEARTBEATS_SCALING_FACTOR);
+ if (HEARTBEATS_SCALING_FACTOR < MIN_HEARTBEATS_SCALING_FACTOR) {
+ HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
+ }
//This configuration is there solely for tuning purposes and
//once this feature has been tested in real clusters and an appropriate
//value for the threshold has been found, this config might be taken out.
- AVERAGE_BLACKLIST_THRESHOLD =
- conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);
+ AVERAGE_BLACKLIST_THRESHOLD = conf.getFloat(JTConfig.JT_AVG_BLACKLIST_THRESHOLD, 0.5f);
// This is a directory of temporary submission files. We delete it
// on startup, and can delete any files that we're done with
@@ -1845,8 +1359,8 @@
initializeTaskMemoryRelatedConfig();
// Read the hosts/exclude files to restrict access to the jobtracker.
- this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
- conf.get("mapred.hosts.exclude", ""));
+ this.hostsReader = new HostsFileReader(conf.get(JTConfig.JT_HOSTS_FILENAME, ""),
+ conf.get(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, ""));
Configuration queuesConf = new Configuration(this.conf);
queueManager = new QueueManager(queuesConf);
@@ -1854,7 +1368,7 @@
// Create the scheduler
Class<? extends TaskScheduler> schedulerClass
- = conf.getClass("mapred.jobtracker.taskScheduler",
+ = conf.getClass(JT_TASK_SCHEDULER,
JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
//LOOK AT THIS TODO
@@ -1893,7 +1407,7 @@
SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
}
- int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
+ int handlerCount = conf.getInt(JT_IPC_HANDLER_COUNT, 10);
this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
if (LOG.isDebugEnabled()) {
Properties p = System.getProperties();
@@ -1905,7 +1419,7 @@
}
InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
- conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
+ conf.get(JT_HTTP_ADDRESS, "0.0.0.0:50030"));
String infoBindAddress = infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort();
this.startTime = clock.getTime();
@@ -1913,8 +1427,8 @@
tmpInfoPort == 0, conf);
infoServer.setAttribute("job.tracker", this);
// initialize history parameters.
- boolean historyInitialized = JobHistory.init(this, conf, this.localMachine,
- this.startTime);
+ jobHistory = new JobHistory();
+ jobHistory.init(this, conf, this.localMachine, this.startTime);
infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
infoServer.start();
@@ -1944,10 +1458,10 @@
// The rpc/web-server ports can be ephemeral ports...
// ... ensure we have the correct info
this.port = interTrackerServer.getListenerAddress().getPort();
- this.conf.set("mapred.job.tracker", (this.localMachine + ":" + this.port));
+ this.conf.set(JT_IPC_ADDRESS, (this.localMachine + ":" + this.port));
LOG.info("JobTracker up at: " + this.port);
this.infoPort = this.infoServer.getPort();
- this.conf.set("mapred.job.tracker.http.address",
+ this.conf.set(JT_HTTP_ADDRESS,
infoBindAddress + ":" + this.infoPort);
LOG.info("JobTracker webserver: " + this.infoServer.getPort());
@@ -1978,8 +1492,7 @@
// Check if the history is enabled .. as we can't have persistence with
// history disabled
- if (conf.getBoolean("mapred.jobtracker.restart.recover", false)
- && !JobHistory.isDisableHistory()
+ if (conf.getBoolean(JT_RESTART_ENABLED, false)
&& systemDirData != null) {
for (FileStatus status : systemDirData) {
try {
@@ -1991,8 +1504,7 @@
}
// Check if there are jobs to be recovered
- hasRestarted = recoveryManager.shouldRecover();
- if (hasRestarted) {
+ if (recoveryManager.shouldRecover()) {
break; // if there is something to recover else clean the sys dir
}
}
@@ -2004,9 +1516,9 @@
}
LOG.error("Mkdirs failed to create " + systemDir);
} catch (AccessControlException ace) {
- LOG.warn("Failed to operate on mapred.system.dir (" + systemDir
+ LOG.warn("Failed to operate on " + JTConfig.JT_SYSTEM_DIR + "(" + systemDir
+ ") because of permissions.");
- LOG.warn("Manually delete the mapred.system.dir (" + systemDir
+ LOG.warn("Manually delete the " + JTConfig.JT_SYSTEM_DIR + "(" + systemDir
+ ") and then start the JobTracker.");
LOG.warn("Bailing out ... ");
throw ace;
@@ -2030,19 +1542,17 @@
jobConf.deleteLocalFiles(SUBDIR);
// Initialize history DONE folder
- if (historyInitialized) {
- JobHistory.initDone(conf, fs);
- String historyLogDir =
- JobHistory.getCompletedJobHistoryLocation().toString();
- infoServer.setAttribute("historyLogDir", historyLogDir);
- FileSystem historyFS = new Path(historyLogDir).getFileSystem(conf);
- infoServer.setAttribute("fileSys", historyFS);
- }
+ jobHistory.initDone(conf, fs);
+ String historyLogDir =
+ jobHistory.getCompletedJobHistoryLocation().toString();
+ infoServer.setAttribute("historyLogDir", historyLogDir);
+ FileSystem historyFS = new Path(historyLogDir).getFileSystem(conf);
+ infoServer.setAttribute("fileSys", historyFS);
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);
- this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels",
+ this.numTaskCacheLevels = conf.getInt(JT_TASKCACHE_LEVELS,
NetworkTopology.DEFAULT_HOST_LEVEL);
//initializes the job status store
@@ -2080,13 +1590,6 @@
}
/**
- * Whether the JT has restarted
- */
- public boolean hasRestarted() {
- return hasRestarted;
- }
-
- /**
* Whether the JT has recovered upon restart
*/
public boolean hasRecovered() {
@@ -2097,13 +1600,11 @@
* How long the jobtracker took to recover from restart.
*/
public long getRecoveryDuration() {
- return hasRestarted()
- ? recoveryDuration
- : 0;
+ return recoveryDuration;
}
/**
- * Get JobTracker's FileSystem. This is the filesystem for mapred.system.dir.
+ * Get JobTracker's FileSystem. This is the filesystem for mapreduce.system.dir.
*/
FileSystem getFileSystem() {
return fs;
@@ -2111,7 +1612,7 @@
/**
* Get the FileSystem for the given path. This can be used to resolve
- * filesystem for job history, local job files or mapred.system.dir path.
+ * filesystem for job history, local job files or mapreduce.system.dir path.
*/
FileSystem getFileSystem(Path path) throws IOException {
return path.getFileSystem(conf);
@@ -2130,12 +1631,12 @@
}
public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
- return conf.getClass("mapred.jobtracker.instrumentation",
+ return conf.getClass(JT_INSTRUMENTATION,
JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
}
public static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
- conf.setClass("mapred.jobtracker.instrumentation",
+ conf.setClass(JT_INSTRUMENTATION,
t, JobTrackerInstrumentation.class);
}
@@ -2145,7 +1646,7 @@
public static InetSocketAddress getAddress(Configuration conf) {
String jobTrackerStr =
- conf.get("mapred.job.tracker", "localhost:8012");
+ conf.get(JT_IPC_ADDRESS, "localhost:8012");
return NetUtils.createSocketAddr(jobTrackerStr);
}
@@ -2183,12 +1684,8 @@
enterLiveState();
taskScheduler.start();
- // Start the recovery after starting the scheduler
- try {
- recoveryManager.recover();
- } catch (Throwable t) {
- LOG.warn("Recovery manager crashed! Ignoring.", t);
- }
+ recoveryManager.recover();
+
// refresh the node list as the recovery manager might have added
// disallowed trackers
refreshHosts();
@@ -2314,6 +1811,13 @@
}
fs = null;
}
+
+ if (jobHistory != null) {
+ jobHistory.shutDown();
+ }
+
+ LOG.info("stopped all jobtracker services");
+ return;
}
@@ -2464,7 +1968,7 @@
*
* @param taskTracker tasktracker whose 'non-running' tasks are to be purged
*/
- private void removeMarkedTasks(String taskTracker) {
+ void removeMarkedTasks(String taskTracker) {
// Purge all the 'marked' tasks which were running at taskTracker
Set<TaskAttemptID> markedTaskSet =
trackerToMarkedTasksMap.get(taskTracker);
@@ -2516,17 +2020,10 @@
// start the merge of log files
JobID id = job.getStatus().getJobID();
- if (job.hasRestarted()) {
- try {
- JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
- } catch (IOException ioe) {
- LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
- }
- }
// mark the job as completed
try {
- JobHistory.JobInfo.markCompleted(id);
+ jobHistory.markCompleted(id);
} catch (IOException ioe) {
LOG.info("Failed to mark job " + id + " as completed!", ioe);
}
@@ -2607,6 +2104,13 @@
}
return v;
}
+
+ public synchronized List<JobInProgress> getFailedJobs() {
+ synchronized (jobs) {
+ return failedJobs();
+ }
+ }
+
public Vector<JobInProgress> completedJobs() {
Vector<JobInProgress> v = new Vector<JobInProgress>();
for (Iterator it = jobs.values().iterator(); it.hasNext();) {
@@ -2619,6 +2123,12 @@
return v;
}
+ public synchronized List<JobInProgress> getCompletedJobs() {
+ synchronized (jobs) {
+ return completedJobs();
+ }
+ }
+
/**
* Get all the task trackers in the cluster
*
@@ -2754,7 +2264,7 @@
*
* @param status Task Tracker's status
*/
- private void addNewTracker(TaskTracker taskTracker) {
+ void addNewTracker(TaskTracker taskTracker) {
TaskTrackerStatus status = taskTracker.getStatus();
trackerExpiryQueue.add(status);
@@ -2772,6 +2282,7 @@
hostnameToTaskTracker.put(hostname, trackers);
}
statistics.taskTrackerAdded(status.getTrackerName());
+ getInstrumentation().addTrackers(1);
LOG.info("Adding tracker " + status.getTrackerName() + " to host "
+ hostname);
trackers.add(taskTracker);
@@ -2851,7 +2362,7 @@
// Update the listeners about the job
// Assuming JobTracker is locked on entry.
- private void updateJobInProgressListeners(JobChangeEvent event) {
+ void updateJobInProgressListeners(JobChangeEvent event) {
for (JobInProgressListener listener : jobInProgressListeners) {
listener.jobUpdated(event);
}
@@ -2912,7 +2423,6 @@
HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);
- boolean addRestartInfo = false;
if (initialContact != true) {
// If this isn't the 'initial contact' from the tasktracker,
@@ -2922,20 +2432,15 @@
if (prevHeartbeatResponse == null) {
// This is the first heartbeat from the old tracker to the newly
// started JobTracker
- if (hasRestarted()) {
- addRestartInfo = true;
- // inform the recovery manager about this tracker joining back
- recoveryManager.unMarkTracker(trackerName);
- } else {
- // Jobtracker might have restarted but no recovery is needed
- // otherwise this code should not be reached
- LOG.warn("Serious problem, cannot find record of 'previous' " +
- "heartbeat for '" + trackerName +
- "'; reinitializing the tasktracker");
- return new HeartbeatResponse(responseId,
- new TaskTrackerAction[] {new ReinitTrackerAction()});
- }
-
+
+ // Jobtracker might have restarted but no recovery is needed
+ // otherwise this code should not be reached
+ LOG.warn("Serious problem, cannot find record of 'previous' " +
+ "heartbeat for '" + trackerName +
+ "'; reinitializing the tasktracker");
+ return new HeartbeatResponse(responseId,
+ new TaskTrackerAction[] {new ReinitTrackerAction()});
+
} else {
// It is completely safe to not process a 'duplicate' heartbeat from a
@@ -2967,7 +2472,7 @@
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
// Check for new tasks to be executed on the tasktracker
- if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
+ if (acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
@@ -3010,11 +2515,6 @@
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));
- // check if the restart info is req
- if (addRestartInfo) {
- response.setRecoveredJobs(recoveryManager.getJobsToRecover());
- }
-
// Update the trackerToHeartbeatResponseMap
trackerToHeartbeatResponseMap.put(trackerName, response);
@@ -3026,15 +2526,16 @@
/**
* Calculates next heartbeat interval using cluster size.
- * Heartbeat interval is incremented 1second for every 50 nodes.
+ * Heartbeat interval is incremented by 1 second for every 100 nodes by default.
* @return next heartbeat interval.
*/
public int getNextHeartbeatInterval() {
// get the no of task trackers
int clusterSize = getClusterStatus().getTaskTrackers();
int heartbeatInterval = Math.max(
- (int)(1000 * Math.ceil((double)clusterSize /
- NUM_HEARTBEATS_IN_SECOND)),
+ (int)(1000 * HEARTBEATS_SCALING_FACTOR *
+ Math.ceil((double)clusterSize /
+ NUM_HEARTBEATS_IN_SECOND)),
HEARTBEAT_INTERVAL_MIN) ;
return heartbeatInterval;
}
@@ -3072,13 +2573,19 @@
* @param status The new status for the task tracker
* @return Was an old status found?
*/
- private boolean updateTaskTrackerStatus(String trackerName,
+ boolean updateTaskTrackerStatus(String trackerName,
TaskTrackerStatus status) {
TaskTracker tt = getTaskTracker(trackerName);
TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus();
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
+ occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+ occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
+ getInstrumentation().decRunningMaps(oldStatus.countMapTasks());
+ getInstrumentation().decRunningReduces(oldStatus.countReduceTasks());
+ getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
+ getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
int mapSlots = oldStatus.getMaxMapSlots();
totalMapTaskCapacity -= mapSlots;
@@ -3101,6 +2608,12 @@
if (status != null) {
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
+ occupiedMapSlots += status.countOccupiedMapSlots();
+ occupiedReduceSlots += status.countOccupiedReduceSlots();
+ getInstrumentation().addRunningMaps(status.countMapTasks());
+ getInstrumentation().addRunningReduces(status.countReduceTasks());
+ getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
+ getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(status.getHost())) {
int mapSlots = status.getMaxMapSlots();
totalMapTaskCapacity += mapSlots;
@@ -3168,6 +2681,25 @@
return oldStatus != null;
}
+ // Increment the number of reserved slots in the cluster.
+ // This method assumes the caller has JobTracker lock.
+ void incrementReservations(TaskType type, int reservedSlots) {
+ if (type.equals(TaskType.MAP)) {
+ reservedMapSlots += reservedSlots;
+ } else if (type.equals(TaskType.REDUCE)) {
+ reservedReduceSlots += reservedSlots;
+ }
+ }
+
+ // Decrement the number of reserved slots in the cluster.
+ // This method assumes the caller has JobTracker lock.
+ void decrementReservations(TaskType type, int reservedSlots) {
+ if (type.equals(TaskType.MAP)) {
+ reservedMapSlots -= reservedSlots;
+ } else if (type.equals(TaskType.REDUCE)) {
+ reservedReduceSlots -= reservedSlots;
+ }
+ }
private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
@@ -3180,7 +2712,7 @@
/**
* Process incoming heartbeat messages from the task trackers.
*/
- private synchronized boolean processHeartbeat(
+ synchronized boolean processHeartbeat(
TaskTrackerStatus trackerStatus,
boolean initialContact) {
String trackerName = trackerStatus.getTrackerName();
@@ -3209,7 +2741,7 @@
// if this is lost tracker that came back now, and if it blacklisted
// increment the count of blacklisted trackers in the cluster
if (isBlacklisted(trackerName)) {
- faultyTrackers.numBlacklistedTrackers += 1;
+ faultyTrackers.incrBlackListedTrackers(1);
}
addNewTracker(taskTracker);
}
@@ -3226,8 +2758,7 @@
* A tracker wants to know if any of its Tasks have been
* closed (because the job completed, whether successfully or not)
*/
- private synchronized List<TaskTrackerAction> getTasksToKill(
- String taskTracker) {
+ synchronized List<TaskTrackerAction> getTasksToKill(String taskTracker) {
Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
@@ -3304,7 +2835,7 @@
/**
* A tracker wants to know if any of its Tasks can be committed
*/
- private synchronized List<TaskTrackerAction> getTasksToSave(
+ synchronized List<TaskTrackerAction> getTasksToSave(
TaskTrackerStatus tts) {
List<TaskStatus> taskStatuses = tts.getTaskReports();
if (taskStatuses != null) {
@@ -3430,10 +2961,21 @@
/**
* Allocates a new JobId string.
+ * @deprecated use {@link #getNewJobID()} instead
*/
+ @Deprecated
public synchronized JobID getNewJobId() throws IOException {
verifyServiceState(ServiceState.LIVE);
- return new JobID(getTrackerIdentifier(), nextJobId++);
+ return JobID.downgrade(getNewJobID());
+ }
+
+ /**
+ * Allocates a new JobId string.
+ */
+ public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID()
+ throws IOException {
+ return new org.apache.hadoop.mapreduce.JobID(
+ getTrackerIdentifier(), nextJobId++);
}
/**
@@ -3444,17 +2986,41 @@
* of the JobTracker. But JobInProgress adds info that's useful for
* the JobTracker alone.
*/
+ public synchronized org.apache.hadoop.mapreduce.JobStatus submitJob(
+ org.apache.hadoop.mapreduce.JobID jobId) throws IOException {
+ return submitJob(JobID.downgrade(jobId));
+ }
+
+ /**
+ * JobTracker.submitJob() kicks off a new job.
+ *
+ * Create a 'JobInProgress' object, which contains both JobProfile
+ * and JobStatus. Those two sub-objects are sometimes shipped outside
+ * of the JobTracker. But JobInProgress adds info that's useful for
+ * the JobTracker alone.
+ * @deprecated Use
+ * {@link #submitJob(org.apache.hadoop.mapreduce.JobID)} instead
+ */
+ @Deprecated
public synchronized JobStatus submitJob(JobID jobId) throws IOException {
verifyServiceState(ServiceState.LIVE);
+ return submitJob(jobId, 0);
+ }
+
+ /**
+ * Submits either a new job or a job from an earlier run.
+ */
+ private synchronized JobStatus submitJob(JobID jobId,
+ int restartCount) throws IOException {
if(jobs.containsKey(jobId)) {
//job already running, don't start twice
return jobs.get(jobId).getStatus();
}
- JobInProgress job = new JobInProgress(jobId, this, this.conf);
+ JobInProgress job = new JobInProgress(jobId, this, this.conf, restartCount);
String queue = job.getProfile().getQueueName();
- if(!(queueManager.getQueues().contains(queue))) {
+ if(!(queueManager.getLeafQueueNames().contains(queue))) {
new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
throw new IOException("Queue \"" + queue + "\" does not exist");
}
@@ -3464,9 +3030,11 @@
new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
throw new IOException("Queue \"" + queue + "\" is not running");
}
- // check for access
try {
- checkAccess(job, Queue.QueueOperation.SUBMIT_JOB);
+ // check for access
+ UserGroupInformation ugi =
+ UserGroupInformation.readFrom(job.getJobConf());
+ checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
} catch (IOException ioe) {
LOG.warn("Access denied for user " + job.getJobConf().getUser()
+ ". Ignoring job " + jobId, ioe);
@@ -3574,7 +3142,67 @@
}
}
}
-
+
+ public synchronized ClusterMetrics getClusterMetrics() {
+ return new ClusterMetrics(totalMaps,
+ totalReduces, occupiedMapSlots, occupiedReduceSlots,
+ reservedMapSlots, reservedReduceSlots,
+ totalMapTaskCapacity, totalReduceTaskCapacity,
+ totalSubmissions,
+ taskTrackers.size() - getBlacklistedTrackerCount(),
+ getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
+ }
+
+ public org.apache.hadoop.mapreduce.server.jobtracker.State
+ getJobTrackerState() {
+ return getServiceState().equals(ServiceState.LIVE) ?
+ org.apache.hadoop.mapreduce.server.jobtracker.State.RUNNING
+ : org.apache.hadoop.mapreduce.server.jobtracker.State.INITIALIZING;
+ }
+
+ public long getTaskTrackerExpiryInterval() {
+ return tasktrackerExpiryInterval;
+ }
+
+ /**
+ * Get all active trackers in cluster.
+ * @return array of TaskTrackerInfo
+ */
+ public TaskTrackerInfo[] getActiveTrackers()
+ throws IOException, InterruptedException {
+ List<String> activeTrackers = taskTrackerNames().get(0);
+ TaskTrackerInfo[] info = new TaskTrackerInfo[activeTrackers.size()];
+ for (int i = 0; i < activeTrackers.size(); i++) {
+ info[i] = new TaskTrackerInfo(activeTrackers.get(i));
+ }
+ return info;
+ }
+
+ /**
+ * Get all blacklisted trackers in cluster.
+ * @return array of TaskTrackerInfo
+ */
+ public TaskTrackerInfo[] getBlacklistedTrackers()
+ throws IOException, InterruptedException {
+ Collection<BlackListInfo> blackListed = getBlackListedTrackers();
+ TaskTrackerInfo[] info = new TaskTrackerInfo[blackListed.size()];
+ int i = 0;
+ for (BlackListInfo binfo : blackListed) {
+ info[i++] = new TaskTrackerInfo(binfo.getTrackerName(),
+ binfo.getReasonForBlackListing(), binfo.getBlackListReport());
+ }
+ return info;
+ }
+
+ public synchronized void killJob(org.apache.hadoop.mapreduce.JobID jobid)
+ throws IOException {
+ killJob(JobID.downgrade(jobid));
+ }
+
+ /**
+ * @deprecated Use {@link #killJob(org.apache.hadoop.mapreduce.JobID)} instead
+ */
+ @Deprecated
public synchronized void killJob(JobID jobid) throws IOException {
if (null == jobid) {
LOG.info("Null jobid object sent to JobTracker.killJob()");
@@ -3702,6 +3330,18 @@
* @param jobid id of the job
* @param priority new priority of the job
*/
+ public synchronized void setJobPriority(org.apache.hadoop.mapreduce.JobID
+ jobid, String priority) throws IOException {
+ setJobPriority(JobID.downgrade(jobid), priority);
+ }
+ /**
+ * Set the priority of a job
+ * @param jobid id of the job
+ * @param priority new priority of the job
+ * @deprecated Use
+ * {@link #setJobPriority(org.apache.hadoop.mapreduce.JobID, String)} instead
+ */
+ @Deprecated
public synchronized void setJobPriority(JobID jobid,
String priority)
throws IOException {
@@ -3721,6 +3361,15 @@
completedJobStatusStore.store(job);
}
+ public JobProfile getJobProfile(org.apache.hadoop.mapreduce.JobID jobid) {
+ return getJobProfile(JobID.downgrade(jobid));
+ }
+
+ /**
+ * @deprecated Use {@link #getJobProfile(org.apache.hadoop.mapreduce.JobID)}
+ * instead
+ */
+ @Deprecated
public JobProfile getJobProfile(JobID jobid) {
synchronized (this) {
JobInProgress job = jobs.get(jobid);
@@ -3730,6 +3379,16 @@
}
return completedJobStatusStore.readJobProfile(jobid);
}
+
+ public JobStatus getJobStatus(org.apache.hadoop.mapreduce.JobID jobid) {
+ return getJobStatus(JobID.downgrade(jobid));
+ }
+
+ /**
+ * @deprecated Use
+ * {@link #getJobStatus(org.apache.hadoop.mapreduce.JobID)} instead
+ */
+ @Deprecated
public JobStatus getJobStatus(JobID jobid) {
if (null == jobid) {
LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
@@ -3749,6 +3408,21 @@
}
return completedJobStatusStore.readJobStatus(jobid);
}
+
+ public org.apache.hadoop.mapreduce.Counters getJobCounters(
+ org.apache.hadoop.mapreduce.JobID jobid) {
+ Counters counters = getJobCounters(JobID.downgrade(jobid));
+ if (counters != null) {
+ return new org.apache.hadoop.mapreduce.Counters(counters);
+ }
+ return null;
+ }
+
+ /**
+ * @deprecated Use
+ * {@link #getJobCounters(org.apache.hadoop.mapreduce.JobID)} instead
+ */
+ @Deprecated
public Counters getJobCounters(JobID jobid) {
synchronized (this) {
JobInProgress job = jobs.get(jobid);
@@ -3758,6 +3432,15 @@
}
return completedJobStatusStore.readCounters(jobid);
}
+
+ /**
+ * @param jobid
+ * @return array of TaskReport
+ * @deprecated Use
+ * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)}
+ * instead
+ */
+ @Deprecated
public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
if (job == null) {
@@ -3780,6 +3463,14 @@
}
}
+ /**
+ * @param jobid
+ * @return array of TaskReport
+ * @deprecated Use
+ * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)}
+ * instead
+ */
+ @Deprecated
public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
if (job == null) {
@@ -3800,6 +3491,14 @@
}
}
+ /**
+ * @param jobid
+ * @return array of TaskReport
+ * @deprecated Use
+ * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)}
+ * instead
+ */
+ @Deprecated
public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
if (job == null) {
@@ -3822,7 +3521,15 @@
}
}
-
+
+ /**
+ * @param jobid
+ * @return array of TaskReport
+ * @deprecated Use
+ * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)}
+ * instead
+ */
+ @Deprecated
public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
if (job == null) {
@@ -3844,24 +3551,41 @@
return reports.toArray(new TaskReport[reports.size()]);
}
}
-
+
+ public synchronized TaskReport[] getTaskReports(
+ org.apache.hadoop.mapreduce.JobID jobid, TaskType type) {
+ switch (type) {
+ case MAP :
+ return getMapTaskReports(JobID.downgrade(jobid));
+ case REDUCE :
+ return getReduceTaskReports(JobID.downgrade(jobid));
+ case JOB_CLEANUP:
+ return getCleanupTaskReports(JobID.downgrade(jobid));
+ case JOB_SETUP :
+ return getSetupTaskReports(JobID.downgrade(jobid));
+ }
+ return new TaskReport[0];
+ }
+
TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
- static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
- "mapred.cluster.map.memory.mb";
- static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
- "mapred.cluster.reduce.memory.mb";
-
- static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =
- "mapred.cluster.max.map.memory.mb";
- static final String MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY =
- "mapred.cluster.max.reduce.memory.mb";
+ /*
+ * Returns a list of TaskCompletionEvent for the given job,
+ * starting from fromEventId.
+ */
+ public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+ org.apache.hadoop.mapreduce.JobID jobid, int fromEventId, int maxEvents)
+ throws IOException {
+ return getTaskCompletionEvents(JobID.downgrade(jobid),
+ fromEventId, maxEvents);
+ }
/*
* Returns a list of TaskCompletionEvent for the given job,
* starting from fromEventId.
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
*/
+ @Deprecated
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
JobID jobid, int fromEventId, int maxEvents) throws IOException{
synchronized (this) {
@@ -3882,6 +3606,17 @@
* @param taskId the id of the task
* @return an array of the diagnostic messages
*/
+ public synchronized String[] getTaskDiagnostics(
+ org.apache.hadoop.mapreduce.TaskAttemptID taskId)
+ throws IOException {
+ return getTaskDiagnostics(TaskAttemptID.downgrade(taskId));
+ }
+ /**
+ * Get the diagnostics for a given task
+ * @param taskId the id of the task
+ * @return an array of the diagnostic messages
+ */
+ @Deprecated
public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)
throws IOException {
List<String> taskDiagnosticInfo = null;
@@ -3937,8 +3672,15 @@
JobInProgress job = jobs.get(tipid.getJobID());
return (job == null ? null : job.getTaskInProgress(tipid));
}
-
+
+ public synchronized boolean killTask(
+ org.apache.hadoop.mapreduce.TaskAttemptID taskid,
+ boolean shouldFail) throws IOException {
+ return killTask(TaskAttemptID.downgrade(taskid), shouldFail);
+ }
+
/** Mark a Task to be killed */
+ @Deprecated
public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
TaskInProgress tip = taskidToTIPMap.get(taskid);
if(tip != null) {
@@ -3964,7 +3706,7 @@
return getJobStatus(jobs.values(), true);
}
- public JobStatus[] getAllJobs() {
+ public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {
List<JobStatus> list = new ArrayList<JobStatus>();
list.addAll(Arrays.asList(getJobStatus(jobs.values(),false)));
list.addAll(retireJobs.getAll());
@@ -3972,17 +3714,25 @@
}
/**
- * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
+ * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
*/
public String getSystemDir() {
if (fs == null) {
throw new java.lang.IllegalStateException("Filesystem is null; "
+ "JobTracker is not live: " + this);
}
- Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
+ Path sysDir = new Path(conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));
return fs.makeQualified(sysDir).toString();
}
+ /**
+ * @see
+ * org.apache.hadoop.mapreduce.protocol.ClientProtocol#getJobHistoryDir()
+ */
+ public String getJobHistoryDir() {
+ return jobHistory.getCompletedJobHistoryLocation().toString();
+ }
+
///////////////////////////////////////////////////////////////
// JobTracker methods
///////////////////////////////////////////////////////////////
@@ -4064,14 +3814,8 @@
}
TaskInProgress tip = taskidToTIPMap.get(taskId);
- // Check if the tip is known to the jobtracker. In case of a restarted
- // jt, some tasks might join in later
- if (tip != null || hasRestarted()) {
- if (tip == null) {
- tip = job.getTaskInProgress(taskId.getTaskID());
- job.addRunningTaskToTIP(tip, taskId, status, false);
- }
-
+
+ if (tip != null) {
// Update the job and inform the listeners if necessary
JobStatus prevStatus = (JobStatus)job.getStatus().clone();
// Clone TaskStatus object here, because JobInProgress
@@ -4132,9 +3876,6 @@
trackerToTasksToCleanup.remove(trackerName);
}
- // Inform the recovery manager
- recoveryManager.unMarkTracker(trackerName);
-
Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
trackerToTaskMap.remove(trackerName);
@@ -4205,13 +3946,13 @@
}
private synchronized void refreshHosts() throws IOException {
- // Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
+ // Reread the config to get HOSTS and HOSTS_EXCLUDE filenames.
// Update the file names and refresh internal includes and excludes list
LOG.info("Refreshing hosts information");
Configuration conf = new Configuration();
- hostsReader.updateFileNames(conf.get("mapred.hosts",""),
- conf.get("mapred.hosts.exclude", ""));
+ hostsReader.updateFileNames(conf.get(JTConfig.JT_HOSTS_FILENAME,""),
+ conf.get(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, ""));
hostsReader.refresh();
Set<String> excludeSet = new HashSet<String>();
@@ -4227,12 +3968,13 @@
}
// main decommission
- private synchronized void decommissionNodes(Set<String> hosts)
+ synchronized void decommissionNodes(Set<String> hosts)
throws IOException {
LOG.info("Decommissioning " + hosts.size() + " nodes");
// create a list of tracker hostnames
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
+ int trackersDecommissioned = 0;
for (String host : hosts) {
LOG.info("Decommissioning host " + host);
Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
@@ -4240,12 +3982,13 @@
for (TaskTracker tracker : trackers) {
LOG.info("Decommission: Losing tracker " + tracker +
" on host " + host);
- lostTaskTracker(tracker); // lose the tracker
- updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
+ removeTracker(tracker);
}
+ trackersDecommissioned += trackers.size();
}
LOG.info("Host " + host + " is ready for decommissioning");
}
+ getInstrumentation().setDecommissionedTrackers(trackersDecommissioned);
}
}
}
@@ -4257,14 +4000,6 @@
return hostsReader.getExcludedHosts();
}
- /**
- * Get the localized job file path on the job trackers local file system
- * @param jobId id of the job
- * @return the path of the job conf file on the local file system
- */
- public static String getLocalJobFilePath(JobID jobId){
- return JobHistory.JobInfo.getLocalJobFilePath(jobId);
- }
////////////////////////////////////////////////////////////
// main()
////////////////////////////////////////////////////////////
@@ -4285,6 +4020,9 @@
else {
if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
dumpConfiguration(new PrintWriter(System.out));
+ System.out.println();
+ QueueManager.dumpConfiguration(new PrintWriter(System.out),
+ new JobConf());
}
else {
System.out.println("usage: JobTracker [-dumpConfiguration]");
@@ -4305,30 +4043,84 @@
private static void dumpConfiguration(Writer writer) throws IOException {
Configuration.dumpConfiguration(new JobConf(), writer);
writer.write("\n");
- // get the QueueManager configuration properties
- QueueManager.dumpConfiguration(writer);
- writer.write("\n");
}
+ /**
+ * Gets the root level queues.
+ *
+ * @return array of QueueInfo object.
+ * @throws java.io.IOException
+ */
+ @Override
+ public QueueInfo[] getRootQueues() throws IOException {
+ return getQueueInfoArray(queueManager.getRootQueues());
+ }
+
+ /**
+ * Returns immediate children of queueName.
+ *
+ * @param queueName
+ * @return array of QueueInfo which are children of queueName
+ * @throws java.io.IOException
+ */
@Override
- public JobQueueInfo[] getQueues() throws IOException {
- return queueManager.getJobQueueInfos();
+ public QueueInfo[] getChildQueues(String queueName) throws IOException {
+ return getQueueInfoArray(queueManager.getChildQueues(queueName));
}
+ /**
+ * Gets the root level queues.
+ *
+ * @return array of JobQueueInfo object.
+ * @throws java.io.IOException
+ */
+ @Deprecated
+ public JobQueueInfo[] getRootJobQueues() throws IOException {
+ return queueManager.getRootQueues();
+ }
- @Override
+ @Deprecated
+ public JobQueueInfo[] getJobQueues() throws IOException {
+ return queueManager.getJobQueueInfos();
+ }
+
+ @Deprecated
public JobQueueInfo getQueueInfo(String queue) throws IOException {
return queueManager.getJobQueueInfo(queue);
}
+ private QueueInfo[] getQueueInfoArray(JobQueueInfo[] queues)
+ throws IOException {
+ for (JobQueueInfo queue : queues) {
+ queue.setJobStatuses(getJobsFromQueue(queue.getQueueName()));
+ }
+ return queues;
+ }
+
+ @Override
+ public QueueInfo[] getQueues() throws IOException {
+ return getQueueInfoArray(queueManager.getJobQueueInfos());
+ }
+
@Override
- public JobStatus[] getJobsFromQueue(String queue) throws IOException {
- Collection<JobInProgress> jips = taskScheduler.getJobs(queue);
+ public QueueInfo getQueue(String queue) throws IOException {
+ JobQueueInfo jqueue = queueManager.getJobQueueInfo(queue);
+ jqueue.setJobStatuses(getJobsFromQueue(jqueue.getQueueName()));
+ return jqueue;
+ }
+
+ public org.apache.hadoop.mapreduce.JobStatus[] getJobsFromQueue(String queue)
+ throws IOException {
+ Collection<JobInProgress> jips = null;
+ if (queueManager.getLeafQueueNames().contains(queue)) {
+ jips = taskScheduler.getJobs(queue);
+ }
return getJobStatus(jips,false);
}
@Override
- public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException{
+ public org.apache.hadoop.mapreduce.QueueAclsInfo[]
+ getQueueAclsForCurrentUser() throws IOException{
return queueManager.getQueueAcls(
UserGroupInformation.getCurrentUGI());
}
@@ -4359,7 +4151,7 @@
 * Returns the configured maximum number of tasks for a single job
*/
int getMaxTasksPerJob() {
- return conf.getInt("mapred.jobtracker.maxtasks.per.job", -1);
+ return conf.getInt(JT_TASKS_PER_JOB, -1);
}
@Override
@@ -4375,25 +4167,26 @@
public void refreshQueues() throws IOException{
LOG.info("Refreshing queue information. requested by : " +
UserGroupInformation.getCurrentUGI().getUserName());
- this.queueManager.refreshQueues(new Configuration(this.conf));
+ this.queueManager.refreshQueues(new Configuration(this.conf),
+ taskScheduler.getQueueRefresher());
}
private void initializeTaskMemoryRelatedConfig() {
memSizeForMapSlotOnJT =
JobConf.normalizeMemoryConfigValue(conf.getLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ MAPMEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT));
memSizeForReduceSlotOnJT =
JobConf.normalizeMemoryConfigValue(conf.getLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ REDUCEMEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT));
if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
LOG.warn(
JobConf.deprecatedString(
JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
- " instead use "+JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY+
- " and " + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY
+ " instead use "+JTConfig.JT_MAX_MAPMEMORY_MB+
+ " and " + JTConfig.JT_MAX_REDUCEMEMORY_MB
);
limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
@@ -4411,12 +4204,12 @@
limitMaxMemForMapTasks =
JobConf.normalizeMemoryConfigValue(
conf.getLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ JTConfig.JT_MAX_MAPMEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT));
limitMaxMemForReduceTasks =
JobConf.normalizeMemoryConfigValue(
conf.getLong(
- JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ JTConfig.JT_MAX_REDUCEMEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT));
}
@@ -4526,6 +4319,115 @@
void incrementFaults(String hostName) {
faultyTrackers.incrementFaults(hostName);
}
-
-
+
+ JobTracker(JobConf conf, Clock clock, boolean ignoredForSimulation)
+ throws IOException {
+ this.clock = clock;
+ this.conf = conf;
+ trackerIdentifier = getDateFormat().format(new Date());
+
+ if (fs == null) {
+ fs = FileSystem.get(conf);
+ }
+
+ tasktrackerExpiryInterval =
+ conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
+ retiredJobsCacheSize =
+ conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
+
+ // min time before retire
+ MAX_BLACKLISTS_PER_TRACKER =
+ conf.getInt("mapred.max.tracker.blacklists", 4);
+ NUM_HEARTBEATS_IN_SECOND =
+ conf.getInt("mapred.heartbeats.in.second", 100);
+
+ try {
+ mrOwner = UnixUserGroupInformation.login(conf);
+ } catch (LoginException e) {
+ throw new IOException(StringUtils.stringifyException(e));
+ }
+ supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+
+ this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
+ conf.get("mapred.hosts.exclude", ""));
+ // queue manager
+ Configuration queuesConf = new Configuration(this.conf);
+ queueManager = new QueueManager(queuesConf);
+
+ // Create the scheduler
+ Class<? extends TaskScheduler> schedulerClass
+ = conf.getClass("mapred.jobtracker.taskScheduler",
+ JobQueueTaskScheduler.class, TaskScheduler.class);
+ taskScheduler =
+ (TaskScheduler)ReflectionUtils.newInstance(schedulerClass, conf);
+
+ // Set ports, start RPC servers, setup security policy etc.
+ InetSocketAddress addr = getAddress(conf);
+ this.localMachine = addr.getHostName();
+ this.port = addr.getPort();
+
+ // Create the jetty server
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
+ conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
+ String infoBindAddress = infoSocAddr.getHostName();
+ int tmpInfoPort = infoSocAddr.getPort();
+ this.startTime = clock.getTime();
+ infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort,
+ tmpInfoPort == 0, conf);
+ infoServer.setAttribute("job.tracker", this);
+
+ // initialize history parameters.
+ String historyLogDir = null;
+ FileSystem historyFS = null;
+
+ jobHistory = new JobHistory();
+ jobHistory.init(this, conf, this.localMachine, this.startTime);
+ jobHistory.initDone(conf, fs);
+ historyLogDir = jobHistory.getCompletedJobHistoryLocation().toString();
+ infoServer.setAttribute("historyLogDir", historyLogDir);
+ historyFS = new Path(historyLogDir).getFileSystem(conf);
+
+ infoServer.setAttribute("fileSys", historyFS);
+ infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
+ infoServer.start();
+ this.infoPort = this.infoServer.getPort();
+
+ // Initialize instrumentation
+ JobTrackerInstrumentation tmp;
+ Class<? extends JobTrackerInstrumentation> metricsInst =
+ getInstrumentationClass(conf);
+ try {
+ java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+ metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+ tmp = c.newInstance(this, conf);
+ } catch(Exception e) {
+ //Reflection can throw lots of exceptions -- handle them all by
+ //falling back on the default.
+ LOG.error("failed to initialize job tracker metrics", e);
+ tmp = new JobTrackerMetricsInst(this, conf);
+ }
+ myInstrumentation = tmp;
+
+ // start the recovery manager
+ recoveryManager = new RecoveryManager();
+
+ this.dnsToSwitchMapping = ReflectionUtils.newInstance(
+ conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+ DNSToSwitchMapping.class), conf);
+ this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels",
+ NetworkTopology.DEFAULT_HOST_LEVEL);
+
+ //initializes the job status store
+ completedJobStatusStore = new CompletedJobStatusStore(conf);
+ }
+
+ /**
+ * Get the path of the locally stored job file
+ * @param jobId id of the job
+ * @return the path of the job file on the local file system
+ */
+ String getLocalJobFilePath(org.apache.hadoop.mapreduce.JobID jobId){
+ return System.getProperty("hadoop.log.dir") +
+ File.separator + jobId + "_conf.xml";
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -84,4 +84,80 @@
public void decBlackListedReduceSlots(int slots)
{ }
+
+ public void addReservedMapSlots(int slots)
+ { }
+
+ public void decReservedMapSlots(int slots)
+ { }
+
+ public void addReservedReduceSlots(int slots)
+ { }
+
+ public void decReservedReduceSlots(int slots)
+ { }
+
+ public void addOccupiedMapSlots(int slots)
+ { }
+
+ public void decOccupiedMapSlots(int slots)
+ { }
+
+ public void addOccupiedReduceSlots(int slots)
+ { }
+
+ public void decOccupiedReduceSlots(int slots)
+ { }
+
+ public void failedJob(JobConf conf, JobID id)
+ { }
+
+ public void killedJob(JobConf conf, JobID id)
+ { }
+
+ public void addPrepJob(JobConf conf, JobID id)
+ { }
+
+ public void decPrepJob(JobConf conf, JobID id)
+ { }
+
+ public void addRunningJob(JobConf conf, JobID id)
+ { }
+
+ public void decRunningJob(JobConf conf, JobID id)
+ { }
+
+ public void addRunningMaps(int tasks)
+ { }
+
+ public void decRunningMaps(int tasks)
+ { }
+
+ public void addRunningReduces(int tasks)
+ { }
+
+ public void decRunningReduces(int tasks)
+ { }
+
+ public void killedMap(TaskAttemptID taskAttemptID)
+ { }
+
+ public void killedReduce(TaskAttemptID taskAttemptID)
+ { }
+
+ public void addTrackers(int trackers)
+ { }
+
+ public void decTrackers(int trackers)
+ { }
+
+ public void addBlackListedTrackers(int trackers)
+ { }
+
+ public void decBlackListedTrackers(int trackers)
+ { }
+
+ public void setDecommissionedTrackers(int trackers)
+ { }
+
}