Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/15 23:35:52 UTC

svn commit: r487697 [2/2] - in /lucene/hadoop/branches/branch-0.9: ./ conf/ site/ src/java/org/apache/hadoop/mapred/ src/site/src/documentation/content/xdocs/

Modified: lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=487697&r1=487696&r2=487697
==============================================================================
--- lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Dec 15 14:35:51 2006
@@ -68,6 +68,9 @@
 
     Server taskReportServer = null;
     InterTrackerProtocol jobClient;
+    
+    // last heartbeat response received
+    short heartbeatResponseId = -1;
 
     StatusHttpServer server = null;
     
@@ -187,7 +190,7 @@
         }
       }
     }
-    
+
     static String getCacheSubdir() {
       return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
     }
@@ -451,15 +454,23 @@
               }
             }
 
-            if (!transmitHeartBeat()) {
+            // Send the heartbeat and process the jobtracker's directives
+            HeartbeatResponse heartbeatResponse = transmitHeartBeat();
+            TaskTrackerAction[] actions = heartbeatResponse.getActions();
+            LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + 
+                    heartbeatResponse.getResponseId() + " and " + 
+                    ((actions != null) ? actions.length : 0) + " actions");
+            
+            if (reinitTaskTracker(actions)) {
               return State.STALE;
             }
+            
             lastHeartbeat = now;
             justStarted = false;
 
-            checkForNewTasks();
+            checkAndStartNewTasks(actions);
             markUnresponsiveTasks();
-            closeCompletedTasks();
+            closeCompletedTasks(actions);
             killOverflowingTasks();
             
             //we've cleaned up, resume normal operation
@@ -491,56 +502,94 @@
      * @return false if the tracker was unknown
      * @throws IOException
      */
-    private boolean transmitHeartBeat() throws IOException {
+    private HeartbeatResponse transmitHeartBeat() throws IOException {
       //
       // Build the heartbeat information for the JobTracker
       //
-      List<TaskStatus> taskReports = new ArrayList(runningTasks.size());
+      List<TaskStatus> taskReports = 
+        new ArrayList<TaskStatus>(runningTasks.size());
       synchronized (this) {
-          for (TaskInProgress tip: runningTasks.values()) {
-              taskReports.add(tip.createStatus());
-          }
+        for (TaskInProgress tip: runningTasks.values()) {
+          taskReports.add(tip.createStatus());
+        }
       }
       TaskTrackerStatus status = 
         new TaskTrackerStatus(taskTrackerName, localHostname, 
-                              httpPort, taskReports, 
-                              failures); 
-
+                httpPort, taskReports, 
+                failures); 
+      
+      //
+      // Check if we should ask for a new Task
+      //
+      boolean askForNewTask = false; 
+      if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
+              acceptNewTasks) {
+        checkLocalDirs(fConf.getLocalDirs());
+        
+        if (enoughFreeSpace(minSpaceStart)) {
+          askForNewTask = true;
+        }
+      }
+      
       //
       // Xmit the heartbeat
       //
+      HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
+              justStarted, askForNewTask, 
+              heartbeatResponseId);
+      heartbeatResponseId = heartbeatResponse.getResponseId();
       
-      int resultCode = jobClient.emitHeartbeat(status, justStarted);
       synchronized (this) {
-        for (TaskStatus taskStatus: taskReports) {
-            if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
-                if (taskStatus.getIsMap()) {
-                    mapTotal--;
-                } else {
-                    reduceTotal--;
-                }
-                myMetrics.completeTask();
-                runningTasks.remove(taskStatus.getTaskId());
+        for (TaskStatus taskStatus : taskReports) {
+          if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+            if (taskStatus.getIsMap()) {
+              mapTotal--;
+            } else {
+              reduceTotal--;
             }
+            myMetrics.completeTask();
+            runningTasks.remove(taskStatus.getTaskId());
+          }
         }
       }
-      return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+      return heartbeatResponse;
     }
 
     /**
+     * Check if the jobtracker directed a 'reset' of the tasktracker.
+     * 
+     * @param actions the directives of the jobtracker for the tasktracker.
+     * @return <code>true</code> if tasktracker is to be reset, 
+     *         <code>false</code> otherwise.
+     */
+    private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
+      if (actions != null) {
+        for (TaskTrackerAction action : actions) {
+          if (action.getActionId() == 
+            TaskTrackerAction.ActionType.REINIT_TRACKER) {
+            LOG.info("Received ReinitTrackerAction from JobTracker");
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+    
+    /**
      * Check to see if there are any new tasks that we should run.
      * @throws IOException
      */
-    private void checkForNewTasks() throws IOException {
-      //
-      // Check if we should ask for a new Task
-      //
-      if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
-          acceptNewTasks) {
-        checkLocalDirs(fConf.getLocalDirs());
-        
-        if (enoughFreeSpace(minSpaceStart)) {
-          Task t = jobClient.pollForNewTask(taskTrackerName);
+    private void checkAndStartNewTasks(TaskTrackerAction[] actions) 
+    throws IOException {
+      if (actions == null) {
+        return;
+      }
+      
+      for (TaskTrackerAction action : actions) {
+        if (action.getActionId() == 
+          TaskTrackerAction.ActionType.LAUNCH_TASK) {
+          Task t = ((LaunchTaskAction)(action)).getTask();
+          LOG.info("LaunchTaskAction: " + t.getTaskId());
           if (t != null) {
             startNewTask(t);
           }
@@ -573,24 +622,73 @@
      * Ask the JobTracker if there are any tasks that we should clean up,
      * either because we don't need them any more or because the job is done.
      */
-    private void closeCompletedTasks() throws IOException {
-      String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
-      if (toCloseIds != null) {
-        synchronized (this) {
-          for (int i = 0; i < toCloseIds.length; i++) {
-            TaskInProgress tip = tasks.get(toCloseIds[i]);
-            if (tip != null) {
-              // remove the task from running jobs, removing the job if 
-              // it is the last task
-              removeTaskFromJob(tip.getTask().getJobId(), tip);
-              tasksToCleanup.put(tip);
+    private void closeCompletedTasks(TaskTrackerAction[] actions) 
+    throws IOException {
+      if (actions == null) {
+        return;
+      }
+      
+      for (TaskTrackerAction action : actions) {
+        TaskTrackerAction.ActionType actionType = action.getActionId();
+        
+        if (actionType == TaskTrackerAction.ActionType.KILL_JOB) {
+          String jobId = ((KillJobAction)action).getJobId();
+          LOG.info("Received 'KillJobAction' for job: " + jobId);
+          synchronized (runningJobs) {
+            RunningJob rjob = runningJobs.get(jobId);
+            if (rjob == null) {
+              LOG.warn("Unknown job " + jobId + " being deleted.");
             } else {
-              LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
+              synchronized (rjob) {
+                int noJobTasks = rjob.tasks.size(); 
+                int taskCtr = 0;
+                
+                // Add the tips of this job to the queue of tasks to be purged 
+                for (TaskInProgress tip : rjob.tasks) {
+                  // Purge the job files for the last element in rjob.tasks
+                  if (++taskCtr == noJobTasks) {
+                    tip.setPurgeJobFiles(true);
+                  }
+
+                  tasksToCleanup.put(tip);
+                }
+                
+                // Remove this job 
+                rjob.tasks.clear();
+                runningJobs.remove(jobId);
+              }
             }
           }
+        } else if(actionType == TaskTrackerAction.ActionType.KILL_TASK) {
+          String taskId = ((KillTaskAction)action).getTaskId();
+          LOG.info("Received KillTaskAction for task: " + taskId);
+          purgeTask(tasks.get(taskId), false);
         }
       }
     }
+    
+    /**
+     * Remove the tip and update all relevant state.
+     * 
+     * @param tip {@link TaskInProgress} to be removed.
+     * @param purgeJobFiles <code>true</code> if the job files are to be
+     *                      purged, <code>false</code> otherwise.
+     */
+    private void purgeTask(TaskInProgress tip, boolean purgeJobFiles) {
+      if (tip != null) {
+        LOG.info("About to purge task: " + tip.getTask().getTaskId());
+        
+        // Cleanup the job files? 
+        tip.setPurgeJobFiles(purgeJobFiles);
+        
+        // Remove the task from running jobs, 
+        // removing the job if it's the last task
+        removeTaskFromJob(tip.getTask().getJobId(), tip);
+        
+        // Add this tip to queue of tasks to be purged 
+        tasksToCleanup.put(tip);
+      }
+    }
 
     /** Check if we're dangerously low on disk space
      * If so, kill jobs to free up space and make sure
@@ -822,6 +920,9 @@
         private boolean alwaysKeepTaskFiles;
         private TaskStatus taskStatus ; 
         private boolean keepJobFiles;
+        
+        /** Cleanup the job files when the job is complete (done/failed) */
+        private boolean purgeJobFiles = false;
 
         /**
          */
@@ -886,6 +987,10 @@
             keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
         }
         
+        public void setPurgeJobFiles(boolean purgeJobFiles) {
+          this.purgeJobFiles = purgeJobFiles;
+        }
+        
         /**
          */
         public synchronized TaskStatus createStatus() {
@@ -1017,32 +1122,39 @@
          * We no longer need anything from this task, as the job has
          * finished.  If the task is still running, kill it (and clean up
          */
-        public synchronized void jobHasFinished() throws IOException {
-        	 
-            if (getRunState() == TaskStatus.State.RUNNING) {
+        public void jobHasFinished() throws IOException {
+          boolean killTask = false;  
+          synchronized(this){
+              killTask = (getRunState() == TaskStatus.State.RUNNING);
+              if (killTask) {
                 killAndCleanup(false);
-            } else {
-                cleanup();
-            }
-            if (keepJobFiles)
-              return;
-            
-            // Delete temp directory in case any task used PhasedFileSystem.
-            try{
-              String systemDir = task.getConf().get("mapred.system.dir");
-              Path taskTempDir = new Path(systemDir + "/" + 
-                  task.getJobId() + "/" + task.getTipId());
-              if( fs.exists(taskTempDir)){
-                fs.delete(taskTempDir) ;
               }
-            }catch(IOException e){
-              LOG.warn("Error in deleting reduce temporary output",e); 
+          }
+          if (!killTask) {
+            cleanup();
+          }
+          if (keepJobFiles)
+            return;
+              
+          synchronized(this){
+              // Delete temp directory in case any task used PhasedFileSystem.
+              try{
+                String systemDir = task.getConf().get("mapred.system.dir");
+                Path taskTempDir = new Path(systemDir + "/" + 
+                    task.getJobId() + "/" + task.getTipId() + "/" + task.getTaskId());
+                if( fs.exists(taskTempDir)){
+                  fs.delete(taskTempDir) ;
+                }
+              }catch(IOException e){
+                LOG.warn("Error in deleting reduce temporary output",e); 
+              }
+            }
+            // Delete the job directory for this  
+            // task if the job is done/failed
+            if (purgeJobFiles) {
+              this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
+                      JOBCACHE + Path.SEPARATOR +  task.getJobId());
             }
-            
-            // delete the job diretory for this task 
-            // since the job is done/failed
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
-                    JOBCACHE + Path.SEPARATOR +  task.getJobId());
         }
 
         /**
@@ -1090,6 +1202,9 @@
          * We no longer need anything from this task.  Either the 
          * controlling job is all done and the files have been copied
          * away, or the task failed and we don't need the remains.
+         * Callers of cleanup must not lock the tip first.
+         * cleanup does the right thing: it updates tasks in the TaskTracker
+         * by locking the TaskTracker first and then locking the tip.
          */
         void cleanup() throws IOException {
             String taskId = task.getTaskId();

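For context, the core of this change is that the tasktracker no longer makes three separate RPCs (emitHeartbeat, pollForNewTask, pollForTaskWithClosedJob); a single heartbeat now returns a HeartbeatResponse whose TaskTrackerAction array drives task launching, job/task killing, and tracker re-initialization. The sketch below is only an illustration of that dispatch pattern: HeartbeatSketch, Action, and the dummy heartbeat method are simplified stand-ins, not the committed org.apache.hadoop.mapred classes.

    import java.util.Arrays;
    import java.util.List;

    // Minimal stand-ins for the new protocol types, just enough to show the
    // control flow of the patched offerService() loop.
    public class HeartbeatSketch {

      enum ActionType { LAUNCH_TASK, KILL_JOB, KILL_TASK, REINIT_TRACKER }

      static class Action {
        final ActionType type;
        final String target;   // task id or job id, depending on the action
        Action(ActionType type, String target) {
          this.type = type;
          this.target = target;
        }
      }

      static class HeartbeatResponse {
        final short responseId;
        final List<Action> actions;
        HeartbeatResponse(short responseId, List<Action> actions) {
          this.responseId = responseId;
          this.actions = actions;
        }
      }

      // Last response id acknowledged back to the JobTracker, as in the patch.
      private short heartbeatResponseId = -1;

      // Stand-in for InterTrackerProtocol.heartbeat(); the real call is an RPC
      // to the JobTracker. Here it just fabricates one LAUNCH_TASK directive.
      private HeartbeatResponse heartbeat(boolean justStarted,
                                          boolean askForNewTask,
                                          short responseId) {
        return new HeartbeatResponse((short) (responseId + 1),
            Arrays.asList(new Action(ActionType.LAUNCH_TASK,
                                     "task_0001_m_000000_0")));
      }

      /** One iteration of the tasktracker's main loop, boiled down. */
      boolean offerHeartbeat(boolean justStarted, boolean askForNewTask) {
        HeartbeatResponse response =
            heartbeat(justStarted, askForNewTask, heartbeatResponseId);
        heartbeatResponseId = response.responseId;

        for (Action action : response.actions) {
          switch (action.type) {
            case REINIT_TRACKER:
              return false;   // caller would return State.STALE
            case LAUNCH_TASK:
              System.out.println("launch task " + action.target);
              break;
            case KILL_JOB:
            case KILL_TASK:
              System.out.println("purge " + action.target);
              break;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        new HeartbeatSketch().offerHeartbeat(true, true);
      }
    }

The real code naturally dispatches to startNewTask(), purgeTask(), and a State.STALE return rather than printing, but the single-heartbeat, action-driven shape is the same.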
Modified: lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml?view=diff&rev=487697&r1=487696&r2=487697
==============================================================================
--- lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml (original)
+++ lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml Fri Dec 15 14:35:51 2006
@@ -15,6 +15,15 @@
       <title>News</title>
 
       <section>
+      <title>15 December, 2006: release 0.9.2 available</title>
+      <p>This fixes critical bugs in 0.9.1.  For details see the <a
+      href="http://tinyurl.com/ya8lfd">release notes</a>. The release can
+      be obtained from <a
+      href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/"> a
+      nearby mirror</a>.
+      </p> </section>
+
+      <section>
       <title>6 December, 2006: release 0.9.1 available</title>
       <p>This fixes critical bugs in 0.9.0.  For details see the <a
       href="http://tinyurl.com/y55d7p">release notes</a>. The release can