You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/23 08:13:33 UTC

svn commit: r746902 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobTracker.java src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Author: ddas
Date: Mon Feb 23 07:13:32 2009
New Revision: 746902

URL: http://svn.apache.org/viewvc?rev=746902&view=rev
Log:
HADOOP-5285. Fixes the issues - (1) obtainTaskCleanupTask checks whether job is inited before trying to lock the JobInProgress (2) Moves the CleanupQueue class outside the TaskTracker and makes it a generic class that is used by the JobTracker also for deleting the paths on the job's output fs. (3) Moves the references to completedJobStore outside the block where the JobTracker is locked. Contributed by Devaraj Das.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=746902&r1=746901&r2=746902&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Feb 23 07:13:32 2009
@@ -834,6 +834,13 @@
     HADOOP-5282. Fixed job history logs for task attempts that are failed by the
     JobTracker, say due to lost task trackers. (Amar Kamat via yhemanth)
 
+    HADOOP-5285. Fixes the issues - (1) obtainTaskCleanupTask checks whether job is
+    inited before trying to lock the JobInProgress (2) Moves the CleanupQueue class
+    outside the TaskTracker and makes it a generic class that is used by the 
+    JobTracker also for deleting the paths on the job's output fs. (3) Moves the
+    references to completedJobStore outside the block where the JobTracker is locked.
+    (ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=746902&r1=746901&r2=746902&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Feb 23 07:13:32 2009
@@ -978,35 +978,39 @@
   /*
    * Return task cleanup attempt if any, to run on a given tracker
    */
-  public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
                                                  boolean isMapSlot)
   throws IOException {
-    if (this.status.getRunState() != JobStatus.RUNNING || 
-        jobFailed || jobKilled) {
-      return null;
-    }
-    
-    String taskTracker = tts.getTrackerName();
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!tasksInited.get()) {
       return null;
     }
-    TaskAttemptID taskid = null;
-    TaskInProgress tip = null;
-    if (isMapSlot) {
-      if (!mapCleanupTasks.isEmpty()) {
-        taskid = mapCleanupTasks.remove(0);
-        tip = maps[taskid.getTaskID().getId()];
+    synchronized (this) {
+      if (this.status.getRunState() != JobStatus.RUNNING || 
+          jobFailed || jobKilled) {
+        return null;
       }
-    } else {
-      if (!reduceCleanupTasks.isEmpty()) {
-        taskid = reduceCleanupTasks.remove(0);
-        tip = reduces[taskid.getTaskID().getId()];
+      String taskTracker = tts.getTrackerName();
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
       }
+      TaskAttemptID taskid = null;
+      TaskInProgress tip = null;
+      if (isMapSlot) {
+        if (!mapCleanupTasks.isEmpty()) {
+          taskid = mapCleanupTasks.remove(0);
+          tip = maps[taskid.getTaskID().getId()];
+        }
+      } else {
+        if (!reduceCleanupTasks.isEmpty()) {
+          taskid = reduceCleanupTasks.remove(0);
+          tip = reduces[taskid.getTaskID().getId()];
+        }
+      }
+      if (tip != null) {
+        return tip.addRunningTask(taskid, taskTracker, true);
+      }
+      return null;
     }
-    if (tip != null) {
-      return tip.addRunningTask(taskid, taskTracker, true);
-    }
-    return null;
   }
   
   public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
@@ -1111,9 +1115,6 @@
    * @return true/false
    */
   private synchronized boolean canLaunchJobCleanupTask() {
-    if (!tasksInited.get()) {
-      return false;
-    }
     // check if the job is running
     if (status.getRunState() != JobStatus.RUNNING &&
         status.getRunState() != JobStatus.PREP) {
@@ -2444,6 +2445,7 @@
    */
   synchronized void garbageCollect() {
     // Let the JobTracker know that a job is complete
+    jobtracker.storeCompletedJob(this);
     jobtracker.finalizeJob(this);
       
     try {
@@ -2467,8 +2469,7 @@
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
-      FileSystem fs = tempDir.getFileSystem(conf);
-      fs.delete(tempDir, true); 
+      new CleanupQueue().addToQueue(conf,tempDir); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }
@@ -2479,6 +2480,7 @@
     this.runningMapCache = null;
     this.nonRunningReduces = null;
     this.runningReduces = null;
+
   }
 
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=746902&r1=746901&r2=746902&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Feb 23 07:13:32 2009
@@ -1823,9 +1823,6 @@
   synchronized void finalizeJob(JobInProgress job) {
     // Mark the 'non-running' tasks for pruning
     markCompletedJob(job);
-
-    //persists the job info in DFS
-    completedJobStatusStore.store(job);
     
     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
 
@@ -2856,34 +2853,41 @@
     setJobPriority(jobid, newPriority);
   }
                            
-  public synchronized JobProfile getJobProfile(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getProfile();
-    } else {
-      return completedJobStatusStore.readJobProfile(jobid);
+  void storeCompletedJob(JobInProgress job) {
+    //persists the job info in DFS
+    completedJobStatusStore.store(job);
+  }
+
+  public JobProfile getJobProfile(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getProfile();
+      } 
     }
+    return completedJobStatusStore.readJobProfile(jobid);
   }
-  public synchronized JobStatus getJobStatus(JobID jobid) {
+  public JobStatus getJobStatus(JobID jobid) {
     if (null == jobid) {
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
       return null;
     }
-    
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getStatus();
-    } else {
-      return completedJobStatusStore.readJobStatus(jobid);
-    }
-  }
-  public synchronized Counters getJobCounters(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getCounters();
-    } else {
-      return completedJobStatusStore.readCounters(jobid);
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getStatus();
+      } 
+    }
+    return completedJobStatusStore.readJobStatus(jobid);
+  }
+  public Counters getJobCounters(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getCounters();
+      } 
     }
