You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2018/02/16 14:16:10 UTC
hadoop git commit: MAPREDUCE-7053: Timed out tasks can fail to produce thread dump. Contributed by Jason Lowe.
Repository: hadoop
Updated Branches:
refs/heads/trunk a1e05e029 -> 82f029f7b
MAPREDUCE-7053: Timed out tasks can fail to produce thread dump. Contributed by Jason Lowe.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/82f029f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/82f029f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/82f029f7
Branch: refs/heads/trunk
Commit: 82f029f7b50679ea477a3a898e4ee400fa394adf
Parents: a1e05e0
Author: Eric Payne <ep...@apache.org>
Authored: Fri Feb 16 08:15:09 2018 -0600
Committer: Eric Payne <ep...@apache.org>
Committed: Fri Feb 16 08:15:09 2018 -0600
----------------------------------------------------------------------
.../hadoop/mapred/TaskAttemptListenerImpl.java | 15 +++--
.../mapreduce/v2/app/TaskHeartbeatHandler.java | 67 ++++++++++++++------
.../mapred/TestTaskAttemptListenerImpl.java | 54 ++++++++++++++--
.../v2/app/TestTaskHeartbeatHandler.java | 38 +++++++++++
4 files changed, 145 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82f029f7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 668d8ed..b04dac5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -370,17 +370,22 @@ public class TaskAttemptListenerImpl extends CompositeService
TypeConverter.toYarn(taskAttemptID);
AMFeedback feedback = new AMFeedback();
+ feedback.setTaskFound(true);
+
AtomicReference<TaskAttemptStatus> lastStatusRef =
attemptIdToStatus.get(yarnAttemptID);
if (lastStatusRef == null) {
- LOG.error("Status update was called with illegal TaskAttemptId: "
- + yarnAttemptID);
- feedback.setTaskFound(false);
+ // The task is not known, but it could be in the process of tearing
+ // down gracefully or receiving a thread dump signal. Tolerate unknown
+ // tasks as long as they have unregistered recently.
+ if (!taskHeartbeatHandler.hasRecentlyUnregistered(yarnAttemptID)) {
+ LOG.error("Status update was called with illegal TaskAttemptId: "
+ + yarnAttemptID);
+ feedback.setTaskFound(false);
+ }
return feedback;
}
- feedback.setTaskFound(true);
-
// Propagating preemption to the task if TASK_PREEMPTION is enabled
if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false)
&& preemptionPolicy.isPreempted(yarnAttemptID)) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82f029f7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
index c438b35..f8f5015 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
@@ -71,12 +71,14 @@ public class TaskHeartbeatHandler extends AbstractService {
private Thread lostTaskCheckerThread;
private volatile boolean stopped;
private long taskTimeOut;
+ private long unregisterTimeOut;
private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
private final EventHandler eventHandler;
private final Clock clock;
private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
+ private ConcurrentMap<TaskAttemptId, ReportTime> recentlyUnregisteredAttempts;
public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
int numThreads) {
@@ -85,6 +87,8 @@ public class TaskHeartbeatHandler extends AbstractService {
this.clock = clock;
runningAttempts =
new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
+ recentlyUnregisteredAttempts =
+ new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
}
@Override
@@ -92,6 +96,8 @@ public class TaskHeartbeatHandler extends AbstractService {
super.serviceInit(conf);
taskTimeOut = conf.getLong(
MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
+ unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
+ MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
// enforce task timeout is at least twice as long as task report interval
long taskProgressReportIntervalMillis = MRJobConfUtil.
@@ -140,6 +146,12 @@ public class TaskHeartbeatHandler extends AbstractService {
public void unregister(TaskAttemptId attemptID) {
runningAttempts.remove(attemptID);
+ recentlyUnregisteredAttempts.put(attemptID,
+ new ReportTime(clock.getTime()));
+ }
+
+ public boolean hasRecentlyUnregistered(TaskAttemptId attemptID) {
+ return recentlyUnregisteredAttempts.containsKey(attemptID);
}
private class PingChecker implements Runnable {
@@ -147,27 +159,9 @@ public class TaskHeartbeatHandler extends AbstractService {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
- Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
- runningAttempts.entrySet().iterator();
-
- // avoid calculating current time everytime in loop
long currentTime = clock.getTime();
-
- while (iterator.hasNext()) {
- Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
- boolean taskTimedOut = (taskTimeOut > 0) &&
- (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
-
- if(taskTimedOut) {
- // task is lost, remove from the list and raise lost event
- iterator.remove();
- eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
- .getKey(), "AttemptID:" + entry.getKey().toString()
- + " Timed out after " + taskTimeOut / 1000 + " secs"));
- eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
- TaskAttemptEventType.TA_TIMED_OUT));
- }
- }
+ checkRunning(currentTime);
+ checkRecentlyUnregistered(currentTime);
try {
Thread.sleep(taskTimeOutCheckInterval);
} catch (InterruptedException e) {
@@ -176,6 +170,39 @@ public class TaskHeartbeatHandler extends AbstractService {
}
}
}
+
+ private void checkRunning(long currentTime) {
+ Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
+ runningAttempts.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
+ boolean taskTimedOut = (taskTimeOut > 0) &&
+ (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
+
+ if(taskTimedOut) {
+ // task is lost, remove from the list and raise lost event
+ iterator.remove();
+ eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
+ .getKey(), "AttemptID:" + entry.getKey().toString()
+ + " Timed out after " + taskTimeOut / 1000 + " secs"));
+ eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
+ TaskAttemptEventType.TA_TIMED_OUT));
+ }
+ }
+ }
+
+ private void checkRecentlyUnregistered(long currentTime) {
+ Iterator<ReportTime> iterator =
+ recentlyUnregisteredAttempts.values().iterator();
+ while (iterator.hasNext()) {
+ ReportTime unregisteredTime = iterator.next();
+ if (currentTime >
+ unregisteredTime.getLastProgress() + unregisterTimeOut) {
+ iterator.remove();
+ }
+ }
+ }
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82f029f7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index da7421b..068ebfa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapred;
+import com.google.common.base.Supplier;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
@@ -51,11 +52,13 @@ import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Test;
@@ -488,14 +491,57 @@ public class TestTaskAttemptListenerImpl {
}
@Test
- public void testStatusUpdateFromUnregisteredTask()
- throws IOException, InterruptedException{
+ public void testStatusUpdateFromUnregisteredTask() throws Exception {
configureMocks();
- startListener(false);
+ ControlledClock clock = new ControlledClock();
+ clock.setTime(0);
+ doReturn(clock).when(appCtx).getClock();
+
+ final TaskAttemptListenerImpl tal = new TaskAttemptListenerImpl(appCtx,
+ secret, rmHeartbeatHandler, policy) {
+ @Override
+ protected void startRpcServer() {
+ // Empty
+ }
+ @Override
+ protected void stopRpcServer() {
+ // Empty
+ }
+ };
- AMFeedback feedback = listener.statusUpdate(attemptID, firstReduceStatus);
+ Configuration conf = new Configuration();
+ conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
+ tal.init(conf);
+ tal.start();
+ AMFeedback feedback = tal.statusUpdate(attemptID, firstReduceStatus);
assertFalse(feedback.getTaskFound());
+ tal.registerPendingTask(task, wid);
+ tal.registerLaunchedTask(attemptId, wid);
+ feedback = tal.statusUpdate(attemptID, firstReduceStatus);
+ assertTrue(feedback.getTaskFound());
+
+ // verify attempt is still reported as found if recently unregistered
+ tal.unregister(attemptId, wid);
+ feedback = tal.statusUpdate(attemptID, firstReduceStatus);
+ assertTrue(feedback.getTaskFound());
+
+ // verify attempt is not found if not recently unregistered
+ long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
+ MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
+ clock.setTime(unregisterTimeout + 1);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ AMFeedback response =
+ tal.statusUpdate(attemptID, firstReduceStatus);
+ return !response.getTaskFound();
+ } catch (Exception e) {
+ throw new RuntimeException("status update failed", e);
+ }
+ }
+ }, 10, 10000);
}
private void configureMocks() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82f029f7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
index 2623849..5d86479 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -30,10 +31,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert;
import org.junit.Test;
@@ -105,6 +108,41 @@ public class TestTaskHeartbeatHandler {
verifyTaskTimeoutConfig(conf, expectedTimeout);
}
+ @Test
+ public void testTaskUnregistered() throws Exception {
+ EventHandler mockHandler = mock(EventHandler.class);
+ ControlledClock clock = new ControlledClock();
+ clock.setTime(0);
+ final TaskHeartbeatHandler hb =
+ new TaskHeartbeatHandler(mockHandler, clock, 1);
+ Configuration conf = new Configuration();
+ conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
+ hb.init(conf);
+ hb.start();
+ try {
+ ApplicationId appId = ApplicationId.newInstance(0l, 5);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 4);
+ TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
+ final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
+ Assert.assertFalse(hb.hasRecentlyUnregistered(taid));
+ hb.register(taid);
+ Assert.assertFalse(hb.hasRecentlyUnregistered(taid));
+ hb.unregister(taid);
+ Assert.assertTrue(hb.hasRecentlyUnregistered(taid));
+ long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
+ MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
+ clock.setTime(unregisterTimeout + 1);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return !hb.hasRecentlyUnregistered(taid);
+ }
+ }, 10, 10000);
+ } finally {
+ hb.stop();
+ }
+ }
+
/**
* Test if task timeout is set properly in response to the configuration of
* the task progress report interval.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org