You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/08/11 12:11:28 UTC

svn commit: r803050 - in /hadoop/common/branches/branch-0.20: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/java/org/apache/hadoop/ma...

Author: ddas
Date: Tue Aug 11 10:11:27 2009
New Revision: 803050

URL: http://svn.apache.org/viewvc?rev=803050&view=rev
Log:
MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact the JobTracker lock hierarchy wasn't maintained in some JobInProgress method calls. Contributed by Amar Kamat.

Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
    hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Tue Aug 11 10:11:27 2009
@@ -205,6 +205,10 @@
     MAPREDUCE-838. Fixes a problem in the way commit of task outputs
     happens. The bug was that even if commit failed, the task would
     be declared as successful. (Amareshwari Sriramadasu via ddas)
+
+    MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact
+    the JobTracker lock hierarchy wasn't maintained in some JobInProgress
+    method calls. (Amar Kamat via ddas)
  
 Release 0.20.0 - 2009-04-15
 

Modified: hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Aug 11 10:11:27 2009
@@ -935,7 +935,7 @@
     //Start thread for initialization
     if (initializationPoller == null) {
       this.initializationPoller = new JobInitializationPoller(
-          jobQueuesManager,schedConf,queues);
+          jobQueuesManager,schedConf,queues, taskTrackerManager);
     }
     initializationPoller.init(queueManager.getQueues(), schedConf);
     initializationPoller.setDaemon(true);

Modified: hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Tue Aug 11 10:11:27 2009
@@ -137,19 +137,12 @@
           LOG.info("Initializing job : " + job.getJobID() + " in Queue "
               + job.getProfile().getQueueName() + " For user : "
               + job.getProfile().getUser());
-          try {
-            if (startIniting) {
-              setInitializingJob(job);
-              job.initTasks();
-              setInitializingJob(null);
-            } else {
-              break;
-            }
-          } catch (Throwable t) {
-            LOG.info("Job initialization failed:\n"
-                + StringUtils.stringifyException(t));
-            jobQueueManager.removeJobFromWaitingQueue(job);
-            job.fail(); 
+          if (startIniting) {
+            setInitializingJob(job);
+            ttm.initJob(job);
+            setInitializingJob(null);
+          } else {
+            break;
           }
         }
       }
@@ -246,6 +239,7 @@
 
   private volatile boolean running;
 
