You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:28:59 UTC
[20/37] hadoop git commit: MAPREDUCE-7022. Fast fail rogue jobs based
on task scratch dir size. Contributed by Johan Gustavsson
MAPREDUCE-7022. Fast fail rogue jobs based on task scratch dir size. Contributed by Johan Gustavsson
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a37e7f0a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a37e7f0a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a37e7f0a
Branch: refs/heads/HDFS-7240
Commit: a37e7f0ad8b68c7ed16c242bedf62f4cde48d6fd
Parents: 1b0f265
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Jan 26 14:36:45 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Jan 26 14:36:45 2018 -0600
----------------------------------------------------------------------
.../hadoop/mapred/LocalContainerLauncher.java | 2 +-
.../hadoop/mapred/TaskAttemptListenerImpl.java | 7 +-
.../org/apache/hadoop/mapred/YarnChild.java | 4 +-
.../v2/app/job/event/TaskAttemptFailEvent.java | 53 ++++++++++++
.../app/job/event/TaskTAttemptFailedEvent.java | 39 +++++++++
.../v2/app/job/impl/TaskAttemptImpl.java | 40 ++++++---
.../mapreduce/v2/app/job/impl/TaskImpl.java | 6 +-
.../hadoop/mapreduce/v2/app/TestFail.java | 7 +-
.../hadoop/mapreduce/v2/app/TestRecovery.java | 7 +-
.../mapreduce/v2/app/job/impl/TestJobImpl.java | 5 +-
.../v2/app/job/impl/TestTaskAttempt.java | 9 +-
.../mapreduce/v2/app/job/impl/TestTaskImpl.java | 42 ++++-----
.../apache/hadoop/mapred/LocalJobRunner.java | 4 +-
.../java/org/apache/hadoop/mapred/MapTask.java | 3 +-
.../java/org/apache/hadoop/mapred/Task.java | 87 ++++++++++++++++++-
.../hadoop/mapred/TaskUmbilicalProtocol.java | 12 ++-
.../apache/hadoop/mapreduce/MRJobConfig.java | 14 +++
.../src/main/resources/mapred-default.xml | 22 +++++
.../hadoop/mapred/TestTaskProgressReporter.java | 90 +++++++++++++++++++-
.../mapreduce/v2/hs/TestJobHistoryParsing.java | 9 +-
.../apache/hadoop/mapred/TestMapProgress.java | 4 +-
.../apache/hadoop/mapred/TestTaskCommit.java | 2 +-
22 files changed, 397 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index 6f9cc34..fed500a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -510,7 +510,7 @@ public class LocalContainerLauncher extends AbstractService implements
String cause =
(tCause == null) ? throwable.getMessage() : StringUtils
.stringifyException(tCause);
- umbilical.fatalError(classicAttemptID, cause);
+ umbilical.fatalError(classicAttemptID, cause, false);
}
throw new RuntimeException();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 556c90c..b155af22 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@@ -281,7 +282,7 @@ public class TaskAttemptListenerImpl extends CompositeService
}
@Override
- public void fatalError(TaskAttemptID taskAttemptID, String msg)
+ public void fatalError(TaskAttemptID taskAttemptID, String msg, boolean fastFail)
throws IOException {
// This happens only in Child and in the Task.
LOG.error("Task: " + taskAttemptID + " - exited : " + msg);
@@ -294,7 +295,7 @@ public class TaskAttemptListenerImpl extends CompositeService
preemptionPolicy.handleFailedContainer(attemptID);
context.getEventHandler().handle(
- new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(attemptID, fastFail));
}
@Override
@@ -312,7 +313,7 @@ public class TaskAttemptListenerImpl extends CompositeService
preemptionPolicy.handleFailedContainer(attemptID);
context.getEventHandler().handle(
- new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(attemptID));
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index 7ae7a1e..bd40e54 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -206,7 +206,7 @@ class YarnChild {
if (taskid != null) {
if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fatalError(taskid,
- StringUtils.stringifyException(exception));
+ StringUtils.stringifyException(exception), false);
}
}
} catch (Throwable throwable) {
@@ -218,7 +218,7 @@ class YarnChild {
String cause =
tCause == null ? throwable.getMessage() : StringUtils
.stringifyException(tCause);
- umbilical.fatalError(taskid, cause);
+ umbilical.fatalError(taskid, cause, false);
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java
new file mode 100644
index 0000000..6ea1d15
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskAttemptFailEvent extends TaskAttemptEvent {
+ private boolean fastFail;
+
+ /**
+ * Create a new TaskAttemptFailEvent, with task fastFail disabled.
+ *
+ * @param id the id of the task attempt
+ */
+ public TaskAttemptFailEvent(TaskAttemptId id) {
+ this(id, false);
+ }
+
+ /**
+ * Create a new TaskAttemptFailEvent.
+ *
+ * @param id the id of the task attempt
+ * @param fastFail should the task fastFail or not.
+ */
+ public TaskAttemptFailEvent(TaskAttemptId id, boolean fastFail) {
+ super(id, TaskAttemptEventType.TA_FAILMSG);
+ this.fastFail = fastFail;
+ }
+
+  /**
+   * Check if the task should fast fail or retry.
+   * @return boolean value where true indicates the task should not retry
+   */
+ public boolean isFastFail() {
+ return fastFail;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java
new file mode 100644
index 0000000..30392ac
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskTAttemptFailedEvent extends TaskTAttemptEvent {
+
+ private boolean fastFail;
+
+ public TaskTAttemptFailedEvent(TaskAttemptId id) {
+ this(id, false);
+ }
+
+ public TaskTAttemptFailedEvent(TaskAttemptId id, boolean fastFail) {
+ super(id, TaskEventType.T_ATTEMPT_FAILED);
+ this.fastFail = fastFail;
+ }
+
+ public boolean isFastFail() {
+ return fastFail;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 431128b..6632f27 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
@@ -101,6 +102,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
@@ -194,6 +196,7 @@ public abstract class TaskAttemptImpl implements
private Locality locality;
private Avataar avataar;
private boolean rescheduleNextAttempt = false;
+ private boolean failFast = false;
private static final CleanupContainerTransition
CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
@@ -1412,6 +1415,14 @@ public abstract class TaskAttemptImpl implements
public void setAvataar(Avataar avataar) {
this.avataar = avataar;
}
+
+ public void setTaskFailFast(boolean failFast) {
+ this.failFast = failFast;
+ }
+
+ public boolean isTaskFailFast() {
+ return failFast;
+ }
@SuppressWarnings("unchecked")
public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
@@ -1921,9 +1932,12 @@ public abstract class TaskAttemptImpl implements
switch(finalState) {
case FAILED:
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
- taskAttempt.attemptId,
- TaskEventType.T_ATTEMPT_FAILED));
+ boolean fastFail = false;
+ if (event instanceof TaskAttemptFailEvent) {
+ fastFail = ((TaskAttemptFailEvent) event).isFastFail();
+ }
+ taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
+ taskAttempt.attemptId, fastFail));
break;
case KILLED:
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
@@ -2041,13 +2055,16 @@ public abstract class TaskAttemptImpl implements
private static class FailedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
// set the finish time
taskAttempt.setFinishTime();
- notifyTaskAttemptFailed(taskAttempt);
+
+ notifyTaskAttemptFailed(taskAttempt, taskAttempt.isTaskFailFast());
}
}
@@ -2154,8 +2171,8 @@ public abstract class TaskAttemptImpl implements
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
- taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+ taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
+ taskAttempt.attemptId));
}
}
@@ -2332,6 +2349,8 @@ public abstract class TaskAttemptImpl implements
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.setRescheduleNextAttempt(
((TaskAttemptKillEvent)event).getRescheduleAttempt());
+ } else if (event instanceof TaskAttemptFailEvent) {
+ taskAttempt.setTaskFailFast(((TaskAttemptFailEvent)event).isFastFail());
}
}
}
@@ -2400,12 +2419,13 @@ public abstract class TaskAttemptImpl implements
// register it to finishing state
taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
taskAttempt.attemptId);
- notifyTaskAttemptFailed(taskAttempt);
+ notifyTaskAttemptFailed(taskAttempt, false);
}
}
@SuppressWarnings("unchecked")
- private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
+ private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt,
+ boolean fastFail) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
@@ -2419,8 +2439,8 @@ public abstract class TaskAttemptImpl implements
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
- taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+ taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
+ taskAttempt.attemptId, fastFail));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index 086d4d5..ce3b3cc 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@@ -1054,7 +1055,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
- TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+ TaskTAttemptFailedEvent castEvent = (TaskTAttemptFailedEvent) event;
TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
task.failedAttempts.add(taskAttemptId);
if (taskAttemptId.equals(task.commitAttempt)) {
@@ -1068,7 +1069,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
task.finishedAttempts.add(taskAttemptId);
- if (task.failedAttempts.size() < task.maxAttempts) {
+ if (!castEvent.isFastFail()
+ && task.failedAttempts.size() < task.maxAttempts) {
task.handleTaskAttemptCompletion(
taskAttemptId,
TaskAttemptCompletionEventStatus.FAILED);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index 4d3f6f4..a2f0aba 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -288,8 +289,7 @@ public class TestFail {
if (attemptID.getTaskId().getId() == 0) {//check if it is first task
// send the Fail event
getContext().getEventHandler().handle(
- new TaskAttemptEvent(attemptID,
- TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
@@ -310,8 +310,7 @@ public class TestFail {
//check if it is first task's first attempt
// send the Fail event
getContext().getEventHandler().handle(
- new TaskAttemptEvent(attemptID,
- TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 893c4a0..b2807c1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -38,6 +38,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -167,9 +169,8 @@ public class TestRecovery {
/////////// Play some games with the TaskAttempts of the first task //////
//send the fail signal to the 1st map task attempt
app.getContext().getEventHandler().handle(
- new TaskAttemptEvent(
- task1Attempt1.getID(),
- TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(
+ task1Attempt1.getID()));
app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 1827ce4..8592b20 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -81,7 +81,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@@ -437,8 +437,7 @@ public class TestJobImpl {
TaskImpl task = (TaskImpl) t;
task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
for(TaskAttempt ta: task.getAttempts().values()) {
- task.handle(new TaskTAttemptEvent(ta.getID(),
- TaskEventType.T_ATTEMPT_FAILED));
+ task.handle(new TaskTAttemptFailedEvent(ta.getID()));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index fe5d95d..43571a9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -499,7 +500,7 @@ public class TestTaskAttempt{
new TaskAttemptDiagnosticsUpdateEvent(attemptID,
"Test Diagnostic Event"));
getContext().getEventHandler().handle(
- new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(attemptID));
}
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
@@ -1357,8 +1358,7 @@ public class TestTaskAttempt{
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
- taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
- TaskAttemptEventType.TA_FAILMSG));
+ taImpl.handle(new TaskAttemptFailEvent(taImpl.getID()));
assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
TaskAttemptState.FAILED);
@@ -1484,8 +1484,7 @@ public class TestTaskAttempt{
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
- taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
- TaskAttemptEventType.TA_FAILMSG));
+ taImpl.handle(new TaskAttemptFailEvent(taImpl.getID()));
assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
TaskAttemptState.FAILED);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
index 62d4cc0..1225c43 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials;
@@ -345,8 +346,7 @@ public class TestTaskImpl {
}
private void failRunningTaskAttempt(TaskAttemptId attemptId) {
- mockTask.handle(new TaskTAttemptEvent(attemptId,
- TaskEventType.T_ATTEMPT_FAILED));
+ mockTask.handle(new TaskTAttemptFailedEvent(attemptId));
assertTaskRunningState();
}
@@ -612,11 +612,16 @@ public class TestTaskImpl {
// The task should now have succeeded
assertTaskSucceededState();
-
+
// Now complete the first task attempt, after the second has succeeded
- mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
- firstAttemptFinishEvent));
-
+ if (firstAttemptFinishEvent.equals(TaskEventType.T_ATTEMPT_FAILED)) {
+ mockTask.handle(new TaskTAttemptFailedEvent(taskAttempts
+ .get(0).getAttemptId()));
+ } else {
+ mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
+ firstAttemptFinishEvent));
+ }
+
// The task should still be in the succeeded state
assertTaskSucceededState();
@@ -668,8 +673,8 @@ public class TestTaskImpl {
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
- mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
- TaskEventType.T_ATTEMPT_FAILED));
+ mockTask.handle(new TaskTAttemptFailedEvent(
+ taskAttempts.get(1).getAttemptId()));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
@@ -683,8 +688,8 @@ public class TestTaskImpl {
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
- mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
- TaskEventType.T_ATTEMPT_FAILED));
+ mockTask.handle(new TaskTAttemptFailedEvent(
+ taskAttempts.get(1).getAttemptId()));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
@@ -698,8 +703,8 @@ public class TestTaskImpl {
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
- mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
- TaskEventType.T_ATTEMPT_FAILED));
+ mockTask.handle(new TaskTAttemptFailedEvent(
+ taskAttempts.get(1).getAttemptId()));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
@@ -734,8 +739,8 @@ public class TestTaskImpl {
// have the first attempt fail, verify task failed due to no retries
MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
taskAttempt.setState(TaskAttemptState.FAILED);
- mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
- TaskEventType.T_ATTEMPT_FAILED));
+ mockTask.handle(new TaskTAttemptFailedEvent(
+ taskAttempt.getAttemptId()));
assertEquals(TaskState.FAILED, mockTask.getState());
// verify task can no longer be killed
@@ -757,8 +762,7 @@ public class TestTaskImpl {
TaskEventType.T_ATTEMPT_COMMIT_PENDING));
assertEquals(TaskState.FAILED, mockTask.getState());
taskAttempt.setState(TaskAttemptState.FAILED);
- mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
- TaskEventType.T_ATTEMPT_FAILED));
+ mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId()));
assertEquals(TaskState.FAILED, mockTask.getState());
taskAttempt = taskAttempts.get(2);
taskAttempt.setState(TaskAttemptState.SUCCEEDED);
@@ -808,8 +812,7 @@ public class TestTaskImpl {
// max attempts is 4
MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
taskAttempt.setState(TaskAttemptState.FAILED);
- mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
- TaskEventType.T_ATTEMPT_FAILED));
+ mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId()));
assertEquals(TaskState.RUNNING, mockTask.getState());
// verify a new attempt(#3) added because the speculative attempt(#2)
@@ -829,8 +832,7 @@ public class TestTaskImpl {
// hasn't reach the max attempts which is 4
MockTaskAttemptImpl taskAttempt1 = taskAttempts.get(1);
taskAttempt1.setState(TaskAttemptState.FAILED);
- mockTask.handle(new TaskTAttemptEvent(taskAttempt1.getAttemptId(),
- TaskEventType.T_ATTEMPT_FAILED));
+ mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt1.getAttemptId()));
assertEquals(TaskState.RUNNING, mockTask.getState());
// verify there's no new attempt added because of the running attempt(#3)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index c9dff6a..5e7a250 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -729,9 +729,9 @@ public class LocalJobRunner implements ClientProtocol {
LOG.error("shuffleError: "+ message + "from task: " + taskId);
}
- public synchronized void fatalError(TaskAttemptID taskId, String msg)
+ public synchronized void fatalError(TaskAttemptID taskId, String msg, boolean fastFail)
throws IOException {
- LOG.error("Fatal: "+ msg + "from task: " + taskId);
+ LOG.error("Fatal: "+ msg + " from task: " + taskId + " fast fail: " + fastFail);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index 27c8976..ab7cba5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -1568,7 +1568,8 @@ public class MapTask extends Task {
if (lspillException instanceof Error) {
final String logMsg = "Task " + getTaskID() + " failed : " +
StringUtils.stringifyException(lspillException);
- mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
+ mapTask.reportFatalError(getTaskID(), lspillException, logMsg,
+ false);
}
throw new IOException("Spill failed", lspillException);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 730f4ee..87c9e16 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -354,7 +355,7 @@ abstract public class Task implements Writable, Configurable {
* Report a fatal error to the parent (task) tracker.
*/
protected void reportFatalError(TaskAttemptID id, Throwable throwable,
- String logMsg) {
+ String logMsg, boolean fastFail) {
LOG.error(logMsg);
if (ShutdownHookManager.get().isShutdownInProgress()) {
@@ -366,7 +367,7 @@ abstract public class Task implements Writable, Configurable {
? StringUtils.stringifyException(throwable)
: StringUtils.stringifyException(tCause);
try {
- umbilical.fatalError(id, cause);
+ umbilical.fatalError(id, cause, fastFail);
} catch (IOException ioe) {
LOG.error("Failed to contact the tasktracker", ioe);
System.exit(-1);
@@ -652,6 +653,8 @@ abstract public class Task implements Writable, Configurable {
private Thread pingThread = null;
private boolean done = true;
private Object lock = new Object();
+ private volatile String diskLimitCheckStatus = null;
+ private Thread diskLimitCheckThread = null;
/**
* flag that indicates whether progress update needs to be sent to parent.
@@ -749,6 +752,65 @@ abstract public class Task implements Writable, Configurable {
}
/**
+ * disk limit checker, runs in separate thread when activated.
+ */
+ public class DiskLimitCheck implements Runnable {
+ private LocalFileSystem localFS;
+ private long fsLimit;
+ private long checkInterval;
+ private String[] localDirs;
+ private boolean killOnLimitExceeded;
+
+ public DiskLimitCheck(JobConf conf) throws IOException {
+ this.localFS = FileSystem.getLocal(conf);
+ this.fsLimit = conf.getLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES,
+ MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES);
+ this.localDirs = conf.getLocalDirs();
+ this.checkInterval = conf.getLong(
+ MRJobConfig.JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS,
+ MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS);
+ this.killOnLimitExceeded = conf.getBoolean(
+ MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
+ MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED);
+ }
+
+ @Override
+ public void run() {
+ while (!taskDone.get()) {
+ try {
+ long localWritesSize = 0L;
+ String largestWorkDir = null;
+ for (String local : localDirs) {
+ long size = FileUtil.getDU(localFS.pathToFile(new Path(local)));
+ if (localWritesSize < size) {
+ localWritesSize = size;
+ largestWorkDir = local;
+ }
+ }
+ if (localWritesSize > fsLimit) {
+ String localStatus =
+ "too much data in local scratch dir="
+ + largestWorkDir
+ + ". current size is "
+ + localWritesSize
+ + " the limit is " + fsLimit;
+ if (killOnLimitExceeded) {
+ LOG.error(localStatus);
+ diskLimitCheckStatus = localStatus;
+ } else {
+ LOG.warn(localStatus);
+ }
+ break;
+ }
+ Thread.sleep(checkInterval);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ /**
* check the counters to see whether the task has exceeded any configured
* limits.
* @throws TaskLimitException
@@ -773,6 +835,9 @@ abstract public class Task implements Writable, Configurable {
" the limit is " + limit);
}
}
+ if (diskLimitCheckStatus != null) {
+ throw new TaskLimitException(diskLimitCheckStatus);
+ }
}
/**
@@ -851,7 +916,7 @@ abstract public class Task implements Writable, Configurable {
StringUtils.stringifyException(e);
LOG.error(errMsg);
try {
- umbilical.fatalError(taskId, errMsg);
+ umbilical.fatalError(taskId, errMsg, true);
} catch (IOException ioe) {
LOG.error("Failed to update failure diagnosis", ioe);
}
@@ -884,6 +949,22 @@ abstract public class Task implements Writable, Configurable {
pingThread.setDaemon(true);
pingThread.start();
}
+ startDiskLimitCheckerThreadIfNeeded();
+ }
+ public void startDiskLimitCheckerThreadIfNeeded() {
+ if (diskLimitCheckThread == null && conf.getLong(
+ MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES,
+ MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES) >= 0) {
+ try {
+ diskLimitCheckThread = new Thread(new DiskLimitCheck(conf),
+ "disk limit check thread");
+ diskLimitCheckThread.setDaemon(true);
+ diskLimitCheckThread.start();
+ } catch (IOException e) {
+ LOG.error("Issues starting disk monitor thread: "
+ + e.getMessage(), e);
+ }
+ }
}
public void stopCommunicationThread() throws InterruptedException {
if (pingThread != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
index c3678d6..041ab39 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
@@ -68,9 +68,10 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
* Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
* Version 19 Added fatalError for child to communicate fatal errors to TT
* Version 20 Added methods to manage checkpoints
+ * Version 21 Added fastFail parameter to fatalError
* */
- public static final long versionID = 20L;
+ public static final long versionID = 21L;
/**
* Called when a child task process starts, to get its task.
@@ -140,8 +141,13 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
/** Report that the task encounted a local filesystem error.*/
void fsError(TaskAttemptID taskId, String message) throws IOException;
- /** Report that the task encounted a fatal error.*/
- void fatalError(TaskAttemptID taskId, String message) throws IOException;
+ /**
+ * Report that the task encountered a fatal error.
+ * @param taskId task's id
+ * @param message fail message
+ * @param fastFail flag to enable fast fail for task
+ */
+ void fatalError(TaskAttemptID taskId, String message, boolean fastFail) throws IOException;
/** Called by a reduce task to get the map output locations for finished maps.
* Returns an update centered around the map-task-completion-events.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 6acf1bc..ca18bfe 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -52,6 +52,20 @@ public interface MRJobConfig {
public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
+ public static final String JOB_SINGLE_DISK_LIMIT_BYTES =
+ "mapreduce.job.local-fs.single-disk-limit.bytes";
+ // negative values disable the limit
+ public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES = -1;
+
+ public static final String JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED =
+ "mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed";
+ // setting to false only logs the kill
+ public static final boolean DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED = true;
+
+ public static final String JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS =
+ "mapreduce.job.local-fs.single-disk-limit.check.interval-ms";
+ public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS = 5000;
+
public static final String TASK_LOCAL_WRITE_LIMIT_BYTES =
"mapreduce.task.local-fs.write-limit.bytes";
// negative values disable the limit
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 62f3dfa..72f509c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -63,6 +63,28 @@
</property>
<property>
+ <name>mapreduce.job.local-fs.single-disk-limit.bytes</name>
+ <value>-1</value>
+ <description>Enable an in-task monitor thread to watch for single-disk
+ consumption by jobs. By setting this to a number of bytes, the task will
+ fast fail if that limit is reached. This is a per-disk configuration.</description>
+</property>
+
+<property>
+ <name>mapreduce.job.local-fs.single-disk-limit.check.interval-ms</name>
+ <value>5000</value>
+ <description>Interval of disk limit check to run in ms.</description>
+</property>
+
+<property>
+ <name>mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed</name>
+ <value>true</value>
+ <description>If mapreduce.job.local-fs.single-disk-limit.bytes is triggered,
+ should the task be killed or only logged. If false, the intent to kill the
+ task is only logged in the container logs.</description>
+</property>
+
+<property>
<name>mapreduce.job.maps</name>
<value>2</value>
<description>The default number of map tasks per job.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
index 18442d6..e5ff64e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
@@ -18,15 +18,19 @@
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
import java.util.Random;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.util.ExitUtil;
@@ -43,6 +47,11 @@ public class TestTaskProgressReporter {
private FakeUmbilical fakeUmbilical = new FakeUmbilical();
+ private static final String TEST_DIR =
+ System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")) + "/" +
+ TestTaskProgressReporter.class.getName();
+
private static class DummyTask extends Task {
@Override
public void run(JobConf job, TaskUmbilicalProtocol umbilical)
@@ -53,6 +62,11 @@ public class TestTaskProgressReporter {
public boolean isMapTask() {
return true;
}
+
+ @Override
+ public boolean isCommitRequired() {
+ return false;
+ }
}
private static class FakeUmbilical implements TaskUmbilicalProtocol {
@@ -118,7 +132,7 @@ public class TestTaskProgressReporter {
}
@Override
- public void fatalError(TaskAttemptID taskId, String message)
+ public void fatalError(TaskAttemptID taskId, String message, boolean fastFail)
throws IOException {
}
@@ -163,6 +177,78 @@ public class TestTaskProgressReporter {
}
}
+ @Test(timeout=60000)
+ public void testScratchDirSize() throws Exception {
+ String tmpPath = TEST_DIR + "/testBytesWrittenLimit-tmpFile-"
+ + new Random(System.currentTimeMillis()).nextInt();
+ File data = new File(tmpPath + "/out");
+ File testDir = new File(tmpPath);
+ testDir.mkdirs();
+ testDir.deleteOnExit();
+ JobConf conf = new JobConf();
+ conf.setStrings(MRConfig.LOCAL_DIR, "file://" + tmpPath);
+ conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, 1024L);
+ conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
+ true);
+ getBaseConfAndWriteToFile(-1, data);
+ testScratchDirLimit(false, conf);
+ data.delete();
+ getBaseConfAndWriteToFile(100, data);
+ testScratchDirLimit(false, conf);
+ data.delete();
+ getBaseConfAndWriteToFile(1536, data);
+ testScratchDirLimit(true, conf);
+ conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
+ false);
+ testScratchDirLimit(false, conf);
+ conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
+ true);
+ conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, -1L);
+ testScratchDirLimit(false, conf);
+ data.delete();
+ FileUtil.fullyDelete(testDir);
+ }
+
+ private void getBaseConfAndWriteToFile(int size, File data)
+ throws IOException {
+ if (size > 0) {
+ byte[] b = new byte[size];
+ for (int i = 0; i < size; i++) {
+ b[i] = 1;
+ }
+ FileUtils.writeByteArrayToFile(data, b);
+ }
+ }
+
+ public void testScratchDirLimit(boolean fastFail, JobConf conf)
+ throws Exception {
+ ExitUtil.disableSystemExit();
+ threadExited = false;
+ Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread th, Throwable ex) {
+ if (ex instanceof ExitUtil.ExitException) {
+ threadExited = true;
+ th.interrupt();
+ }
+ }
+ };
+ Task task = new DummyTask();
+ task.setConf(conf);
+ DummyTaskReporter reporter = new DummyTaskReporter(task);
+ reporter.startDiskLimitCheckerThreadIfNeeded();
+ Thread t = new Thread(reporter);
+ t.setUncaughtExceptionHandler(h);
+ reporter.setProgressFlag();
+ t.start();
+ while (!reporter.taskLimitIsChecked) {
+ Thread.yield();
+ }
+ task.done(fakeUmbilical, reporter);
+ reporter.resetDoneFlag();
+ t.join(1000L);
+ Assert.assertEquals(fastFail, threadExited);
+ }
+
@Test (timeout=10000)
public void testTaskProgress() throws Exception {
JobConf job = new JobConf();
@@ -214,7 +300,7 @@ public class TestTaskProgressReporter {
conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0);
conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, limit);
LocalFileSystem localFS = FileSystem.getLocal(conf);
- Path tmpPath = new Path("/tmp/testBytesWrittenLimit-tmpFile-"
+ Path tmpPath = new Path(TEST_DIR + "/testBytesWrittenLimit-tmpFile-"
+ new Random(System.currentTimeMillis()).nextInt());
FSDataOutputStream out = localFS.create(tmpPath, true);
out.write(new byte[LOCAL_BYTES_WRITTEN]);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
index 83e35fe..7b70f98 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -712,7 +713,7 @@ public class TestJobHistoryParsing {
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
getContext().getEventHandler().handle(
- new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
@@ -732,7 +733,7 @@ public class TestJobHistoryParsing {
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0) {
getContext().getEventHandler().handle(
- new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
@@ -760,10 +761,10 @@ public class TestJobHistoryParsing {
new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));
} else if (taskType == TaskType.MAP && taskId == 1) {
getContext().getEventHandler().handle(
- new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(attemptID));
} else if (taskType == TaskType.REDUCE && taskId == 0) {
getContext().getEventHandler().handle(
- new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ new TaskAttemptFailEvent(attemptID));
} else if (taskType == TaskType.REDUCE && taskId == 1) {
getContext().getEventHandler().handle(
new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
index f364c18..9b6ebda 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
@@ -91,8 +91,8 @@ public class TestMapProgress {
LOG.info("Task " + taskId + " reporting shuffle error: " + message);
}
- public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
- LOG.info("Task " + taskId + " reporting fatal error: " + msg);
+ public void fatalError(TaskAttemptID taskId, String msg, boolean fastFail) throws IOException {
+ LOG.info("Task " + taskId + " reporting fatal error: " + msg + " fast fail: " + fastFail);
}
public JvmTask getTask(JvmContext context) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
index bed545e..a534cfa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
@@ -124,7 +124,7 @@ public class TestTaskCommit extends HadoopTestCase {
}
@Override
- public void fatalError(TaskAttemptID taskId, String message)
+ public void fatalError(TaskAttemptID taskId, String message, boolean fastFail)
throws IOException { }
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org