You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/10/30 06:11:11 UTC

tez git commit: TEZ-808. Handle task attempts that are not making progress (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 9ca694789 -> 414258e40


TEZ-808. Handle task attempts that are not making progress (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/414258e4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/414258e4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/414258e4

Branch: refs/heads/master
Commit: 414258e40ff72961c6d1e6fb287b3d2c9732a7a8
Parents: 9ca6947
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Oct 29 22:10:56 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Oct 29 22:10:56 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    | 14 +++
 .../tez/runtime/api/ProcessorContext.java       |  4 +-
 .../org/apache/tez/runtime/api/TaskContext.java | 10 ++
 .../records/TaskAttemptTerminationCause.java    |  1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 25 +++++
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  3 +-
 .../dag/app/TestTaskCommunicatorManager1.java   |  4 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 98 +++++++++++++++++++-
 .../org/apache/tez/runtime/RuntimeTask.java     | 11 +++
 .../api/events/TaskStatusUpdateEvent.java       | 11 ++-
 .../api/impl/TezProcessorContextImpl.java       |  1 +
 .../runtime/api/impl/TezTaskContextImpl.java    |  5 +
 .../apache/tez/runtime/task/TaskReporter.java   |  4 +-
 .../TestLogicalIOProcessorRuntimeTask.java      |  6 ++
 .../runtime/api/impl/TestProcessorContext.java  |  8 +-
 .../tez/runtime/task/TestTaskReporter.java      | 10 ++
 .../org/apache/tez/test/TestFaultTolerance.java | 30 +++++-
 18 files changed, 232 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 96300fa..03ccdeb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2679. Admin forms of launch env settings
 
 ALL CHANGES:
+  TEZ-808. Handle task attempts that are not making progress
   TEZ-2553. Tez UI: Tez UI Nits
   TEZ-2814. ATSImportTool has a return statement in a finally block
   TEZ-2906. Compilation fails with hadoop 2.2.0

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index ac3dd4a..0ea8999 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -690,6 +690,20 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
       + "max-events-per-heartbeat";
   public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500;
+  
+  /**
+   * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output 
+   * components need to make successive progress notifications. If the progress is not notified 
+   * for this interval then the task will be considered hung and terminated.
+   * The value for this config should be larger than {@link TezConfiguration#TASK_HEARTBEAT_TIMEOUT_MS} 
+   * and larger than 2 times the value of {@link TezConfiguration#TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS}.
+   * A config value <=0 disables this.
+   */
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty
+  public static final String TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS = TEZ_TASK_PREFIX + 
+    "progress.stuck.interval-ms";
+  public static final long TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT = -1;
 
   /**
    * Whether to generate counters per IO or not. Enabling this will rename

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
index 2ac6e4c..8b88289 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 public interface ProcessorContext extends TaskContext {
 
   /**
-   * Set the overall progress of this Task Attempt
+   * Set the overall progress of this Task Attempt.
+   * This automatically results in invocation of {@link ProcessorContext#notifyProgress()} 
+   * and so invoking that separately is not required.
    * @param progress Progress in the range from [0.0 - 1.0f]
    */
   public void setProgress(float progress);

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index 92d5575..457b0de 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -120,6 +120,16 @@ public interface TaskContext {
    * @return {@link ObjectRegistry}
    */
   public ObjectRegistry getObjectRegistry();
+  
+  /**
+   * Notifies the framework that progress is being made by this component. 
+   * This is used to identify hung components that are not making progress.
+   * Must be called periodically until processing has completed for this component.
+   * Care must be taken to call this when real progress has been made. Simply 
+   * calling this continuously from a thread without regard to real work may prevent 
+   * identification of hung components and delay/stall job completion.
+   */
+  public void notifyProgress();
 
   /**
    * Report a fatal error to the framework. This will cause the entire task to

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
index 7112d9e..a5214fb 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -34,6 +34,7 @@ public enum TaskAttemptTerminationCause {
   INPUT_READ_ERROR, // Failed due to error in reading inputs
   OUTPUT_WRITE_ERROR, // Failed due to error in writing outputs
   OUTPUT_LOST, // Failed because attempts output were reported lost
+  NO_PROGRESS, // Failed because no progress was being made
   TASK_HEARTBEAT_ERROR, // Failed because AM lost connection to the task
   
   CONTAINER_LAUNCH_FAILED, // Failed to launch container

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 35a23f9..27eb69b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -191,6 +191,8 @@ public class TaskAttemptImpl implements TaskAttempt,
   private DAGCounter localityCounter;
   
   org.apache.tez.runtime.api.impl.TaskStatistics statistics;
+  
+  long lastNotifyProgressTimestamp = 0;
 
   // Used to store locality information when
   Set<String> taskHosts = new HashSet<String>();
@@ -1372,6 +1374,29 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.reportedStatus.progress = statusEvent.getProgress();
       ta.reportedStatus.counters = statusEvent.getCounters();
       ta.statistics = statusEvent.getStatistics();
+      if (statusEvent.getProgressNotified()) {
+        ta.lastNotifyProgressTimestamp = ta.clock.getTime();
+      } else {
+        long currTime = ta.clock.getTime();
+        long hungIntervalMax = ta.conf.getLong(
+            TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 
+            TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT);
+        if (hungIntervalMax > 0 &&
+            currTime - ta.lastNotifyProgressTimestamp > hungIntervalMax) {
+          // task is hung
+          String diagnostics = "Attempt failed because it appears to make no progress for " + 
+          hungIntervalMax + "ms";
+          LOG.info(diagnostics + " " + ta.getID());
+          // send event that will fail this attempt
+          ta.sendEvent(
+              new TaskAttemptEventAttemptFailed(ta.getID(),
+                  TaskAttemptEventType.TA_FAILED,
+                  diagnostics, 
+                  TaskAttemptTerminationCause.NO_PROGRESS)
+              );
+        }
+      }
+      
       if (sEvent.getReadErrorReported()) {
         // if there is a read error then track the next last data event
         ta.appendNextDataEvent = true;

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 3cab2da..f2130ab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -414,7 +414,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
               cData.numUpdates++;
               float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1;
               float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f;
-              events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
+              events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats, false), 
+                  new EventMetaData(
                   EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
                   MockDAGAppMaster.this.getContext().getClock().getTime()));
               TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index 03b7da9..17fa4d9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -235,7 +235,7 @@ public class TestTaskCommunicatorManager1 {
   @Test (timeout = 5000)
   public void testTaskEventRouting() throws Exception {
     List<TezEvent> events =  Arrays.asList(
-      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null),
       new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
       new TezEvent(new TaskAttemptCompletedEvent(), null)
     );
@@ -264,7 +264,7 @@ public class TestTaskCommunicatorManager1 {
   @Test (timeout = 5000)
   public void testTaskEventRoutingWithReadError() throws Exception {
     List<TezEvent> events =  Arrays.asList(
-      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null),
       new TezEvent(InputReadErrorEvent.create("", 0, 0), null),
       new TezEvent(new TaskAttemptCompletedEvent(), null)
     );

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 3cf3309..17295cd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -646,7 +647,7 @@ public class TestTaskAttempt {
     when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2);
     when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2);
     TaskAttemptEventStatusUpdate statusEvent =
-        new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null));
+        new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false));
 
     assertEquals(0, taImpl.lastDataEvents.size());
     taImpl.setLastEventSent(mockTezEvent1);
@@ -729,7 +730,7 @@ public class TestTaskAttempt {
         arg.getAllValues().subList(0,
             expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
     
-    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null)));
+    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
     
     taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0",
         TaskAttemptTerminationCause.APPLICATION_ERROR));
@@ -769,7 +770,95 @@ public class TestTaskAttempt {
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
   }
-  
+
+  @Test (timeout = 5000)
+  public void testNoProgressFail() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 75);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    Clock mockClock = mock(Clock.class);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, mockClock,
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    verify(mockHeartbeatHandler).register(taskAttemptID);
+    
+    when(mockClock.getTime()).thenReturn(100l);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, true)));
+    // invocations and time updated
+    assertEquals(100l, taImpl.lastNotifyProgressTimestamp);
+    when(mockClock.getTime()).thenReturn(150l);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, true)));
+    // invocations and time updated
+    assertEquals(150l, taImpl.lastNotifyProgressTimestamp);
+    when(mockClock.getTime()).thenReturn(200l);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    // invocations and time not updated
+    assertEquals(150l, taImpl.lastNotifyProgressTimestamp);
+    when(mockClock.getTime()).thenReturn(250l);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    // invocations and time not updated
+    assertEquals(150l, taImpl.lastNotifyProgressTimestamp);
+    // failed event sent to self
+    verify(eventHandler, atLeast(1)).handle(arg.capture());
+    TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue();
+    assertEquals(taImpl.getID(), fEvent.getTaskAttemptID());
+    assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause());
+    taImpl.handle(fEvent);
+
+    assertEquals("Task attempt is not in the  FAIL_IN_PROGRESS state", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+    verify(mockHeartbeatHandler).unregister(taskAttemptID);
+    assertEquals(1, taImpl.getDiagnostics().size());
+    assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, taImpl.getTerminationCause());
+  }
+
   @Test(timeout = 5000)
   public void testEventSerializingHash() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
@@ -858,7 +947,8 @@ public class TestTaskAttempt {
         arg.getAllValues().subList(0,
             expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
     
-    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null)));
+    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, 
+        new TaskStatusUpdateEvent(null, 0.1f, null, false)));
     
     taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index c9c6ba1..23e57b1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -54,6 +54,7 @@ public abstract class RuntimeTask {
   private final AtomicBoolean taskDone;
   private final TaskCounterUpdater counterUpdater;
   private final TaskStatistics statistics;
+  private volatile boolean progressNotified;
 
   protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
       TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
@@ -104,6 +105,16 @@ public abstract class RuntimeTask {
     this.fatalErrorMessage = message;
   }
   
+  public void notifyProgressInvocation() {
+    progressNotified = true;
+  }
+  
+  public boolean getAndClearProgressNotification() {
+    boolean retVal = progressNotified;
+    progressNotified = false;
+    return retVal;
+  }
+  
   public Throwable getFatalError() {
     return this.fatalError.get();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
index 6465bed..518cbf4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
@@ -31,15 +31,18 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
 
   private TezCounters tezCounters;
   private float progress;
+  boolean progressNotified;
   private TaskStatistics statistics;
 
   public TaskStatusUpdateEvent() {
   }
 
-  public TaskStatusUpdateEvent(TezCounters tezCounters, float progress, TaskStatistics statistics) {
+  public TaskStatusUpdateEvent(TezCounters tezCounters, float progress, TaskStatistics statistics, 
+      boolean progressNotified) {
     this.tezCounters = tezCounters;
     this.progress = progress;
     this.statistics = statistics;
+    this.progressNotified = progressNotified;
   }
 
   public TezCounters getCounters() {
@@ -53,10 +56,15 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
   public TaskStatistics getStatistics() {
     return statistics;
   }
+  
+  public boolean getProgressNotified() {
+    return progressNotified;
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeFloat(progress);
+    out.writeBoolean(progressNotified);
     if (tezCounters != null) {
       out.writeBoolean(true);
       tezCounters.write(out);
@@ -74,6 +82,7 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
   @Override
   public void readFields(DataInput in) throws IOException {
     progress = in.readFloat();
+    progressNotified = in.readBoolean();
     if (in.readBoolean()) {
       tezCounters = new TezCounters();
       tezCounters.readFields(in);

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 6dc30ff..0c3283d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -94,6 +94,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
   @Override
   public void setProgress(float progress) {
     runtimeTask.setProgress(progress);
+    notifyProgress();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 5f04c80..211f9d7 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -174,6 +174,11 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
   }
 
   @Override
+  public void notifyProgress() {
+    runtimeTask.notifyProgressInvocation();
+  }
+  
+  @Override
   public ByteBuffer getServiceConsumerMetaData(String serviceName) {
     return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
         .asReadOnlyBuffer().rewind();

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 263300e..30a1b9c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -347,15 +347,17 @@ public class TaskReporter implements TaskReporterInterface {
       TezCounters counters = null;
       TaskStatistics stats = null;
       float progress = 0;
+      boolean progressNotified = false;
       if (task.hasInitialized()) {
         progress = task.getProgress();
+        progressNotified = task.getAndClearProgressNotification();
         if (sendCounters) {
           // send these potentially large objects at longer intervals to avoid overloading the AM
           counters = task.getCounters();
           stats = task.getTaskStatistics();
         }
       }
-      return new TaskStatusUpdateEvent(counters, progress, stats);
+      return new TaskStatusUpdateEvent(counters, progress, stats, progressNotified);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 0fc3919..12fec7e 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -93,6 +93,9 @@ public class TestLogicalIOProcessorRuntimeTask {
       assertEquals(1, TestProcessor.runCount);
       assertEquals(1, TestInput.startCount);
       assertEquals(0, TestOutput.startCount);
+      // test that invocations of progress are counted correctly
+      assertEquals(true, lio1.getAndClearProgressNotification());
+      assertEquals(false, lio1.getAndClearProgressNotification()); // cleared after getting
       assertEquals(30, TestInput.vertexParallelism);
       assertEquals(0, TestOutput.vertexParallelism);
       assertEquals(30, lio1.getProcessorContext().getVertexParallelism());
@@ -237,6 +240,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
         throws Exception {
       runCount++;
+      getContext().notifyProgress();
     }
 
 	@Override
@@ -271,6 +275,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     public void start() throws Exception {
       startCount++;
       this.vertexParallelism = getContext().getVertexParallelism();
+      getContext().notifyProgress();
     }
 
     @Override
@@ -310,6 +315,7 @@ public class TestLogicalIOProcessorRuntimeTask {
       System.err.println("Out started");
       startCount++;
       this.vertexParallelism = getContext().getVertexParallelism();
+      getContext().notifyProgress();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
index e28df3a..ff94e7f 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
@@ -17,8 +17,7 @@ package org.apache.tez.runtime.api.impl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -97,6 +96,9 @@ public class TestProcessorContext {
     assertEquals(vertexName, procContext.getTaskVertexName());
     assertEquals(vertexId.getId(), procContext.getTaskVertexIndex());
     assertTrue(Arrays.equals(localDirs, procContext.getWorkDirs()));
-
+    
+    // test auto call of notifyProgress
+    procContext.setProgress(0.1f);
+    verify(runtimeTask, times(1)).notifyProgressInvocation();
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
index b44c9f8..e137d50 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -116,6 +116,8 @@ public class TestTaskReporter {
     LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
     doReturn("vertexName").when(mockTask).getVertexName();
     doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
+    boolean progressNotified = false;
+    doReturn(progressNotified).when(mockTask).getAndClearProgressNotification();
     TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
     
     float progress = 0.5f;
@@ -136,9 +138,11 @@ public class TestTaskReporter {
     TaskStatusUpdateEvent event = heartbeatCallable.getStatusUpdateEvent(true);
     verify(mockTask, times(1)).hasInitialized();
     verify(mockTask, times(0)).getProgress();
+    verify(mockTask, times(0)).getAndClearProgressNotification();
     verify(mockTask, times(0)).getTaskStatistics();
     verify(mockTask, times(0)).getCounters();
     Assert.assertEquals(0, event.getProgress(), 0);
+    Assert.assertEquals(false, event.getProgressNotified());
     Assert.assertNull(event.getCounters());
     Assert.assertNull(event.getStatistics());
 
@@ -147,20 +151,26 @@ public class TestTaskReporter {
     event = heartbeatCallable.getStatusUpdateEvent(false);
     verify(mockTask, times(2)).hasInitialized();
     verify(mockTask, times(1)).getProgress();
+    verify(mockTask, times(1)).getAndClearProgressNotification();
     verify(mockTask, times(0)).getTaskStatistics();
     verify(mockTask, times(0)).getCounters();
     Assert.assertEquals(progress, event.getProgress(), 0);
+    Assert.assertEquals(progressNotified, event.getProgressNotified());
     Assert.assertNull(event.getCounters());
     Assert.assertNull(event.getStatistics());
 
     // task is initialized - progress obtained and also counters since flag is true
+    progressNotified = true;
+    doReturn(progressNotified).when(mockTask).getAndClearProgressNotification();
     doReturn(true).when(mockTask).hasInitialized();
     event = heartbeatCallable.getStatusUpdateEvent(true);
     verify(mockTask, times(3)).hasInitialized();
     verify(mockTask, times(2)).getProgress();
+    verify(mockTask, times(2)).getAndClearProgressNotification();
     verify(mockTask, times(1)).getTaskStatistics();
     verify(mockTask, times(1)).getCounters();
     Assert.assertEquals(progress, event.getProgress(), 0);
+    Assert.assertEquals(progressNotified, event.getProgressNotified());
     Assert.assertEquals(counters, event.getCounters());
     Assert.assertEquals(stats, event.getStatistics());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index ec89c4b..011e91d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -23,6 +23,9 @@ import java.util.Random;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -90,6 +93,7 @@ public class TestFaultTolerance {
       tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
       tezConf.setDouble(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, 0.4);
       tezConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, 3);
+      tezConf.setInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, 100);
 
       tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
       tezSession.start();
@@ -117,6 +121,11 @@ public class TestFaultTolerance {
   }
 
   void runDAGAndVerify(DAG dag, DAGStatus.State finalState, int checkFailedAttempts) throws Exception {
+    runDAGAndVerify(dag, finalState, checkFailedAttempts, null);
+  }
+  
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState, int checkFailedAttempts, 
+      String diagnostics) throws Exception {
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
     DAGStatus dagStatus = dagClient.getDAGStatus(null);
@@ -129,12 +138,17 @@ public class TestFaultTolerance {
       dagStatus = dagClient.getDAGStatus(null);
     }
 
+    Assert.assertEquals(finalState, dagStatus.getState());
+    
     if (checkFailedAttempts > 0) {
       Assert.assertEquals(checkFailedAttempts,
           dagStatus.getDAGProgress().getFailedTaskAttemptCount());
     }
 
-    Assert.assertEquals(finalState, dagStatus.getState());
+    if (diagnostics != null) {
+      Assert.assertNotNull(dagStatus.getDiagnostics());
+      Assert.assertTrue(Joiner.on(":").join(dagStatus.getDiagnostics()).contains(diagnostics));
+    }
   }
   
   @Test (timeout=60000)
@@ -749,4 +763,18 @@ public class TestFaultTolerance {
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
   
+  @Test (timeout=240000)
+  public void testNoProgress() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, 1000*100); // long sleep
+    testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    DAG dag = SimpleTestDAG.createDAG(testConf);
+    Vertex hung = dag.getVertex("v1");
+    hung.setConf(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, Long.toString(1000));
+    hung.setConf(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, Integer.toString(2));
+    
+    // dag will fail with 2 attempts failing from vertex v1
+    runDAGAndVerify(dag, DAGStatus.State.FAILED, 2, "no progress");
+  }
+  
 }