You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2007/10/17 06:36:22 UTC

svn commit: r585366 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobTracker.java

Author: ddas
Date: Tue Oct 16 21:36:22 2007
New Revision: 585366

URL: http://svn.apache.org/viewvc?rev=585366&view=rev
Log:
HADOOP-2051. The TaskCommit thread should not die on exceptions other than InterruptedException. This matches the behavior of the other long-running threads in the JobTracker. Contributed by Arun C Murthy.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=585366&r1=585365&r2=585366&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 16 21:36:22 2007
@@ -310,6 +310,10 @@
     HADOOP-2044. The namenode protects all lease manipulations using a 
     sortedLease lock.  (Dhruba Borthakur)
 
+    HADOOP-2051. The TaskCommit thread should not die for exceptions other
+    than the InterruptedException. This behavior is there for the other long
+    running threads in the JobTracker. (Arun C Murthy via ddas)
+
   IMPROVEMENTS
 
     HADOOP-1908. Restructure data node code so that block sending and 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=585366&r1=585365&r2=585366&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Oct 16 21:36:22 2007
@@ -790,7 +790,6 @@
       LOG.info("Stopping TaskCommit thread");
       this.taskCommitThread.interrupt();
       try {
-        this.taskCommitThread.interrupt();
         this.taskCommitThread.join();
       } catch (InterruptedException ex) {
         ex.printStackTrace();
@@ -1871,11 +1870,21 @@
     }
   }
 
-  public void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
+  /**
+   * Add a job's completed task (either successful or failed/killed) to the 
+   * {@link TaskCommitQueue}. 
+   * @param j completed task (either successful or failed/killed)
+   */
+  void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
     ((TaskCommitQueue)taskCommitThread).addToQueue(j);
   }
