You are viewing a plain-text version of this content. The canonical link to the original page was a hyperlink that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/10/15 20:39:46 UTC
svn commit: r704990 - in /hadoop/core/branches/branch-0.19: ./ CHANGES.txt
src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
src/mapred/org/apache/hadoop/mapred/JobInProgress.java
src/mapred/org/apache/hadoop/mapred/JobTracker.java
Author: acmurthy
Date: Wed Oct 15 11:39:46 2008
New Revision: 704990
URL: http://svn.apache.org/viewvc?rev=704990&view=rev
Log:
Merge -r 704988:704989 from trunk to branch-0.19 to fix HADOOP-4236.
Modified:
hadoop/core/branches/branch-0.19/ (props changed)
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Propchange: hadoop/core/branches/branch-0.19/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 15 11:39:46 2008
@@ -1 +1 @@
-/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748
+/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=704990&r1=704989&r2=704990&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Wed Oct 15 11:39:46 2008
@@ -867,6 +867,9 @@
determine whether to canonicalize file paths or not.
(Amareshwari Sriramadasu via ddas)
+ HADOOP-4236. Ensure un-initialized jobs are killed correctly on
+ user-demand. (Sharad Agarwal via acmurthy)
+
Release 0.18.2 - Unreleased
BUG FIXES
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=704990&r1=704989&r2=704990&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Wed Oct 15 11:39:46 2008
@@ -58,7 +58,7 @@
LOG.error("Job initialization failed:\n" +
StringUtils.stringifyException(t));
if (job != null) {
- job.terminateJob(JobStatus.FAILED);
+ job.fail();
}
}
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=704990&r1=704989&r2=704990&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 15 11:39:46 2008
@@ -142,6 +142,7 @@
private JobConf conf;
AtomicBoolean tasksInited = new AtomicBoolean(false);
+ private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
private LocalFileSystem localFs;
private JobID jobId;
@@ -340,6 +341,12 @@
if (tasksInited.get()) {
return;
}
+ synchronized(jobInitKillStatus){
+ if(jobInitKillStatus.killed) {
+ return;
+ }
+ jobInitKillStatus.initStarted = true;
+ }
// log job info
JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(),
@@ -450,6 +457,15 @@
numReduceTasks + 1, jobtracker, conf, this);
setup[1].setSetupTask();
+ synchronized(jobInitKillStatus){
+ jobInitKillStatus.initDone = true;
+ if(jobInitKillStatus.killed) {
+ //setup not launched so directly terminate
+ terminateJob(JobStatus.KILLED);
+ return;
+ }
+ }
+
tasksInited.set(true);
JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
numMapTasks, numReduceTasks);
@@ -881,40 +897,47 @@
* Return a CleanupTask, if appropriate, to run on the given tasktracker
*
*/
- public synchronized Task obtainCleanupTask(TaskTrackerStatus tts,
+ public Task obtainCleanupTask(TaskTrackerStatus tts,
int clusterSize,
int numUniqueHosts,
boolean isMapSlot
) throws IOException {
- if (!canLaunchCleanupTask()) {
- return null;
- }
-
- String taskTracker = tts.getTrackerName();
- // Update the last-known clusterSize
- this.clusterSize = clusterSize;
- if (!shouldRunOnTaskTracker(taskTracker)) {
+ if(!tasksInited.get()) {
return null;
}
- List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
- if (isMapSlot) {
- cleanupTaskList.add(cleanup[0]);
- } else {
- cleanupTaskList.add(cleanup[1]);
- }
- TaskInProgress tip = findTaskFromList(cleanupTaskList,
- tts, numUniqueHosts, false);
- if (tip == null) {
- return null;
+ synchronized(this) {
+ if (!canLaunchCleanupTask()) {
+ return null;
+ }
+
+ String taskTracker = tts.getTrackerName();
+ // Update the last-known clusterSize
+ this.clusterSize = clusterSize;
+ if (!shouldRunOnTaskTracker(taskTracker)) {
+ return null;
+ }
+
+ List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
+ if (isMapSlot) {
+ cleanupTaskList.add(cleanup[0]);
+ } else {
+ cleanupTaskList.add(cleanup[1]);
+ }
+ TaskInProgress tip = findTaskFromList(cleanupTaskList,
+ tts, numUniqueHosts, false);
+ if (tip == null) {
+ return null;
+ }
+
+ // Now launch the cleanupTask
+ Task result = tip.getTaskToRun(tts.getTrackerName());
+ if (result != null) {
+ addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+ }
+ return result;
}
- // Now launch the cleanupTask
- Task result = tip.getTaskToRun(tts.getTrackerName());
- if (result != null) {
- addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
- }
- return result;
}
/**
@@ -956,40 +979,46 @@
* Return a SetupTask, if appropriate, to run on the given tasktracker
*
*/
- public synchronized Task obtainSetupTask(TaskTrackerStatus tts,
+ public Task obtainSetupTask(TaskTrackerStatus tts,
int clusterSize,
int numUniqueHosts,
boolean isMapSlot
) throws IOException {
- if (!canLaunchSetupTask()) {
+ if(!tasksInited.get()) {
return null;
}
- String taskTracker = tts.getTrackerName();
- // Update the last-known clusterSize
- this.clusterSize = clusterSize;
- if (!shouldRunOnTaskTracker(taskTracker)) {
- return null;
- }
-
- List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
- if (isMapSlot) {
- setupTaskList.add(setup[0]);
- } else {
- setupTaskList.add(setup[1]);
- }
- TaskInProgress tip = findTaskFromList(setupTaskList,
- tts, numUniqueHosts, false);
- if (tip == null) {
- return null;
- }
-
- // Now launch the setupTask
- Task result = tip.getTaskToRun(tts.getTrackerName());
- if (result != null) {
- addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+ synchronized(this) {
+ if (!canLaunchSetupTask()) {
+ return null;
+ }
+
+ String taskTracker = tts.getTrackerName();
+ // Update the last-known clusterSize
+ this.clusterSize = clusterSize;
+ if (!shouldRunOnTaskTracker(taskTracker)) {
+ return null;
+ }
+
+ List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
+ if (isMapSlot) {
+ setupTaskList.add(setup[0]);
+ } else {
+ setupTaskList.add(setup[1]);
+ }
+ TaskInProgress tip = findTaskFromList(setupTaskList,
+ tts, numUniqueHosts, false);
+ if (tip == null) {
+ return null;
+ }
+
+ // Now launch the setupTask
+ Task result = tip.getTaskToRun(tts.getTrackerName());
+ if (result != null) {
+ addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+ }
+ return result;
}
- return result;
}
/**
@@ -1883,7 +1912,7 @@
}
}
- synchronized void terminateJob(int jobTerminationState) {
+ private synchronized void terminateJob(int jobTerminationState) {
if ((status.getRunState() == JobStatus.RUNNING) ||
(status.getRunState() == JobStatus.PREP)) {
if (jobTerminationState == JobStatus.FAILED) {
@@ -1909,12 +1938,33 @@
/**
* Terminate the job and all its component tasks.
+ * Calling this will lead to marking the job as failed/killed. Cleanup
+ * tip will be launched. If the job has not inited, it will directly call
+ * terminateJob as there is no need to launch cleanup tip.
+ * This method is reentrant.
* @param jobTerminationState job termination state
*/
private synchronized void terminate(int jobTerminationState) {
+ if(!tasksInited.get()) {
+ //init could not be done, we just terminate directly.
+ terminateJob(jobTerminationState);
+ return;
+ }
+
if ((status.getRunState() == JobStatus.RUNNING) ||
(status.getRunState() == JobStatus.PREP)) {
LOG.info("Killing job '" + this.status.getJobID() + "'");
+ if (jobTerminationState == JobStatus.FAILED) {
+ if(jobFailed) {//reentrant
+ return;
+ }
+ jobFailed = true;
+ } else if (jobTerminationState == JobStatus.KILLED) {
+ if(jobKilled) {//reentrant
+ return;
+ }
+ jobKilled = true;
+ }
//
// kill all TIPs.
//
@@ -1927,19 +1977,29 @@
for (int i = 0; i < reduces.length; i++) {
reduces[i].kill();
}
- if (jobTerminationState == JobStatus.FAILED) {
- jobFailed = true;
- } else if (jobTerminationState == JobStatus.KILLED) {
- jobKilled = true;
- }
}
}
/**
- * Kill the job and all its component tasks.
+ * Kill the job and all its component tasks. This method is called from
+ * jobtracker and should return fast as it locks the jobtracker.
*/
- public synchronized void kill() {
- terminate(JobStatus.KILLED);
+ public void kill() {
+ boolean killNow = false;
+ synchronized(jobInitKillStatus) {
+ if(jobInitKillStatus.killed) {//job is already marked for killing
+ return;
+ }
+ jobInitKillStatus.killed = true;
+ //if not in middle of init, terminate it now
+ if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
+ //avoiding nested locking by setting flag
+ killNow = true;
+ }
+ }
+ if(killNow) {
+ terminate(JobStatus.KILLED);
+ }
}
/**
@@ -2312,4 +2372,17 @@
this.schedulingInfo = schedulingInfo;
this.status.setSchedulingInfo(schedulingInfo.toString());
}
+
+ /**
+ * To keep track of kill and initTasks status of this job. initTasks() take
+ * a lock on JobInProgress object. kill should avoid waiting on
+ * JobInProgress lock since it may take a while to do initTasks().
+ */
+ private static class JobInitKillStatus {
+ //flag to be set if kill is called
+ boolean killed;
+
+ boolean initStarted;
+ boolean initDone;
+ }
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=704990&r1=704989&r2=704990&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Oct 15 11:39:46 2008
@@ -2264,9 +2264,7 @@
public synchronized void killJob(JobID jobid) throws IOException {
JobInProgress job = jobs.get(jobid);
checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
- if (job.inited()) {
- job.kill();
- }
+ job.kill();
}
/**