+    return completedJobStatusStore.readCounters(jobid);
   }
   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
@@ -2981,18 +2985,17 @@
    */
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
-    TaskCompletionEvent[] events = EMPTY_EVENTS;
-
-    JobInProgress job = this.jobs.get(jobid);
-    if (null != job) {
-      if (job.inited()) {
-        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+    synchronized (this) {
+      JobInProgress job = this.jobs.get(jobid);
+      if (null != job) {
+        if (job.inited()) {
+          return job.getTaskCompletionEvents(fromEventId, maxEvents);
+        } else {
+          return EMPTY_EVENTS;
+        }
       }
     }
-    else {
-      events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
-    }
-    return events;
+    return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
   }
 
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=746902&r1=746901&r2=746902&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Feb 23 07:13:32 2009
@@ -1000,9 +1000,7 @@
   private void startCleanupThreads() throws IOException {
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
-    directoryCleanupThread = new CleanupQueue(originalConf);
-    directoryCleanupThread.setDaemon(true);
-    directoryCleanupThread.start();
+    directoryCleanupThread = new CleanupQueue();
   }
   
   /**
@@ -1450,7 +1448,7 @@
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
+          directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
             getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
@@ -2513,17 +2511,20 @@
             //might be using the dir. The JVM running the tasks would clean
             //the workdir per a task in the task process itself.
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir));
             }  
             
             else {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                 taskDir+"/job.xml"));
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir+"/work"));
             }  
           }
@@ -3052,43 +3053,6 @@
     return paths;
   }
 
-  // cleanup queue which deletes files/directories of the paths queued up.
-  private static class CleanupQueue extends Thread {
-    private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
-    private JobConf conf;
-    
-    public CleanupQueue(JobConf conf) throws IOException{
-      setName("Directory/File cleanup thread");
-      setDaemon(true);
-      this.conf = conf;
-    }
-
-    public void addToQueue(Path... paths) {
-      for (Path p : paths) {
-        try {
-          queue.put(p);
-        } catch (InterruptedException ie) {}
-      }
-      return;
-    }
-
-    public void run() {
-      LOG.debug("cleanup thread started");
-      Path path = null;
-      while (true) {
-        try {
-          path = queue.take();
-          // delete the path.
-          FileSystem fs = path.getFileSystem(conf);
-          fs.delete(path, true);
-        } catch (IOException e) {
-          LOG.info("Error deleting path" + path);
-        } catch (InterruptedException t) {
-        }
-      }
-    }
-  }
-
   int getMaxCurrentMapTasks() {
     return maxCurrentMapTasks;
   }