You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:54:53 UTC

svn commit: r1077782 - /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java

Author: omalley
Date: Fri Mar  4 04:54:53 2011
New Revision: 1077782

URL: http://svn.apache.org/viewvc?rev=1077782&view=rev
Log:
commit 86e9c728e5f669b4337798d83cfba829b314cc36
Author: Rajesh Balamohan <rb...@yahoo-inc.com>
Date:   Thu Feb 3 07:38:01 2011 +0530

     - Runtime variations in PigMix scripts due to timeouts in maptasks

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077782&r1=1077781&r2=1077782&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 04:54:53 2011
@@ -511,6 +511,8 @@ abstract public class Task implements Wr
     private Progress taskProgress;
     private Thread pingThread = null;
     private static final int PROGRESS_STATUS_LEN_LIMIT = 512;
+    private boolean done = true;
+    private Object lock = new Object();
     
     /**
      * flag that indicates whether progress update needs to be sent to parent.
@@ -605,6 +607,9 @@ abstract public class Task implements Wr
       // get current flag value and reset it as well
       boolean sendProgress = resetProgressFlag();
       while (!taskDone.get()) {
+        synchronized(lock) {
+          done = false;
+        }
         try {
           boolean taskFound = true; // whether TT knows about this task
           // sleep for a bit
@@ -637,6 +642,7 @@ abstract public class Task implements Wr
           // came back up), kill ourselves
           if (!taskFound) {
             LOG.warn("Parent died.  Exiting "+taskId);
+            resetDoneFlag();
             System.exit(66);
           }
 
@@ -649,11 +655,22 @@ abstract public class Task implements Wr
           if (remainingRetries == 0) {
             ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
             LOG.warn("Last retry, killing "+taskId);
+            resetDoneFlag();
             System.exit(65);
           }
         }
       }
+      //Notify that we are done with the work
+      resetDoneFlag();
+    }
+
+    void resetDoneFlag() {
+      synchronized(lock) {
+        done = true;
+        lock.notify();
+      }
     }
+
     public void startCommunicationThread() {
       if (pingThread == null) {
         pingThread = new Thread(this, "communication thread");
@@ -663,6 +680,11 @@ abstract public class Task implements Wr
     }
     public void stopCommunicationThread() throws InterruptedException {
       if (pingThread != null) {
+        synchronized(lock) {
+          while(!done) {
+            lock.wait();
+          }
+        }
         pingThread.interrupt();
         pingThread.join();
       }