-  //This thread takes care of things like moving outputs to their final
-  //locations & deleting temporary outputs
+  
+  /**
+   * A thread which does all of the {@link FileSystem}-related operations for
+   * tasks. It picks the next task in the queue, promotes outputs of 
+   * {@link TaskStatus.State#SUCCEEDED} tasks and discards outputs for
+   * {@link TaskStatus.State#FAILED} or {@link TaskStatus.State#KILLED} tasks.
+   */
   private class TaskCommitQueue extends Thread {
     
     private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue = 
@@ -1898,109 +1907,116 @@
        
     public void run() {
       while (!isInterrupted()) {
-        JobInProgress.JobWithTaskContext j;
         try {
-          j = queue.take();
-        } catch (InterruptedException ie) {
-          return;
-        }
-        JobInProgress job = j.getJob();
-        TaskInProgress tip = j.getTIP();
-        String taskid = j.getTaskId();
-        JobTrackerMetrics metrics = j.getJobTrackerMetrics();
-        Task t;
-        TaskStatus status;
-        boolean isTipComplete = false;
-        TaskStatus.State state;
-        synchronized (JobTracker.this) {
-          synchronized (job) {
-            synchronized (tip) {
-              status = tip.getTaskStatus(taskid);
-              t = tip.getTaskObject(taskid);
-              state = status.getRunState();
-              isTipComplete = tip.isComplete();
+          JobInProgress.JobWithTaskContext j = queue.take();
+          JobInProgress job = j.getJob();
+          TaskInProgress tip = j.getTIP();
+          String taskid = j.getTaskId();
+          JobTrackerMetrics metrics = j.getJobTrackerMetrics();
+          Task t;
+          TaskStatus status;
+          boolean isTipComplete = false;
+          TaskStatus.State state;
+          synchronized (JobTracker.this) {
+            synchronized (job) {
+              synchronized (tip) {
+                status = tip.getTaskStatus(taskid);
+                t = tip.getTaskObject(taskid);
+                state = status.getRunState();
+                isTipComplete = tip.isComplete();
+              }
             }
           }
-        }
-        try {
-          //For COMMIT_PENDING tasks, we save the task output in the dfs
-          //as well as manipulate the JT datastructures to reflect a
-          //successful task. This guarantees that we don't declare a task
-          //as having succeeded until we have successfully completed the
-          //dfs operations.
-          //For failed tasks, we just do the dfs operations here. The
-          //datastructures updates is done earlier as soon as the failure
-          //is detected so that the JT can immediately schedule another
-          //attempt for that task.
-          if (state == TaskStatus.State.COMMIT_PENDING) {
-            if (!isTipComplete) {
-              t.saveTaskOutput();
-            }
-            synchronized (JobTracker.this) {
-              //do a check for the case where after the task went to
-              //COMMIT_PENDING, it was lost. So although we would have
-              //saved the task output, we cannot declare it a SUCCESS.
-              TaskStatus newStatus = null;
-              synchronized (job) {
-                synchronized (tip) {
-                  status = tip.getTaskStatus(taskid);
-                  if (!isTipComplete) {
-                    if (status.getRunState() != 
-                         TaskStatus.State.COMMIT_PENDING) {
-                      state = TaskStatus.State.KILLED;
+          try {
+            //For COMMIT_PENDING tasks, we save the task output in the dfs
+            //as well as manipulate the JT datastructures to reflect a
+            //successful task. This guarantees that we don't declare a task
+            //as having succeeded until we have successfully completed the
+            //dfs operations.
+            //For failed tasks, we just do the dfs operations here. The
+            //datastructure updates are done earlier as soon as the failure
+            //is detected so that the JT can immediately schedule another
+            //attempt for that task.
+            if (state == TaskStatus.State.COMMIT_PENDING) {
+              if (!isTipComplete) {
+                t.saveTaskOutput();
+              }
+              synchronized (JobTracker.this) {
+                //do a check for the case where after the task went to
+                //COMMIT_PENDING, it was lost. So although we would have
+                //saved the task output, we cannot declare it a SUCCESS.
+                TaskStatus newStatus = null;
+                synchronized (job) {
+                  synchronized (tip) {
+                    status = tip.getTaskStatus(taskid);
+                    if (!isTipComplete) {
+                      if (status.getRunState() != 
+                        TaskStatus.State.COMMIT_PENDING) {
+                        state = TaskStatus.State.KILLED;
+                      } else {
+                        state = TaskStatus.State.SUCCEEDED;
+                      }
                     } else {
-                      state = TaskStatus.State.SUCCEEDED;
-                    }
-                  } else {
-                    tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
-                                                        "TIP");
-                    state = TaskStatus.State.KILLED;
+                      tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
+                      "TIP");
+                      state = TaskStatus.State.KILLED;
 
+                    }
+                    //create new status if required. If the state changed from
+                    //COMMIT_PENDING to KILLED in the JobTracker, while we were
+                    //saving the output, the JT would have called updateTaskStatus
+                    //and we don't need to call it again
+                    if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
+                      newStatus = (TaskStatus)status.clone();
+                      newStatus.setRunState(state);
+                      newStatus.setProgress((state == TaskStatus.State.SUCCEEDED) ? 1.0f : 0.0f);
+                    }
                   }
-                  //create new status if required. If the state changed from
-                  //COMMIT_PENDING to KILLED in the JobTracker, while we were
-                  //saving the output,the JT would have called updateTaskStatus
-                  //and we don't need to call it again
-                  if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
-                    newStatus = (TaskStatus)status.clone();
-                    newStatus.setRunState(state);
-                    newStatus.setProgress((state == TaskStatus.State.SUCCEEDED) ? 1.0f : 0.0f);
+                  if (newStatus != null) {
+                    job.updateTaskStatus(tip, newStatus, metrics);
                   }
                 }
-                if (newStatus != null) {
-                  job.updateTaskStatus(tip, newStatus, metrics);
-                }
               }
             }
+          } catch (IOException ioe) {
+            // Oops! Failed to copy the task's output to its final place;
+            // fail the task!
+            state = TaskStatus.State.FAILED;
+            synchronized (JobTracker.this) {
+              job.failedTask(tip, status.getTaskId(), 
+                  "Failed to rename output with the exception: " + 
+                  StringUtils.stringifyException(ioe), 
+                  (tip.isMapTask() ? 
+                      TaskStatus.Phase.MAP : 
+                        TaskStatus.Phase.REDUCE), 
+                        TaskStatus.State.FAILED,  
+                        status.getTaskTracker(), null);
+            }
+            LOG.info("Failed to rename the output of " + status.getTaskId() + 
+                " with: " + StringUtils.stringifyException(ioe));
           }
-        } catch (IOException ioe) {
-          // Oops! Failed to copy the task's output to its final place;
-          // fail the task!
-          state = TaskStatus.State.FAILED;
-          synchronized (JobTracker.this) {
-            job.failedTask(tip, status.getTaskId(), 
-                     "Failed to rename output with the exception: " + 
-                     StringUtils.stringifyException(ioe), 
-                     (tip.isMapTask() ? 
-                         TaskStatus.Phase.MAP : 
-                         TaskStatus.Phase.REDUCE), 
-                     TaskStatus.State.FAILED,  
-                         status.getTaskTracker(), null);
+          if (state == TaskStatus.State.FAILED || 
+              state == TaskStatus.State.KILLED) {
+            try {
+              t.discardTaskOutput();
+            } catch (IOException ioe) { 
+              LOG.info("Failed to discard the output of task " + 
+                  status.getTaskId() + " with: " + 
+                  StringUtils.stringifyException(ioe));
+            }
           }
-          LOG.info("Failed to rename the output of " + status.getTaskId() + 
-                   " with: " + StringUtils.stringifyException(ioe));
+        } catch (InterruptedException ie) {
+          LOG.warn(getName() + " exiting, got interrupted: " + 
+                   StringUtils.stringifyException(ie));
+          return;
         }
-        if (state == TaskStatus.State.FAILED || 
-            state == TaskStatus.State.KILLED) {
-          try {
-            t.discardTaskOutput();
-          } catch (IOException ioe) { 
-            LOG.info("Failed to discard the output of task " + 
-                status.getTaskId() + " with: " + 
-                StringUtils.stringifyException(ioe));
-          }
+        catch (Throwable t) {
+          LOG.error(getName() + " got an exception: " +
+                    StringUtils.stringifyException(t));
         }
       }
+      
+      LOG.warn(getName() + " exiting..."); 
     }
   }