You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here") is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/08/11 12:11:28 UTC
svn commit: r803050 - in /hadoop/common/branches/branch-0.20: ./
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/contrib/fairscheduler/src/java/org/apache/hadoop/ma...
Author: ddas
Date: Tue Aug 11 10:11:27 2009
New Revision: 803050
URL: http://svn.apache.org/viewvc?rev=803050&view=rev
Log:
MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact that the JobTracker lock hierarchy wasn't maintained in some JobInProgress method calls. Contributed by Amar Kamat.
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Tue Aug 11 10:11:27 2009
@@ -205,6 +205,10 @@
MAPREDUCE-838. Fixes a problem in the way commit of task outputs
happens. The bug was that even if commit failed, the task would
be declared as successful. (Amareshwari Sriramadasu via ddas)
+
+ MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact
+ the JobTracker lock hierarchy wasn't maintained in some JobInProgress
+ method calls. (Amar Kamat via ddas)
Release 0.20.0 - 2009-04-15
Modified: hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Aug 11 10:11:27 2009
@@ -935,7 +935,7 @@
//Start thread for initialization
if (initializationPoller == null) {
this.initializationPoller = new JobInitializationPoller(
- jobQueuesManager,schedConf,queues);
+ jobQueuesManager,schedConf,queues, taskTrackerManager);
}
initializationPoller.init(queueManager.getQueues(), schedConf);
initializationPoller.setDaemon(true);
Modified: hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Tue Aug 11 10:11:27 2009
@@ -137,19 +137,12 @@
LOG.info("Initializing job : " + job.getJobID() + " in Queue "
+ job.getProfile().getQueueName() + " For user : "
+ job.getProfile().getUser());
- try {
- if (startIniting) {
- setInitializingJob(job);
- job.initTasks();
- setInitializingJob(null);
- } else {
- break;
- }
- } catch (Throwable t) {
- LOG.info("Job initialization failed:\n"
- + StringUtils.stringifyException(t));
- jobQueueManager.removeJobFromWaitingQueue(job);
- job.fail();
+ if (startIniting) {
+ setInitializingJob(job);
+ ttm.initJob(job);
+ setInitializingJob(null);
+ } else {
+ break;
}
}
}
@@ -246,6 +239,7 @@
private volatile boolean running;
+ private TaskTrackerManager ttm;
/**
* The map which provides information which thread should be used to
* initialize jobs for a given job queue.
@@ -253,13 +247,15 @@
private HashMap<String, JobInitializationThread> threadsToQueueMap;
public JobInitializationPoller(JobQueuesManager mgr,
- CapacitySchedulerConf rmConf, Set<String> queue) {
+ CapacitySchedulerConf rmConf, Set<String> queue,
+ TaskTrackerManager ttm) {
initializedJobs = new HashMap<JobID,JobInProgress>();
jobQueues = new HashMap<String, QueueInfo>();
this.jobQueueManager = mgr;
threadsToQueueMap = new HashMap<String, JobInitializationThread>();
super.setName("JobInitializationPollerThread");
running = true;
+ this.ttm = ttm;
}
/*
Modified: hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Aug 11 10:11:27 2009
@@ -35,6 +35,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -97,8 +98,9 @@
public ControlledInitializationPoller(JobQueuesManager mgr,
CapacitySchedulerConf rmConf,
- Set<String> queues) {
- super(mgr, rmConf, queues);
+ Set<String> queues,
+ TaskTrackerManager ttm) {
+ super(mgr, rmConf, queues, ttm);
}
@Override
@@ -468,6 +470,27 @@
job.kill();
}
+ @Override
+ public synchronized void failJob(JobInProgress job) {
+ finalizeJob(job, JobStatus.FAILED);
+ job.fail();
+ }
+
+ public void initJob(JobInProgress jip) {
+ try {
+ JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+ jip.initTasks();
+ JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+ JobStatusChangeEvent event = new JobStatusChangeEvent(jip,
+ EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
+ for (JobInProgressListener listener : listeners) {
+ listener.jobUpdated(event);
+ }
+ } catch (Exception ioe) {
+ failJob(jip);
+ }
+ }
+
public void removeJob(JobID jobid) {
jobs.remove(jobid);
}
@@ -705,7 +728,7 @@
controlledInitializationPoller = new ControlledInitializationPoller(
scheduler.jobQueuesManager,
resConf,
- resConf.getQueues());
+ resConf.getQueues(), taskTrackerManager);
scheduler.setInitializationPoller(controlledInitializationPoller);
scheduler.setConf(conf);
//by default disable speculative execution.
@@ -733,7 +756,7 @@
private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
throws IOException {
FakeJobInProgress j = submitJob(state, jobConf);
- scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+ taskTrackerManager.initJob(j);
return j;
}
@@ -753,21 +776,10 @@
String queue, String user)
throws IOException {
FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
- scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+ taskTrackerManager.initJob(j);
return j;
}
- // Note that there is no concept of setup tasks here. So init itself should
- // report the job-status change
- private JobStatusChangeEvent initTasksAndReportEvent(FakeJobInProgress jip)
- throws IOException {
- JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
- jip.initTasks();
- JobStatus newStatus = (JobStatus)jip.getStatus().clone();
- return new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
- oldStatus, newStatus);
- }
-
// test job run-state change
public void testJobRunStateChange() throws IOException {
// start the scheduler
@@ -794,16 +806,10 @@
// first (may be because of the setup tasks).
// init the lower ranked job first
- JobChangeEvent event = initTasksAndReportEvent(fjob2);
-
- // inform the scheduler
- scheduler.jobQueuesManager.jobUpdated(event);
+ taskTrackerManager.initJob(fjob2);
// init the higher ordered job later
- event = initTasksAndReportEvent(fjob1);
-
- // inform the scheduler
- scheduler.jobQueuesManager.jobUpdated(event);
+ taskTrackerManager.initJob(fjob1);
// check if the jobs are missing from the waiting queue
// The jobs are not removed from waiting queue until they are scheduled
Modified: hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Aug 11 10:11:27 2009
@@ -113,6 +113,7 @@
try {
Configuration conf = getConf();
this.eagerInitListener = new EagerTaskInitializationListener(conf);
+ eagerInitListener.setTaskTrackerManager(taskTrackerManager);
eagerInitListener.start();
taskTrackerManager.addJobInProgressListener(eagerInitListener);
taskTrackerManager.addJobInProgressListener(jobListener);
Modified: hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Aug 11 10:11:27 2009
@@ -172,6 +172,14 @@
return null;
}
+ public void initJob (JobInProgress job) {
+ // do nothing
+ }
+
+ public void failJob (JobInProgress job) {
+ // do nothing
+ }
+
// Test methods
public void submitJob(JobInProgress job) throws IOException {
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Tue Aug 11 10:11:27 2009
@@ -67,7 +67,7 @@
}
}
- static class InitJob implements Runnable {
+ class InitJob implements Runnable {
private JobInProgress job;
@@ -76,16 +76,7 @@
}
public void run() {
- try {
- LOG.info("Initializing " + job.getJobID());
- job.initTasks();
- } catch (Throwable t) {
- LOG.error("Job initialization failed:\n" +
- StringUtils.stringifyException(t));
- if (job != null) {
- job.fail();
- }
- }
+ ttm.initJob(job);
}
}
@@ -94,12 +85,17 @@
private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
private ExecutorService threadPool;
private int numThreads;
+ private TaskTrackerManager ttm;
public EagerTaskInitializationListener(Configuration conf) {
numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
threadPool = Executors.newFixedThreadPool(numThreads);
}
+ public void setTaskTrackerManager(TaskTrackerManager ttm) {
+ this.ttm = ttm;
+ }
+
public void start() throws IOException {
this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
jobInitManagerThread.setDaemon(true);
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Aug 11 10:11:27 2009
@@ -54,6 +54,16 @@
* ***********************************************************
*/
class JobInProgress {
+ /**
+ * Used when the a kill is issued to a job which is initializing.
+ */
+ static class KillInterruptedException extends InterruptedException {
+ private static final long serialVersionUID = 1L;
+ public KillInterruptedException(String msg) {
+ super(msg);
+ }
+ }
+
static final Log LOG = LogFactory.getLog(JobInProgress.class);
JobProfile profile;
@@ -377,12 +387,13 @@
* Construct the splits, etc. This is invoked from an async
* thread so that split-computation doesn't block anyone.
*/
- public synchronized void initTasks() throws IOException {
- if (tasksInited.get()) {
+ public synchronized void initTasks()
+ throws IOException, KillInterruptedException {
+ if (tasksInited.get() || isComplete()) {
return;
}
synchronized(jobInitKillStatus){
- if(jobInitKillStatus.killed) {
+ if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
return;
}
jobInitKillStatus.initStarted = true;
@@ -493,9 +504,7 @@
synchronized(jobInitKillStatus){
jobInitKillStatus.initDone = true;
if(jobInitKillStatus.killed) {
- //setup not launched so directly terminate
- terminateJob(JobStatus.KILLED);
- return;
+ throw new KillInterruptedException("Job " + jobId + " killed in init");
}
}
@@ -2199,15 +2208,12 @@
}
/**
- * Kill the job and all its component tasks. This method is called from
+ * Kill the job and all its component tasks. This method should be called from
* jobtracker and should return fast as it locks the jobtracker.
*/
public void kill() {
boolean killNow = false;
synchronized(jobInitKillStatus) {
- if(jobInitKillStatus.killed) {//job is already marked for killing
- return;
- }
jobInitKillStatus.killed = true;
//if not in middle of init, terminate it now
if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
@@ -2221,7 +2227,9 @@
}
/**
- * Fails the job and all its component tasks.
+ * Fails the job and all its component tasks. This should be called only from
+ * {@link JobInProgress} or {@link JobTracker}. Look at
+ * {@link JobTracker#failJob(JobInProgress)} for more details.
*/
synchronized void fail() {
terminate(JobStatus.FAILED);
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Tue Aug 11 10:11:27 2009
@@ -47,7 +47,7 @@
public synchronized void start() throws IOException {
super.start();
taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
-
+ eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
eagerTaskInitializationListener.start();
taskTrackerManager.addJobInProgressListener(
eagerTaskInitializationListener);
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Aug 11 10:11:27 2009
@@ -60,6 +60,7 @@
import org.apache.hadoop.mapred.JobHistory.Keys;
import org.apache.hadoop.mapred.JobHistory.Listener;
import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
@@ -820,11 +821,11 @@
hasUpdates = true;
LOG.info("Calling init from RM for job " + jip.getJobID().toString());
try {
- jip.initTasks();
+ initJob(jip);
} catch (Throwable t) {
LOG.error("Job initialization failed : \n"
+ StringUtils.stringifyException(t));
- jip.fail(); // fail the job
+ failJob(jip);
throw new IOException(t);
}
}
@@ -3085,8 +3086,13 @@
return;
}
- JobStatus prevStatus = (JobStatus)job.getStatus().clone();
checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+ killJob(job);
+ }
+
+ private synchronized void killJob(JobInProgress job) {
+ LOG.info("Killing job " + job.getJobID());
+ JobStatus prevStatus = (JobStatus)job.getStatus().clone();
job.kill();
// Inform the listeners if the job is killed
@@ -3105,6 +3111,64 @@
}
}
+ public void initJob(JobInProgress job) {
+ if (null == job) {
+ LOG.info("Init on null job is not valid");
+ return;
+ }
+
+ try {
+ JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+ LOG.info("Initializing " + job.getJobID());
+ job.initTasks();
+ // Inform the listeners if the job state has changed
+ // Note : that the job will be in PREP state.
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+ if (prevStatus.getRunState() != newStatus.getRunState()) {
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
+ newStatus);
+ synchronized (JobTracker.this) {
+ updateJobInProgressListeners(event);
+ }
+ }
+ } catch (KillInterruptedException kie) {
+ // If job was killed during initialization, job state will be KILLED
+ LOG.error("Job initialization interrupted:\n" +
+ StringUtils.stringifyException(kie));
+ killJob(job);
+ } catch (Throwable t) {
+ // If the job initialization is failed, job state will be FAILED
+ LOG.error("Job initialization failed:\n" +
+ StringUtils.stringifyException(t));
+ failJob(job);
+ }
+ }
+
+ /**
+ * Fail a job and inform the listeners. Other components in the framework
+ * should use this to fail a job.
+ */
+ public synchronized void failJob(JobInProgress job) {
+ if (null == job) {
+ LOG.info("Fail on null job is not valid");
+ return;
+ }
+
+ JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+ LOG.info("Failing job " + job.getJobID());
+ job.fail();
+
+ // Inform the listeners if the job state has changed
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+ if (prevStatus.getRunState() != newStatus.getRunState()) {
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
+ newStatus);
+ updateJobInProgressListeners(event);
+ }
+ }
+
/**
* Set the priority of a job
* @param jobid id of the job
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Tue Aug 11 10:11:27 2009
@@ -88,4 +88,18 @@
* @return jobInProgress object
*/
public JobInProgress getJob(JobID jobid);
+
+ /**
+ * Initialize the Job
+ *
+ * @param job JobInProgress object
+ */
+ public void initJob(JobInProgress job);
+
+ /**
+ * Fail a job.
+ *
+ * @param job JobInProgress object
+ */
+ public void failJob(JobInProgress job);
}
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Aug 11 10:11:27 2009
@@ -500,7 +500,7 @@
*/
public void initializeJob(JobID jobId) throws IOException {
JobInProgress job = jobTracker.getJobTracker().getJob(jobId);
- job.initTasks();
+ jobTracker.getJobTracker().initJob(job);
}
/**
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Aug 11 10:11:27 2009
@@ -184,6 +184,14 @@
return null;
}
+ public void initJob(JobInProgress job) {
+ // do nothing
+ }
+
+ public void failJob(JobInProgress job) {
+ // do nothing
+ }
+
// Test methods
public void submitJob(JobInProgress job) throws IOException {
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Tue Aug 11 10:11:27 2009
@@ -479,7 +479,7 @@
JobID id = job2.getID();*/
JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
- jip.initTasks();
+ mr.getJobTrackerRunner().getJobTracker().initJob(jip);
// find out the history filename
String history =
@@ -494,7 +494,7 @@
id = job1.getID();
jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
- jip.initTasks();
+ mr.getJobTrackerRunner().getJobTracker().initJob(jip);
// make sure that cleanup is launched and is waiting
while (!jip.isCleanupLaunched()) {
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Tue Aug 11 10:11:27 2009
@@ -27,6 +27,8 @@
import junit.framework.TestCase;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
public class TestParallelInitialization extends TestCase {
@@ -135,8 +137,39 @@
return null;
}
+ public void initJob(JobInProgress job) {
+ try {
+ JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+ job.initTasks();
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+ if (prevStatus.getRunState() != newStatus.getRunState()) {
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
+ newStatus);
+ for (JobInProgressListener listener : listeners) {
+ listener.jobUpdated(event);
+ }
+ }
+ } catch (Exception ioe) {
+ failJob(job);
+ }
+ }
// Test methods
+ public synchronized void failJob(JobInProgress job) {
+ JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+ job.fail();
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+ if (prevStatus.getRunState() != newStatus.getRunState()) {
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
+ newStatus);
+ for (JobInProgressListener listener : listeners) {
+ listener.jobUpdated(event);
+ }
+ }
+ }
+
public void submitJob(JobInProgress job) throws IOException {
for (JobInProgressListener listener : listeners) {
listener.jobAdded(job);