You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:28:59 UTC

[20/37] hadoop git commit: MAPREDUCE-7022. Fast fail rogue jobs based on task scratch dir size. Contributed by Johan Gustavsson

MAPREDUCE-7022. Fast fail rogue jobs based on task scratch dir size. Contributed by Johan Gustavsson


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a37e7f0a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a37e7f0a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a37e7f0a

Branch: refs/heads/HDFS-7240
Commit: a37e7f0ad8b68c7ed16c242bedf62f4cde48d6fd
Parents: 1b0f265
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Jan 26 14:36:45 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Jan 26 14:36:45 2018 -0600

----------------------------------------------------------------------
 .../hadoop/mapred/LocalContainerLauncher.java   |  2 +-
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  7 +-
 .../org/apache/hadoop/mapred/YarnChild.java     |  4 +-
 .../v2/app/job/event/TaskAttemptFailEvent.java  | 53 ++++++++++++
 .../app/job/event/TaskTAttemptFailedEvent.java  | 39 +++++++++
 .../v2/app/job/impl/TaskAttemptImpl.java        | 40 ++++++---
 .../mapreduce/v2/app/job/impl/TaskImpl.java     |  6 +-
 .../hadoop/mapreduce/v2/app/TestFail.java       |  7 +-
 .../hadoop/mapreduce/v2/app/TestRecovery.java   |  7 +-
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  |  5 +-
 .../v2/app/job/impl/TestTaskAttempt.java        |  9 +-
 .../mapreduce/v2/app/job/impl/TestTaskImpl.java | 42 ++++-----
 .../apache/hadoop/mapred/LocalJobRunner.java    |  4 +-
 .../java/org/apache/hadoop/mapred/MapTask.java  |  3 +-
 .../java/org/apache/hadoop/mapred/Task.java     | 87 ++++++++++++++++++-
 .../hadoop/mapred/TaskUmbilicalProtocol.java    | 12 ++-
 .../apache/hadoop/mapreduce/MRJobConfig.java    | 14 +++
 .../src/main/resources/mapred-default.xml       | 22 +++++
 .../hadoop/mapred/TestTaskProgressReporter.java | 90 +++++++++++++++++++-
 .../mapreduce/v2/hs/TestJobHistoryParsing.java  |  9 +-
 .../apache/hadoop/mapred/TestMapProgress.java   |  4 +-
 .../apache/hadoop/mapred/TestTaskCommit.java    |  2 +-
 22 files changed, 397 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index 6f9cc34..fed500a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -510,7 +510,7 @@ public class LocalContainerLauncher extends AbstractService implements
           String cause =
               (tCause == null) ? throwable.getMessage() : StringUtils
                   .stringifyException(tCause);
-          umbilical.fatalError(classicAttemptID, cause);
+          umbilical.fatalError(classicAttemptID, cause, false);
         }
         throw new RuntimeException();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 556c90c..b155af22 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@@ -281,7 +282,7 @@ public class TaskAttemptListenerImpl extends CompositeService
   }
 
   @Override
-  public void fatalError(TaskAttemptID taskAttemptID, String msg)
+  public void fatalError(TaskAttemptID taskAttemptID, String msg, boolean fastFail)
       throws IOException {
     // This happens only in Child and in the Task.
     LOG.error("Task: " + taskAttemptID + " - exited : " + msg);
@@ -294,7 +295,7 @@ public class TaskAttemptListenerImpl extends CompositeService
     preemptionPolicy.handleFailedContainer(attemptID);
 
     context.getEventHandler().handle(
-        new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+        new TaskAttemptFailEvent(attemptID, fastFail));
   }
 
   @Override
