You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 07:01:23 UTC

svn commit: r1079260 - /hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java

Author: omalley
Date: Tue Mar  8 06:01:23 2011
New Revision: 1079260

URL: http://svn.apache.org/viewvc?rev=1079260&view=rev
Log:
commit 5f301c95d923542549e4727e580b490b47f1b157
Author: Rajesh Balamohan <rb...@yahoo-inc.com>
Date:   Thu Feb 3 18:07:26 2011 +0530

     - Runtime variations in PigMix scripts due to timeouts in maptasks

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079260&r1=1079259&r2=1079260&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar  8 06:01:23 2011
@@ -550,6 +550,8 @@ abstract public class Task implements Wr
     private InputSplit split = null;
     private Progress taskProgress;
     private Thread pingThread = null;
+    private boolean done = true;
+    private Object lock = new Object();
 
     /**
      * flag that indicates whether progress update needs to be sent to parent.
@@ -643,6 +645,9 @@ abstract public class Task implements Wr
       // get current flag value and reset it as well
       boolean sendProgress = resetProgressFlag();
       while (!taskDone.get()) {
+        synchronized(lock) {
+          done = false;
+        }
         try {
           boolean taskFound = true; // whether TT knows about this task
           // sleep for a bit
@@ -675,6 +680,7 @@ abstract public class Task implements Wr
           // died and came back up), kill ourselves
           if (!taskFound) {
             LOG.warn("Parent died.  Exiting "+taskId);
+            resetDoneFlag();
             System.exit(66);
           }
 
@@ -687,11 +693,22 @@ abstract public class Task implements Wr
           if (remainingRetries == 0) {
             ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
             LOG.warn("Last retry, killing "+taskId);
+            resetDoneFlag();
             System.exit(65);
           }
         }
       }
+      //Notify that we are done with the work
+      resetDoneFlag();
+    }
+
+    void resetDoneFlag() {
+      synchronized(lock) {
+        done = true;
+        lock.notify();
+      }
     }
+
     public void startCommunicationThread() {
       if (pingThread == null) {
         pingThread = new Thread(this, "communication thread");
@@ -701,6 +718,11 @@ abstract public class Task implements Wr
     }
     public void stopCommunicationThread() throws InterruptedException {
       if (pingThread != null) {
+        synchronized(lock) {
+          while(!done) {
+            lock.wait();
+          }
+        }
         pingThread.interrupt();
         pingThread.join();
       }