+  private TaskTrackerManager ttm;
   /**
    * The map which provides information which thread should be used to
    * initialize jobs for a given job queue.
@@ -253,13 +247,15 @@
   private HashMap<String, JobInitializationThread> threadsToQueueMap;
 
   public JobInitializationPoller(JobQueuesManager mgr,
-      CapacitySchedulerConf rmConf, Set<String> queue) {
+      CapacitySchedulerConf rmConf, Set<String> queue, 
+      TaskTrackerManager ttm) {
     initializedJobs = new HashMap<JobID,JobInProgress>();
     jobQueues = new HashMap<String, QueueInfo>();
     this.jobQueueManager = mgr;
     threadsToQueueMap = new HashMap<String, JobInitializationThread>();
     super.setName("JobInitializationPollerThread");
     running = true;
+    this.ttm = ttm;
   }
 
   /*

Modified: hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Aug 11 10:11:27 2009
@@ -35,6 +35,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -97,8 +98,9 @@
     
     public ControlledInitializationPoller(JobQueuesManager mgr,
                                           CapacitySchedulerConf rmConf,
-                                          Set<String> queues) {
-      super(mgr, rmConf, queues);
+                                          Set<String> queues,
+                                          TaskTrackerManager ttm) {
+      super(mgr, rmConf, queues, ttm);
     }
     
     @Override
@@ -468,6 +470,27 @@
       job.kill();
     }
 
+    @Override
+    public synchronized void failJob(JobInProgress job) {
+      finalizeJob(job, JobStatus.FAILED);
+      job.fail();
+    }
+    
+    public void initJob(JobInProgress jip) {
+      try {
+        JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+        jip.initTasks();
+        JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+        JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
+            EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
+        for (JobInProgressListener listener : listeners) {
+          listener.jobUpdated(event);
+        }
+      } catch (Exception ioe) {
+        failJob(jip);
+      }
+    }
+    
     public void removeJob(JobID jobid) {
       jobs.remove(jobid);
     }
@@ -705,7 +728,7 @@
     controlledInitializationPoller = new ControlledInitializationPoller(
         scheduler.jobQueuesManager,
         resConf,
-        resConf.getQueues());
+        resConf.getQueues(), taskTrackerManager);
     scheduler.setInitializationPoller(controlledInitializationPoller);
     scheduler.setConf(conf);
     //by default disable speculative execution.
@@ -733,7 +756,7 @@
   private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
       throws IOException {
     FakeJobInProgress j = submitJob(state, jobConf);
-    scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+    taskTrackerManager.initJob(j);
     return j;
   }
 
@@ -753,21 +776,10 @@
                                              String queue, String user) 
   throws IOException {
     FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
-    scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+    taskTrackerManager.initJob(j);
     return j;
   }
   
-  // Note that there is no concept of setup tasks here. So init itself should 
-  // report the job-status change
-  private JobStatusChangeEvent initTasksAndReportEvent(FakeJobInProgress jip) 
-  throws IOException {
-    JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-    jip.initTasks();
-    JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-    return new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED, 
-                                    oldStatus, newStatus);
-  }
-  
   // test job run-state change
   public void testJobRunStateChange() throws IOException {
     // start the scheduler
@@ -794,16 +806,10 @@
     // first (may be because of the setup tasks).
     
     // init the lower ranked job first
-    JobChangeEvent event = initTasksAndReportEvent(fjob2);
-    
-    // inform the scheduler
-    scheduler.jobQueuesManager.jobUpdated(event);
+    taskTrackerManager.initJob(fjob2);
     
     // init the higher ordered job later
-    event = initTasksAndReportEvent(fjob1);
-    
-    // inform the scheduler
-    scheduler.jobQueuesManager.jobUpdated(event);
+    taskTrackerManager.initJob(fjob1);
     
     // check if the jobs are missing from the waiting queue
     // The jobs are not removed from waiting queue until they are scheduled 

Modified: hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Aug 11 10:11:27 2009
@@ -113,6 +113,7 @@
     try {
       Configuration conf = getConf();
       this.eagerInitListener = new EagerTaskInitializationListener(conf);
+      eagerInitListener.setTaskTrackerManager(taskTrackerManager);
       eagerInitListener.start();
       taskTrackerManager.addJobInProgressListener(eagerInitListener);
       taskTrackerManager.addJobInProgressListener(jobListener);

Modified: hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Aug 11 10:11:27 2009
@@ -172,6 +172,14 @@
       return null;
     }
 
+    public void initJob (JobInProgress job) {
+      // do nothing
+    }
+    
+    public void failJob (JobInProgress job) {
+      // do nothing
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Tue Aug 11 10:11:27 2009
@@ -67,7 +67,7 @@
     }
   }
   
-  static class InitJob implements Runnable {
+  class InitJob implements Runnable {
   
     private JobInProgress job;
     
@@ -76,16 +76,7 @@
     }
     
     public void run() {
-      try {
-        LOG.info("Initializing " + job.getJobID());
-        job.initTasks();
-      } catch (Throwable t) {
-        LOG.error("Job initialization failed:\n" +
-            StringUtils.stringifyException(t));
-        if (job != null) {
-          job.fail();
-        }
-      }
+      ttm.initJob(job);
     }
   }
   
@@ -94,12 +85,17 @@
   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
   private ExecutorService threadPool;
   private int numThreads;
+  private TaskTrackerManager ttm;
   
   public EagerTaskInitializationListener(Configuration conf) {
     numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
     threadPool = Executors.newFixedThreadPool(numThreads);
   }
   
+  public void setTaskTrackerManager(TaskTrackerManager ttm) {
+    this.ttm = ttm;
+  }
+  
   public void start() throws IOException {
     this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
     jobInitManagerThread.setDaemon(true);

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Aug 11 10:11:27 2009
@@ -54,6 +54,16 @@
  * ***********************************************************
  */
 class JobInProgress {
+  /**
+   * Used when a kill is issued to a job which is initializing.
+   */
+  static class KillInterruptedException extends InterruptedException {
+   private static final long serialVersionUID = 1L;
+    public KillInterruptedException(String msg) {
+      super(msg);
+    }
+  }
+
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
     
   JobProfile profile;
@@ -377,12 +387,13 @@
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
    */
-  public synchronized void initTasks() throws IOException {
-    if (tasksInited.get()) {
+  public synchronized void initTasks() 
+  throws IOException, KillInterruptedException {
+    if (tasksInited.get() || isComplete()) {
       return;
     }
     synchronized(jobInitKillStatus){
-      if(jobInitKillStatus.killed) {
+      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
         return;
       }
       jobInitKillStatus.initStarted = true;
@@ -493,9 +504,7 @@
     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
       if(jobInitKillStatus.killed) {
-        //setup not launched so directly terminate
-        terminateJob(JobStatus.KILLED);
-        return;
+        throw new KillInterruptedException("Job " + jobId + " killed in init");
       }
     }
     
@@ -2199,15 +2208,12 @@
   }
 
   /**
-   * Kill the job and all its component tasks. This method is called from 
+   * Kill the job and all its component tasks. This method should be called from 
    * jobtracker and should return fast as it locks the jobtracker.
    */
   public void kill() {
     boolean killNow = false;
     synchronized(jobInitKillStatus) {
-      if(jobInitKillStatus.killed) {//job is already marked for killing
-        return;
-      }
       jobInitKillStatus.killed = true;
       //if not in middle of init, terminate it now
       if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
@@ -2221,7 +2227,9 @@
   }
   
   /**
-   * Fails the job and all its component tasks.
+   * Fails the job and all its component tasks. This should be called only from
+   * {@link JobInProgress} or {@link JobTracker}. Look at 
+   * {@link JobTracker#failJob(JobInProgress)} for more details.
    */
   synchronized void fail() {
     terminate(JobStatus.FAILED);

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Tue Aug 11 10:11:27 2009
@@ -47,7 +47,7 @@
   public synchronized void start() throws IOException {
     super.start();
     taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
-    
+    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
     eagerTaskInitializationListener.start();
     taskTrackerManager.addJobInProgressListener(
         eagerTaskInitializationListener);

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Aug 11 10:11:27 2009
@@ -60,6 +60,7 @@
 import org.apache.hadoop.mapred.JobHistory.Keys;
 import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
@@ -820,11 +821,11 @@
           hasUpdates = true;
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           try {
-            jip.initTasks();
+            initJob(jip);
           } catch (Throwable t) {
             LOG.error("Job initialization failed : \n" 
                       + StringUtils.stringifyException(t));
-            jip.fail(); // fail the job
+            failJob(jip);
             throw new IOException(t);
           }
         }
@@ -3085,8 +3086,13 @@
       return;
     }
         
-    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
     checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
+    killJob(job);
+  }
+  
+  private synchronized void killJob(JobInProgress job) {
+    LOG.info("Killing job " + job.getJobID());
+    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
     job.kill();
     
     // Inform the listeners if the job is killed
@@ -3105,6 +3111,64 @@
     }
   }
 
+  public void initJob(JobInProgress job) {
+    if (null == job) {
+      LOG.info("Init on null job is not valid");
+      return;
+    }
+	        
+    try {
+      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+      LOG.info("Initializing " + job.getJobID());
+      job.initTasks();
+      // Inform the listeners if the job state has changed
+      // Note that the job will be in PREP state.
+      JobStatus newStatus = (JobStatus)job.getStatus().clone();
+      if (prevStatus.getRunState() != newStatus.getRunState()) {
+        JobStatusChangeEvent event = 
+          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+              newStatus);
+        synchronized (JobTracker.this) {
+          updateJobInProgressListeners(event);
+        }
+      }
+    } catch (KillInterruptedException kie) {
+      //   If job was killed during initialization, job state will be KILLED
+      LOG.error("Job initialization interrupted:\n" +
+          StringUtils.stringifyException(kie));
+      killJob(job);
+    } catch (Throwable t) {
+      // If the job initialization fails, the job state will be FAILED
+      LOG.error("Job initialization failed:\n" +
+          StringUtils.stringifyException(t));
+      failJob(job);
+    }
+	 }
+
+  /**
+   * Fail a job and inform the listeners. Other components in the framework 
+   * should use this to fail a job.
+   */
+  public synchronized void failJob(JobInProgress job) {
+    if (null == job) {
+      LOG.info("Fail on null job is not valid");
+      return;
+    }
+         
+    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+    LOG.info("Failing job " + job.getJobID());
+    job.fail();
+     
+    // Inform the listeners if the job state has changed
+    JobStatus newStatus = (JobStatus)job.getStatus().clone();
+    if (prevStatus.getRunState() != newStatus.getRunState()) {
+      JobStatusChangeEvent event = 
+        new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+            newStatus);
+      updateJobInProgressListeners(event);
+    }
+  }
+  
   /**
    * Set the priority of a job
    * @param jobid id of the job

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Tue Aug 11 10:11:27 2009
@@ -88,4 +88,18 @@
    * @return jobInProgress object
    */
   public JobInProgress getJob(JobID jobid);