@@ -312,7 +313,7 @@ public class TaskAttemptListenerImpl extends CompositeService
     preemptionPolicy.handleFailedContainer(attemptID);
 
     context.getEventHandler().handle(
-        new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+        new TaskAttemptFailEvent(attemptID));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index 7ae7a1e..bd40e54 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -206,7 +206,7 @@ class YarnChild {
       if (taskid != null) {
         if (!ShutdownHookManager.get().isShutdownInProgress()) {
           umbilical.fatalError(taskid,
-              StringUtils.stringifyException(exception));
+              StringUtils.stringifyException(exception), false);
         }
       }
     } catch (Throwable throwable) {
@@ -218,7 +218,7 @@ class YarnChild {
           String cause =
               tCause == null ? throwable.getMessage() : StringUtils
                   .stringifyException(tCause);
-          umbilical.fatalError(taskid, cause);
+          umbilical.fatalError(taskid, cause, false);
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java
new file mode 100644
index 0000000..6ea1d15
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+/**
+ * A {@code TA_FAILMSG} task attempt event that can additionally mark the
+ * failure as fast-fail, meaning the owning task should not retry.
+ */
+public class TaskAttemptFailEvent extends TaskAttemptEvent {
+  private final boolean fastFail;
+
+  /**
+   * Create a new TaskAttemptFailEvent, with task fastFail disabled.
+   *
+   * @param id the id of the task attempt
+   */
+  public TaskAttemptFailEvent(TaskAttemptId id) {
+    this(id, false);
+  }
+
+  /**
+   * Create a new TaskAttemptFailEvent.
+   *
+   * @param id the id of the task attempt
+   * @param fastFail true if the task should fail without further retries
+   */
+  public TaskAttemptFailEvent(TaskAttemptId id, boolean fastFail) {
+    super(id, TaskAttemptEventType.TA_FAILMSG);
+    this.fastFail = fastFail;
+  }
+
+  /**
+   * Check if the task should fail fast instead of retrying.
+   * @return true if the task should not retry, false otherwise
+   */
+  public boolean isFastFail() {
+    return fastFail;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java
new file mode 100644
index 0000000..30392ac
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+/**
+ * A {@code T_ATTEMPT_FAILED} task event reporting a failed attempt, which can
+ * additionally mark the failure as fast-fail so the task does not retry.
+ */
+public class TaskTAttemptFailedEvent extends TaskTAttemptEvent {
+
+  private final boolean fastFail;
+
+  /**
+   * Create a new TaskTAttemptFailedEvent, with task fastFail disabled.
+   *
+   * @param id the id of the failed task attempt
+   */
+  public TaskTAttemptFailedEvent(TaskAttemptId id) {
+    this(id, false);
+  }
+
+  /**
+   * Create a new TaskTAttemptFailedEvent.
+   *
+   * @param id the id of the failed task attempt
+   * @param fastFail true if the task should fail without further retries
+   */
+  public TaskTAttemptFailedEvent(TaskAttemptId id, boolean fastFail) {
+    super(id, TaskEventType.T_ATTEMPT_FAILED);
+    this.fastFail = fastFail;
+  }
+
+  /**
+   * Check if the task should fail fast instead of retrying.
+   * @return true if the task should not retry, false otherwise
+   */
+  public boolean isFastFail() {
+    return fastFail;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 431128b..6632f27 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
@@ -101,6 +102,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
@@ -194,6 +196,7 @@ public abstract class TaskAttemptImpl implements
   private Locality locality;
   private Avataar avataar;
   private boolean rescheduleNextAttempt = false;
+  private boolean failFast = false;
 
   private static final CleanupContainerTransition
       CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
@@ -1412,6 +1415,14 @@ public abstract class TaskAttemptImpl implements
   public void setAvataar(Avataar avataar) {
     this.avataar = avataar;
   }
+
+  public void setTaskFailFast(boolean failFast) {
+    this.failFast = failFast;
+  }
+
+  public boolean isTaskFailFast() {
+    return failFast;
+  }
   
   @SuppressWarnings("unchecked")
   public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
@@ -1921,9 +1932,12 @@ public abstract class TaskAttemptImpl implements
 
       switch(finalState) {
         case FAILED:
-          taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-              taskAttempt.attemptId,
-              TaskEventType.T_ATTEMPT_FAILED));
+          boolean fastFail = false;
+          if (event instanceof TaskAttemptFailEvent) {
+            fastFail = ((TaskAttemptFailEvent) event).isFastFail();
+          }
+          taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
+              taskAttempt.attemptId, fastFail));
           break;
         case KILLED:
           taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
@@ -2041,13 +2055,16 @@ public abstract class TaskAttemptImpl implements
 
   private static class FailedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+
     @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
       // set the finish time
       taskAttempt.setFinishTime();
-      notifyTaskAttemptFailed(taskAttempt);
+
+      notifyTaskAttemptFailed(taskAttempt, taskAttempt.isTaskFailFast());
     }
   }
 
@@ -2154,8 +2171,8 @@ public abstract class TaskAttemptImpl implements
         LOG.debug("Not generating HistoryFinish event since start event not " +
             "generated for taskAttempt: " + taskAttempt.getID());
       }
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+      taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
+          taskAttempt.attemptId));
     }
   }
   
