You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/08/09 01:39:33 UTC

svn commit: r684143 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobTracker.java

Author: omalley
Date: Fri Aug  8 16:39:33 2008
New Revision: 684143

URL: http://svn.apache.org/viewvc?rev=684143&view=rev
Log:
HADOOP-3864. Prevent the JobTracker from locking up when a job is being
initialized. (acmurthy via omalley)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=684143&r1=684142&r2=684143&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug  8 16:39:33 2008
@@ -165,6 +165,9 @@
     HADOOP-3863. Use a thread-local string encoder rather than a static one
     that is protected by a lock. (acmurthy via omalley)
 
+    HADOOP-3864. Prevent the JobTracker from locking up when a job is being
+    initialized. (acmurthy via omalley)
+
   BUG FIXES
 
     HADOOP-3563.  Refactor the distributed upgrade code so that it is 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=684143&r1=684142&r2=684143&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Aug  8 16:39:33 2008
@@ -30,6 +30,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -122,7 +123,7 @@
   long finishTime;
 
   private JobConf conf;
-  boolean tasksInited = false;
+  AtomicBoolean tasksInited = new AtomicBoolean(false);
 
   private LocalFileSystem localFs;
   private JobID jobId;
@@ -297,12 +298,22 @@
     }
     return cache;
   }
+  
+  /**
+   * Check if the job has been initialized.
+   * @return <code>true</code> if the job has been initialized, 
+   *         <code>false</code> otherwise
+   */
+  public boolean inited() {
+    return tasksInited.get();
+  }
+  
   /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
    */
   public synchronized void initTasks() throws IOException {
-    if (tasksInited) {
+    if (tasksInited.get()) {
       return;
     }
 
@@ -341,7 +352,7 @@
       status.setMapProgress(1.0f);
       status.setReduceProgress(1.0f);
       status.setRunState(JobStatus.SUCCEEDED);
-      tasksInited = true;
+      tasksInited.set(true);
       JobHistory.JobInfo.logStarted(profile.getJobID(), 
                                     System.currentTimeMillis(), 0, 0);
       JobHistory.JobInfo.logFinished(profile.getJobID(), 
@@ -375,7 +386,7 @@
     }
 
     this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, JobStatus.RUNNING);
-    tasksInited = true;
+    tasksInited.set(true);
         
     JobHistory.JobInfo.logStarted(profile.getJobID(), System.currentTimeMillis(), numMapTasks, numReduceTasks);
   }
@@ -663,7 +674,7 @@
                                             int clusterSize, 
                                             int numUniqueHosts
                                            ) throws IOException {
-    if (!tasksInited) {
+    if (!tasksInited.get()) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
@@ -698,7 +709,7 @@
                                                int clusterSize,
                                                int numUniqueHosts
                                               ) throws IOException {
-    if (!tasksInited) {
+    if (!tasksInited.get()) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=684143&r1=684142&r2=684143&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Aug  8 16:39:33 2008
@@ -1584,7 +1584,9 @@
     
   public synchronized void killJob(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    job.kill();
+    if (job.inited()) {
+      job.kill();
+    }
   }
 
   public synchronized JobProfile getJobProfile(JobID jobid) {
@@ -1653,6 +1655,8 @@
     }
   }
     
+  TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
+  
   /* 
    * Returns a list of TaskCompletionEvent for the given job, 
    * starting from fromEventId.
@@ -1660,11 +1664,13 @@
    */
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
-    TaskCompletionEvent[] events;
+    TaskCompletionEvent[] events = EMPTY_EVENTS;
 
     JobInProgress job = this.jobs.get(jobid);
     if (null != job) {
-      events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+      if (job.inited()) {
+        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+      }
     }
     else {
       events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);