+  
+  /**
+   * Initialize the Job
+   * 
+   * @param job JobInProgress object
+   */
+  public void initJob(JobInProgress job);
+  
+  /**
+   * Fail a job.
+   * 
+   * @param job JobInProgress object
+   */
+  public void failJob(JobInProgress job);
 }

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Aug 11 10:11:27 2009
@@ -500,7 +500,7 @@
    */
   public void initializeJob(JobID jobId) throws IOException {
     JobInProgress job = jobTracker.getJobTracker().getJob(jobId);
-    job.initTasks();
+    jobTracker.getJobTracker().initJob(job);
   }
   
   /**

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Aug 11 10:11:27 2009
@@ -184,6 +184,14 @@
       return null;
     }
 
+    public void initJob(JobInProgress job) {
+      // do nothing
+    }
+    
+    public void failJob(JobInProgress job) {
+      // do nothing
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Tue Aug 11 10:11:27 2009
@@ -479,7 +479,7 @@
     JobID id = job2.getID();*/
     JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
     
-    jip.initTasks();
+    mr.getJobTrackerRunner().getJobTracker().initJob(jip);
     
     // find out the history filename
     String history = 
@@ -494,7 +494,7 @@
     id = job1.getID();
     jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
     
-    jip.initTasks();
+    mr.getJobTrackerRunner().getJobTracker().initJob(jip);
     
     //  make sure that cleanup is launched and is waiting
     while (!jip.isCleanupLaunched()) {

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=803050&r1=803049&r2=803050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Tue Aug 11 10:11:27 2009
@@ -27,6 +27,8 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
 public class TestParallelInitialization extends TestCase {
   
@@ -135,8 +137,39 @@
       return null;
     }
 
+    public void initJob(JobInProgress job) {
+      try {
+        JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+        job.initTasks();
+        JobStatus newStatus = (JobStatus)job.getStatus().clone();
+        if (prevStatus.getRunState() != newStatus.getRunState()) {
+          JobStatusChangeEvent event = 
+            new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+                newStatus);
+          for (JobInProgressListener listener : listeners) {
+            listener.jobUpdated(event);
+          }
+        }
+      } catch (Exception ioe) {
+        failJob(job);
+      }
+    }
     // Test methods
     
+    public synchronized void failJob(JobInProgress job) {
+      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+      job.fail();
+      JobStatus newStatus = (JobStatus)job.getStatus().clone();
+      if (prevStatus.getRunState() != newStatus.getRunState()) {
+        JobStatusChangeEvent event = 
+          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+              newStatus);
+        for (JobInProgressListener listener : listeners) {
+          listener.jobUpdated(event);
+        }
+      }
+    }
+    
     public void submitJob(JobInProgress job) throws IOException {
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);