@@ -2332,6 +2349,8 @@ public abstract class TaskAttemptImpl implements
       if (event instanceof TaskAttemptKillEvent) {
         taskAttempt.setRescheduleNextAttempt(
             ((TaskAttemptKillEvent)event).getRescheduleAttempt());
+      } else if (event instanceof TaskAttemptFailEvent) {
+        taskAttempt.setTaskFailFast(((TaskAttemptFailEvent)event).isFastFail());
       }
     }
   }
@@ -2400,12 +2419,13 @@ public abstract class TaskAttemptImpl implements
       // register it to finishing state
       taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
           taskAttempt.attemptId);
-      notifyTaskAttemptFailed(taskAttempt);
+      notifyTaskAttemptFailed(taskAttempt, false);
     }
   }
 
   @SuppressWarnings("unchecked")
-  private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
+  private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt,
+      boolean fastFail) {
     if (taskAttempt.getLaunchTime() == 0) {
       sendJHStartEventForAssignedFailTask(taskAttempt);
     }
@@ -2419,8 +2439,8 @@ public abstract class TaskAttemptImpl implements
     taskAttempt.eventHandler.handle(new JobHistoryEvent(
         taskAttempt.attemptId.getTaskId().getJobId(), tauce));
 
-    taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-        taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+    taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
+        taskAttempt.attemptId, fastFail));
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index 086d4d5..ce3b3cc 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@@ -1054,7 +1055,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      TaskTAttemptFailedEvent castEvent = (TaskTAttemptFailedEvent) event;
       TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
       task.failedAttempts.add(taskAttemptId); 
       if (taskAttemptId.equals(task.commitAttempt)) {
@@ -1068,7 +1069,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       }
       
       task.finishedAttempts.add(taskAttemptId);
