You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:54:53 UTC
svn commit: r1077782 -
/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
Author: omalley
Date: Fri Mar 4 04:54:53 2011
New Revision: 1077782
URL: http://svn.apache.org/viewvc?rev=1077782&view=rev
Log:
commit 86e9c728e5f669b4337798d83cfba829b314cc36
Author: Rajesh Balamohan <rb...@yahoo-inc.com>
Date: Thu Feb 3 07:38:01 2011 +0530
- Runtime variations in PigMix scripts due to timeouts in maptasks
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077782&r1=1077781&r2=1077782&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 04:54:53 2011
@@ -511,6 +511,8 @@ abstract public class Task implements Wr
private Progress taskProgress;
private Thread pingThread = null;
private static final int PROGRESS_STATUS_LEN_LIMIT = 512;
+ private boolean done = true;
+ private Object lock = new Object();
/**
* flag that indicates whether progress update needs to be sent to parent.
@@ -605,6 +607,9 @@ abstract public class Task implements Wr
// get current flag value and reset it as well
boolean sendProgress = resetProgressFlag();
while (!taskDone.get()) {
+ synchronized(lock) {
+ done = false;
+ }
try {
boolean taskFound = true; // whether TT knows about this task
// sleep for a bit
@@ -637,6 +642,7 @@ abstract public class Task implements Wr
// came back up), kill ourselves
if (!taskFound) {
LOG.warn("Parent died. Exiting "+taskId);
+ resetDoneFlag();
System.exit(66);
}
@@ -649,11 +655,22 @@ abstract public class Task implements Wr
if (remainingRetries == 0) {
ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
LOG.warn("Last retry, killing "+taskId);
+ resetDoneFlag();
System.exit(65);
}
}
}
+ //Notify that we are done with the work
+ resetDoneFlag();
+ }
+
+ void resetDoneFlag() {
+ synchronized(lock) {
+ done = true;
+ lock.notify();
+ }
}
+
public void startCommunicationThread() {
if (pingThread == null) {
pingThread = new Thread(this, "communication thread");
@@ -663,6 +680,11 @@ abstract public class Task implements Wr
}
public void stopCommunicationThread() throws InterruptedException {
if (pingThread != null) {
+ synchronized(lock) {
+ while(!done) {
+ lock.wait();
+ }
+ }
pingThread.interrupt();
pingThread.join();
}