-      if (task.failedAttempts.size() < task.maxAttempts) {
+      if (!castEvent.isFastFail()
+          && task.failedAttempts.size() < task.maxAttempts) {
         task.handleTaskAttemptCompletion(
             taskAttemptId, 
             TaskAttemptCompletionEventStatus.FAILED);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index 4d3f6f4..a2f0aba 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -288,8 +289,7 @@ public class TestFail {
       if (attemptID.getTaskId().getId() == 0) {//check if it is first task
         // send the Fail event
         getContext().getEventHandler().handle(
-            new TaskAttemptEvent(attemptID, 
-                TaskAttemptEventType.TA_FAILMSG));
+            new TaskAttemptFailEvent(attemptID));
       } else {
         getContext().getEventHandler().handle(
             new TaskAttemptEvent(attemptID,
@@ -310,8 +310,7 @@ public class TestFail {
         //check if it is first task's first attempt
         // send the Fail event
         getContext().getEventHandler().handle(
-            new TaskAttemptEvent(attemptID, 
-                TaskAttemptEventType.TA_FAILMSG));
+            new TaskAttemptFailEvent(attemptID));
       } else {
         getContext().getEventHandler().handle(
             new TaskAttemptEvent(attemptID,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 893c4a0..b2807c1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -38,6 +38,8 @@ import java.util.List;
 import java.util.Map;
 
 import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -167,9 +169,8 @@ public class TestRecovery {
     /////////// Play some games with the TaskAttempts of the first task //////
     //send the fail signal to the 1st map task attempt
     app.getContext().getEventHandler().handle(
-        new TaskAttemptEvent(
-            task1Attempt1.getID(),
-            TaskAttemptEventType.TA_FAILMSG));
+        new TaskAttemptFailEvent(
+            task1Attempt1.getID()));
     
     app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 1827ce4..8592b20 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -81,7 +81,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@@ -437,8 +437,7 @@ public class TestJobImpl {
       TaskImpl task = (TaskImpl) t;
       task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
       for(TaskAttempt ta: task.getAttempts().values()) {
-        task.handle(new TaskTAttemptEvent(ta.getID(),
-          TaskEventType.T_ATTEMPT_FAILED));
+        task.handle(new TaskTAttemptFailedEvent(ta.getID()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index fe5d95d..43571a9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -499,7 +500,7 @@ public class TestTaskAttempt{
           new TaskAttemptDiagnosticsUpdateEvent(attemptID,
               "Test Diagnostic Event"));
       getContext().getEventHandler().handle(
-          new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+          new TaskAttemptFailEvent(attemptID));
     }
 
     protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
@@ -1357,8 +1358,7 @@ public class TestTaskAttempt{
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
 
-    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
-        TaskAttemptEventType.TA_FAILMSG));
+    taImpl.handle(new TaskAttemptFailEvent(taImpl.getID()));
 
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
@@ -1484,8 +1484,7 @@ public class TestTaskAttempt{
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
 
-    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
-        TaskAttemptEventType.TA_FAILMSG));
+    taImpl.handle(new TaskAttemptFailEvent(taImpl.getID()));
 
     assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
         TaskAttemptState.FAILED);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
index 62d4cc0..1225c43 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
@@ -345,8 +346,7 @@ public class TestTaskImpl {
   }
 
   private void failRunningTaskAttempt(TaskAttemptId attemptId) {
-    mockTask.handle(new TaskTAttemptEvent(attemptId, 
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(new TaskTAttemptFailedEvent(attemptId));
     assertTaskRunningState();
   }
   
@@ -612,11 +612,16 @@ public class TestTaskImpl {
     
     // The task should now have succeeded
     assertTaskSucceededState();
-    
+
     // Now complete the first task attempt, after the second has succeeded
-    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), 
-        firstAttemptFinishEvent));
-    
+    if (firstAttemptFinishEvent.equals(TaskEventType.T_ATTEMPT_FAILED)) {
+      mockTask.handle(new TaskTAttemptFailedEvent(taskAttempts
+          .get(0).getAttemptId()));
+    } else {
+      mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
+          firstAttemptFinishEvent));
+    }
+
     // The task should still be in the succeeded state
     assertTaskSucceededState();
     
@@ -668,8 +673,8 @@ public class TestTaskImpl {
     assertEquals(2, taskAttempts.size());
 
     // speculative attempt retroactively fails from fetch failures
-    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(new TaskTAttemptFailedEvent(
+        taskAttempts.get(1).getAttemptId()));
 
     assertTaskScheduledState();
     assertEquals(3, taskAttempts.size());
@@ -683,8 +688,8 @@ public class TestTaskImpl {
     assertEquals(2, taskAttempts.size());
 
     // speculative attempt retroactively fails from fetch failures
-    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(new TaskTAttemptFailedEvent(
+        taskAttempts.get(1).getAttemptId()));
 
     assertTaskScheduledState();
     assertEquals(3, taskAttempts.size());
@@ -698,8 +703,8 @@ public class TestTaskImpl {
     assertEquals(2, taskAttempts.size());
 
     // speculative attempt retroactively fails from fetch failures
-    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(new TaskTAttemptFailedEvent(
+        taskAttempts.get(1).getAttemptId()));
 
     assertTaskScheduledState();
     assertEquals(3, taskAttempts.size());
@@ -734,8 +739,8 @@ public class TestTaskImpl {
     // have the first attempt fail, verify task failed due to no retries
     MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
     taskAttempt.setState(TaskAttemptState.FAILED);
-    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(new TaskTAttemptFailedEvent(
+        taskAttempt.getAttemptId()));
     assertEquals(TaskState.FAILED, mockTask.getState());
 
     // verify task can no longer be killed
@@ -757,8 +762,7 @@ public class TestTaskImpl {
         TaskEventType.T_ATTEMPT_COMMIT_PENDING));
     assertEquals(TaskState.FAILED, mockTask.getState());
     taskAttempt.setState(TaskAttemptState.FAILED);
-    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId()));
     assertEquals(TaskState.FAILED, mockTask.getState());
     taskAttempt = taskAttempts.get(2);
     taskAttempt.setState(TaskAttemptState.SUCCEEDED);
@@ -808,8 +812,7 @@ public class TestTaskImpl {
     // max attempts is 4
     MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
     taskAttempt.setState(TaskAttemptState.FAILED);
-    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId()));
     assertEquals(TaskState.RUNNING, mockTask.getState());
 
     // verify a new attempt(#3) added because the speculative attempt(#2)
@@ -829,8 +832,7 @@ public class TestTaskImpl {
     // hasn't reach the max attempts which is 4
     MockTaskAttemptImpl taskAttempt1 = taskAttempts.get(1);
     taskAttempt1.setState(TaskAttemptState.FAILED);
-    mockTask.handle(new TaskTAttemptEvent(taskAttempt1.getAttemptId(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt1.getAttemptId()));
     assertEquals(TaskState.RUNNING, mockTask.getState());
 
     // verify there's no new attempt added because of the running attempt(#3)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index c9dff6a..5e7a250 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -729,9 +729,16 @@ public class LocalJobRunner implements ClientProtocol {
       LOG.error("shuffleError: "+ message + "from task: " + taskId);
     }
     
-    public synchronized void fatalError(TaskAttemptID taskId, String msg) 
+    /**
+     * Logs a fatal error reported by a locally running task.
+     * {@code fastFail} is only included in the log message here.
+     */
+    @Override
+    public synchronized void fatalError(TaskAttemptID taskId, String msg,
+        boolean fastFail)
     throws IOException {
-      LOG.error("Fatal: "+ msg + "from task: " + taskId);
+      LOG.error("Fatal: " + msg + " from task: " + taskId + " fast fail: "
+          + fastFail);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index 27c8976..ab7cba5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -1568,7 +1568,8 @@ public class MapTask extends Task {
         if (lspillException instanceof Error) {
           final String logMsg = "Task " + getTaskID() + " failed : " +
             StringUtils.stringifyException(lspillException);
-          mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
+          mapTask.reportFatalError(getTaskID(), lspillException, logMsg,
+              false);
         }
         throw new IOException("Spill failed", lspillException);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 730f4ee..87c9e16 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -354,7 +355,7 @@ abstract public class Task implements Writable, Configurable {
    * Report a fatal error to the parent (task) tracker.
    */
   protected void reportFatalError(TaskAttemptID id, Throwable throwable, 
-                                  String logMsg) {
+                                  String logMsg, boolean fastFail) {
     LOG.error(logMsg);
     
     if (ShutdownHookManager.get().isShutdownInProgress()) {
@@ -366,7 +367,7 @@ abstract public class Task implements Writable, Configurable {
                    ? StringUtils.stringifyException(throwable)
                    : StringUtils.stringifyException(tCause);
     try {
-      umbilical.fatalError(id, cause);
+      umbilical.fatalError(id, cause, fastFail);
     } catch (IOException ioe) {
       LOG.error("Failed to contact the tasktracker", ioe);
       System.exit(-1);
@@ -652,6 +653,8 @@ abstract public class Task implements Writable, Configurable {
     private Thread pingThread = null;
     private boolean done = true;
     private Object lock = new Object();
+    private volatile String diskLimitCheckStatus = null;
+    private Thread diskLimitCheckThread = null;
 
     /**
      * flag that indicates whether progress update needs to be sent to parent.
@@ -749,6 +752,103 @@ abstract public class Task implements Writable, Configurable {
      }

     /**
+     * Disk limit checker, runs in a separate thread when activated. On every
+     * pass it measures the disk usage of each configured local dir and, if
+     * the largest exceeds {@link MRJobConfig#JOB_SINGLE_DISK_LIMIT_BYTES},
+     * either records a status that the task-limit check turns into a
+     * {@code TaskLimitException} (kill enabled) or only logs a warning.
+     * Either way it stops checking after the first violation.
+     */
+    public class DiskLimitCheck implements Runnable {
+      private final LocalFileSystem localFS;
+      private final long fsLimit;
+      private final long checkInterval;
+      private final String[] localDirs;
+      private final boolean killOnLimitExceeded;
+
+      /**
+       * Reads the limit, check interval, kill flag and local dirs from the
+       * job configuration.
+       * @param conf job configuration
+       * @throws IOException if the local file system or local dirs cannot
+       *     be obtained
+       */
+      public DiskLimitCheck(JobConf conf) throws IOException {
+        this.localFS = FileSystem.getLocal(conf);
+        this.fsLimit = conf.getLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES,
+            MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES);
+        this.localDirs = conf.getLocalDirs();
+        long interval = conf.getLong(
+            MRJobConfig.JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS,
+            MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS);
+        if (interval <= 0) {
+          // Thread.sleep() rejects negative values and 0 would busy-spin.
+          LOG.warn("Ignoring invalid "
+              + MRJobConfig.JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS + "="
+              + interval + ", using the default");
+          interval =
+              MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS;
+        }
+        this.checkInterval = interval;
+        this.killOnLimitExceeded = conf.getBoolean(
+            MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
+            MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED);
+      }
+
+      @Override
+      public void run() {
+        while (!taskDone.get()) {
+          try {
+            if (checkLocalDirs()) {
+              break;
+            }
+          } catch (Exception e) {
+            // Best effort: keep monitoring, but still sleep below so that a
+            // persistent failure does not turn into a busy loop.
+            LOG.error(e.getMessage(), e);
+          }
+          try {
+            Thread.sleep(checkInterval);
+          } catch (InterruptedException ie) {
+            // Stop monitoring and preserve the interrupt status.
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+
+      /**
+       * Measures every local dir once and reacts if the largest one is over
+       * the limit.
+       * @return true if the limit was exceeded and checking should stop
+       */
+      private boolean checkLocalDirs() {
+        long localWritesSize = 0L;
+        String largestWorkDir = null;
+        for (String local : localDirs) {
+          long size = FileUtil.getDU(localFS.pathToFile(new Path(local)));
+          if (localWritesSize < size) {
+            localWritesSize = size;
+            largestWorkDir = local;
+          }
+        }
+        if (localWritesSize <= fsLimit) {
+          return false;
+        }
+        String localStatus = "too much data in local scratch dir="
+            + largestWorkDir + ". current size is " + localWritesSize
+            + " the limit is " + fsLimit;
+        if (killOnLimitExceeded) {
+          LOG.error(localStatus);
+          diskLimitCheckStatus = localStatus;
+        } else {
+          LOG.warn(localStatus);
+        }
+        return true;
+      }
+    }
+
+    /**
       * check the counters to see whether the task has exceeded any configured
       * limits.
       * @throws TaskLimitException
@@ -773,6 +835,9 @@ abstract public class Task implements Writable, Configurable {
                   " the limit is " + limit);
         }
       }
+      if (diskLimitCheckStatus != null) {
+        throw new TaskLimitException(diskLimitCheckStatus);
+      }
     }
 
     /**
@@ -851,7 +916,7 @@ abstract public class Task implements Writable, Configurable {
                   StringUtils.stringifyException(e);
           LOG.error(errMsg);
           try {
-            umbilical.fatalError(taskId, errMsg);
+            umbilical.fatalError(taskId, errMsg, true);
           } catch (IOException ioe) {
             LOG.error("Failed to update failure diagnosis", ioe);
           }
@@ -884,6 +949,34 @@ abstract public class Task implements Writable, Configurable {
         pingThread.setDaemon(true);
         pingThread.start();
       }
+      startDiskLimitCheckerThreadIfNeeded();
+    }
+
+    /**
+     * Starts the daemon "disk limit check thread" running a
+     * {@link DiskLimitCheck}, unless one was already created or
+     * {@link MRJobConfig#JOB_SINGLE_DISK_LIMIT_BYTES} is negative (disabled).
+     * A failure to create the checker is logged and otherwise ignored.
+     */
+    public void startDiskLimitCheckerThreadIfNeeded() {
+      if (diskLimitCheckThread != null) {
+        return;
+      }
+      long diskLimit = conf.getLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES,
+          MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES);
+      if (diskLimit < 0) {
+        return;
+      }
+      try {
+        Thread checker = new Thread(new DiskLimitCheck(conf),
+            "disk limit check thread");
+        checker.setDaemon(true);
+        diskLimitCheckThread = checker;
+        checker.start();
+      } catch (IOException e) {
+        LOG.error("Issues starting disk monitor thread: "
+            + e.getMessage(), e);
+      }
     }
     public void stopCommunicationThread() throws InterruptedException {
       if (pingThread != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
index c3678d6..041ab39 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
@@ -68,9 +68,10 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
    * Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    * Version 19 Added fatalError for child to communicate fatal errors to TT
    * Version 20 Added methods to manage checkpoints
+   * Version 21 Added fastFail parameter to fatalError
    * */
 
-  public static final long versionID = 20L;
+  public static final long versionID = 21L;
   
   /**
    * Called when a child task process starts, to get its task.
@@ -140,8 +141,16 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
   /** Report that the task encounted a local filesystem error.*/
   void fsError(TaskAttemptID taskId, String message) throws IOException;

-  /** Report that the task encounted a fatal error.*/
-  void fatalError(TaskAttemptID taskId, String message) throws IOException;
+  /**
+   * Report that the task encountered a fatal error.
+   * @param taskId task's id
+   * @param message fail message
+   * @param fastFail {@code true} to request that the task fail fast instead
+   *     of being retried
+   * @throws IOException if the report cannot be delivered
+   */
+  void fatalError(TaskAttemptID taskId, String message, boolean fastFail)
+      throws IOException;
   
   /** Called by a reduce task to get the map output locations for finished maps.
    * Returns an update centered around the map-task-completion-events. 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 6acf1bc..ca18bfe 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -52,6 +52,23 @@ public interface MRJobConfig {

   public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";

+  /** Maximum disk usage, in bytes, allowed in any single local dir. */
+  public static final String JOB_SINGLE_DISK_LIMIT_BYTES =
+          "mapreduce.job.local-fs.single-disk-limit.bytes";
+  // negative values disable the limit
+  public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES = -1;
+
+  /** Whether exceeding the single disk limit fails the task. */
+  public static final String JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED =
+      "mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed";
+  // false only logs a warning instead of failing the task
+  public static final boolean DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED = true;
+
+  /** Interval, in milliseconds, between single disk limit checks. */
+  public static final String JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS =
+      "mapreduce.job.local-fs.single-disk-limit.check.interval-ms";
+  public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS = 5000;
+
   public static final String TASK_LOCAL_WRITE_LIMIT_BYTES =
           "mapreduce.task.local-fs.write-limit.bytes";
   // negative values disable the limit
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 62f3dfa..72f509c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -63,6 +63,28 @@
 </property>
 
 <property>
+  <name>mapreduce.job.local-fs.single-disk-limit.bytes</name>
+  <value>-1</value>
+  <description>Enables an in-task monitor thread that watches the disk usage
+    of each local dir. If set to x bytes, the task fails fast once any single
+    local dir holds more than x bytes. Negative values disable the check.</description>
+</property>
+
+<property>
+  <name>mapreduce.job.local-fs.single-disk-limit.check.interval-ms</name>
+  <value>5000</value>
+  <description>Interval, in milliseconds, between disk limit checks.</description>
+</property>
+
+<property>
+  <name>mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed</name>
+  <value>true</value>
+  <description>Whether a task that exceeds mapreduce.job.local-fs.single-disk-limit.bytes
+    is failed. If false, the violation is only logged in the container logs
+    and the task keeps running.</description>
+</property>
+
+<property>
   <name>mapreduce.job.maps</name>
   <value>2</value>
   <description>The default number of map tasks per job.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
index 18442d6..e5ff64e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
@@ -18,15 +18,19 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.util.ExitUtil;
@@ -43,6 +47,12 @@ public class TestTaskProgressReporter {

   private FakeUmbilical fakeUmbilical = new FakeUmbilical();

+  // Per-class scratch dir: under test.build.data when set, else java.io.tmpdir.
+  private static final String TEST_DIR =
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")) + "/" +
+      TestTaskProgressReporter.class.getName();
+
   private static class DummyTask extends Task {
     @Override
     public void run(JobConf job, TaskUmbilicalProtocol umbilical)
@@ -53,6 +62,11 @@ public class TestTaskProgressReporter {
     public boolean isMapTask() {
       return true;
     }
+
+    @Override
+    public boolean isCommitRequired() {
+      return false;
+    }
   }
 
   private static class FakeUmbilical implements TaskUmbilicalProtocol {
@@ -118,7 +132,9 @@ public class TestTaskProgressReporter {
     }

     @Override
-    public void fatalError(TaskAttemptID taskId, String message)
+    public void fatalError(TaskAttemptID taskId, String message,
+        boolean fastFail)
         throws IOException {
+      // Intentionally a no-op in this fake umbilical.
     }

@@ -163,6 +177,103 @@ public class TestTaskProgressReporter {
       }
     }

+  /**
+   * Verifies that the disk limit checker fast-fails the task only when the
+   * scratch dir exceeds the configured limit and killing is enabled.
+   */
+  @Test(timeout=60000)
+  public void testScratchDirSize() throws Exception {
+    String tmpPath = TEST_DIR + "/testBytesWrittenLimit-tmpFile-"
+        + new Random(System.currentTimeMillis()).nextInt();
+    File data = new File(tmpPath + "/out");
+    File testDir = new File(tmpPath);
+    testDir.mkdirs();
+    testDir.deleteOnExit();
+    try {
+      JobConf conf = new JobConf();
+      conf.setStrings(MRConfig.LOCAL_DIR, "file://" + tmpPath);
+      conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, 1024L);
+      conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
+          true);
+      // Empty scratch dir: under the 1024 byte limit.
+      writeBytesToFile(-1, data);
+      testScratchDirLimit(false, conf);
+      data.delete();
+      // 100 bytes: still under the limit.
+      writeBytesToFile(100, data);
+      testScratchDirLimit(false, conf);
+      data.delete();
+      // 1536 bytes: over the limit, so the task must fast fail ...
+      writeBytesToFile(1536, data);
+      testScratchDirLimit(true, conf);
+      // ... but only log when killing is disabled ...
+      conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
+          false);
+      testScratchDirLimit(false, conf);
+      // ... and do nothing when the limit itself is disabled.
+      conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
+          true);
+      conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, -1L);
+      testScratchDirLimit(false, conf);
+    } finally {
+      // Clean up even when an assertion above fails.
+      data.delete();
+      FileUtil.fullyDelete(testDir);
+    }
+  }
+
+  /**
+   * Writes {@code size} bytes, all set to 1, to {@code data}; does nothing
+   * when {@code size} is not positive.
+   */
+  private void writeBytesToFile(int size, File data) throws IOException {
+    if (size > 0) {
+      byte[] b = new byte[size];
+      for (int i = 0; i < size; i++) {
+        b[i] = 1;
+      }
+      FileUtils.writeByteArrayToFile(data, b);
+    }
+  }
+
+  /**
+   * Runs a {@code DummyTaskReporter} with the disk limit checker started,
+   * finishes the task, and asserts whether the reporter thread exited via
+   * {@link ExitUtil}.
+   * @param fastFail whether the reporter thread is expected to exit
+   * @param conf job configuration for the task
+   */
+  public void testScratchDirLimit(boolean fastFail, JobConf conf)
+          throws Exception {
+    ExitUtil.disableSystemExit();
+    threadExited = false;
+    Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread th, Throwable ex) {
+        if (ex instanceof ExitUtil.ExitException) {
+          threadExited = true;
+          th.interrupt();
+        }
+      }
+    };
+    Task task = new DummyTask();
+    task.setConf(conf);
+    DummyTaskReporter reporter = new DummyTaskReporter(task);
+    reporter.startDiskLimitCheckerThreadIfNeeded();
+    Thread t = new Thread(reporter);
+    t.setUncaughtExceptionHandler(h);
+    reporter.setProgressFlag();
+    t.start();
+    // Spin until DummyTaskReporter reports taskLimitIsChecked.
+    while (!reporter.taskLimitIsChecked) {
+      Thread.yield();
+    }
+    task.done(fakeUmbilical, reporter);
+    reporter.resetDoneFlag();
+    t.join(1000L);
+    Assert.assertEquals(fastFail, threadExited);
+  }
+
   @Test (timeout=10000)
   public void testTaskProgress() throws Exception {
     JobConf job = new JobConf();
@@ -214,7 +300,7 @@ public class TestTaskProgressReporter {
     conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0);
     conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, limit);
     LocalFileSystem localFS = FileSystem.getLocal(conf);
-    Path tmpPath = new Path("/tmp/testBytesWrittenLimit-tmpFile-"
+    Path tmpPath = new Path(TEST_DIR + "/testBytesWrittenLimit-tmpFile-"
             + new Random(System.currentTimeMillis()).nextInt());
     FSDataOutputStream out = localFS.create(tmpPath, true);
     out.write(new byte[LOCAL_BYTES_WRITTEN]);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
index 83e35fe..7b70f98 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -712,7 +713,7 @@ public class TestJobHistoryParsing {
     protected void attemptLaunched(TaskAttemptId attemptID) {
       if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
         getContext().getEventHandler().handle(
-            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+            new TaskAttemptFailEvent(attemptID));
       } else {
         getContext().getEventHandler().handle(
             new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
@@ -732,7 +733,7 @@ public class TestJobHistoryParsing {
     protected void attemptLaunched(TaskAttemptId attemptID) {
       if (attemptID.getTaskId().getId() == 0) {
         getContext().getEventHandler().handle(
-            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+            new TaskAttemptFailEvent(attemptID));
       } else {
         getContext().getEventHandler().handle(
             new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
@@ -760,10 +761,10 @@ public class TestJobHistoryParsing {
             new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));
       } else if (taskType == TaskType.MAP && taskId == 1) {
         getContext().getEventHandler().handle(
-            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+            new TaskAttemptFailEvent(attemptID));
       } else if (taskType == TaskType.REDUCE && taskId == 0) {
         getContext().getEventHandler().handle(
-            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+            new TaskAttemptFailEvent(attemptID));
       } else if (taskType == TaskType.REDUCE && taskId == 1) {
         getContext().getEventHandler().handle(
             new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
index f364c18..9b6ebda 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
@@ -91,8 +91,11 @@ public class TestMapProgress {
       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
     }

-    public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
-      LOG.info("Task " + taskId + " reporting fatal error: " + msg);
+    /** Only logs the fatal error report, including the fast-fail flag. */
+    public void fatalError(TaskAttemptID taskId, String msg, boolean fastFail)
+        throws IOException {
+      LOG.info("Task " + taskId + " reporting fatal error: " + msg
+          + " fast fail: " + fastFail);
     }

     public JvmTask getTask(JvmContext context) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
index bed545e..a534cfa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
@@ -124,7 +124,9 @@ public class TestTaskCommit extends HadoopTestCase {
     }

+    // Intentionally a no-op: fatal error reports are ignored here.
     @Override
-    public void fatalError(TaskAttemptID taskId, String message)
+    public void fatalError(TaskAttemptID taskId, String message,
+        boolean fastFail)
         throws IOException { }

     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org