You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:12 UTC

[06/50] [abbrv] git commit: TEZ-853. Support counters recovery. (Jeff Zhang via hitesh)

TEZ-853. Support counters recovery. (Jeff Zhang via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/16a0f579
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/16a0f579
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/16a0f579

Branch: refs/heads/branch-0.5
Commit: 16a0f57952d554e95ae1346ac3f360998ac7faac
Parents: fcc7426
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 11 22:03:26 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 11 22:03:26 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/common/counters/DAGCounter.java  |  1 +
 .../apache/tez/dag/api/client/DAGStatus.java    |  2 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 53 +++++------
 .../dag/history/events/TaskFinishedEvent.java   | 10 +--
 .../dag/history/events/VertexFinishedEvent.java |  8 --
 tez-dag/src/main/proto/HistoryEvents.proto      |  4 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 16 ++++
 .../app/dag/impl/TestTaskAttemptRecovery.java   | 60 +++++++++++--
 .../TestHistoryEventsProtoConversion.java       |  6 --
 .../org/apache/tez/test/TestAMRecovery.java     | 94 +++++++++++++++++---
 11 files changed, 178 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4fc7e83..519aaa6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ ALL CHANGES:
   TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0.
   TEZ-1559. Add system tests for AM recovery.
   TEZ-850. Recovery unit tests.
+  TEZ-853. Support counters recovery.
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
index 3598572..b5bdffb 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 public enum DAGCounter {
   NUM_FAILED_TASKS, 
   NUM_KILLED_TASKS,
+  NUM_SUCCEEDED_TASKS,
   TOTAL_LAUNCHED_TASKS,
   OTHER_LOCAL_TASKS,
   DATA_LOCAL_TASKS,

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index 297c6f2..7c51c42 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -170,7 +170,7 @@ public class DAGStatus {
       + ", diagnostics="
       + StringUtils.join(getDiagnostics(), LINE_SEPARATOR)
       + ", counters="
-      + (dagCounters == null ? "null" : dagCounters.toString()));
+      + (getDAGCounters() == null ? "null" : getDAGCounters().toString()));
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 7ba90b5..b8430cc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -791,6 +791,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         this.launchTime = tEvent.getStartTime();
         recoveryStartEventSeen = true;
         recoveredState = TaskAttemptState.RUNNING;
+        sendEvent(createDAGCounterUpdateEventTALaunched(this));
         return recoveredState;
       }
       case TASK_ATTEMPT_FINISHED:
@@ -806,6 +807,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         this.reportedStatus.state = tEvent.getState();
         this.diagnostics.add(tEvent.getDiagnostics());
         this.recoveredState = tEvent.getState();
+        sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
         return recoveredState;
       }
       default:
@@ -829,47 +831,29 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   // TOOD Merge some of these JobCounter events.
-  private static DAGEventCounterUpdate createJobCounterUpdateEventTALaunched(
+  private static DAGEventCounterUpdate createDAGCounterUpdateEventTALaunched(
       TaskAttemptImpl ta) {
-    DAGEventCounterUpdate jce =
+    DAGEventCounterUpdate dagCounterEvent =
         new DAGEventCounterUpdate(
             ta.getDAGID()
             );
-    jce.addCounterUpdate(DAGCounter.TOTAL_LAUNCHED_TASKS, 1);
-    return jce;
+    dagCounterEvent.addCounterUpdate(DAGCounter.TOTAL_LAUNCHED_TASKS, 1);
+    return dagCounterEvent;
   }
 
-  private static DAGEventCounterUpdate createJobCounterUpdateEventSlotMillis(
-      TaskAttemptImpl ta) {
+  private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
+      TaskAttemptImpl taskAttempt, TaskAttemptState taState) {
     DAGEventCounterUpdate jce =
-        new DAGEventCounterUpdate(
-            ta.getDAGID()
-            );
+        new DAGEventCounterUpdate(taskAttempt.getDAGID());
 
-//    long slotMillis = computeSlotMillis(ta);
-//    jce.addCounterUpdate(DAGCounter.SLOTS_MILLIS_TASKS, slotMillis);
-    return jce;
-  }
-
-  private static DAGEventCounterUpdate createJobCounterUpdateEventTATerminated(
-      TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted,
-      TaskAttemptStateInternal taState) {
-    DAGEventCounterUpdate jce =
-        new DAGEventCounterUpdate(
-            taskAttempt.getDAGID());
-
-    if (taState == TaskAttemptStateInternal.FAILED) {
+    if (taState == TaskAttemptState.FAILED) {
       jce.addCounterUpdate(DAGCounter.NUM_FAILED_TASKS, 1);
-    } else if (taState == TaskAttemptStateInternal.KILLED) {
+    } else if (taState == TaskAttemptState.KILLED) {
       jce.addCounterUpdate(DAGCounter.NUM_KILLED_TASKS, 1);
+    } else if (taState == TaskAttemptState.SUCCEEDED ) {
+      jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1);
     }
 
-//    long slotMillisIncrement = computeSlotMillis(taskAttempt);
-//    if (!taskAlreadyCompleted) {
-//      // dont double count the elapsed time
-//      jce.addCounterUpdate(DAGCounter.SLOTS_MILLIS_TASKS, slotMillisIncrement);
-//    }
-
     return jce;
   }
 
@@ -1142,8 +1126,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
       }
 
-      ta.sendEvent(createJobCounterUpdateEventTATerminated(ta, false,
-          helper.getTaskAttemptStateInternal()));
+      ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
+          helper.getTaskAttemptState()));
       if (ta.getLaunchTime() != 0) {
         // TODO For cases like this, recovery goes for a toss, since the the
         // attempt will not exist in the history file.
@@ -1184,7 +1168,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           .createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
       ta.trackerName = StringInterner.weakIntern(nodeHttpInetAddr.getHostName());
       ta.httpPort = nodeHttpInetAddr.getPort();
-      ta.sendEvent(createJobCounterUpdateEventTALaunched(ta));
+      ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta));
 
       LOG.info("TaskAttempt: [" + ta.attemptId + "] started."
           + " Is using containerId: [" + ta.containerId + "]" + " on NM: ["
@@ -1316,7 +1300,8 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.setFinishTime();
       // Send out history event.
       ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
-      ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));
+      ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
+          TaskAttemptState.SUCCEEDED));
 
       // Inform the Scheduler.
       ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
@@ -1415,6 +1400,8 @@ public class TaskAttemptImpl implements TaskAttempt,
           // TODO abort taskattempt
           taskAttempt.sendEvent(new TaskEventTAUpdate(taskAttempt.attemptId,
               TaskEventType.T_ATTEMPT_KILLED));
+          taskAttempt.sendEvent(createDAGCounterUpdateEventTAFinished(taskAttempt,
+              getExternalState(TaskAttemptStateInternal.KILLED)));
           endState = TaskAttemptStateInternal.KILLED;
           break;
         case SUCCEEDED:

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index c60ef58..9323270 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -25,7 +25,6 @@ import java.io.OutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -45,7 +44,7 @@ public class TaskFinishedEvent implements HistoryEvent {
   private TezCounters tezCounters;
   private TezTaskAttemptID successfulAttemptID;
   private String diagnostics;
-  
+
   public TaskFinishedEvent(TezTaskID taskID,
       String vertexName, long startTime, long finishTime,
       TezTaskAttemptID successfulAttemptID,
@@ -85,9 +84,6 @@ public class TaskFinishedEvent implements HistoryEvent {
     if (diagnostics != null) {
       builder.setDiagnostics(diagnostics);
     }
-    if (tezCounters != null) {
-      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
-    }
     if (successfulAttemptID != null) {
       builder.setSuccessfulTaskAttemptId(successfulAttemptID.toString());
     }
@@ -101,10 +97,6 @@ public class TaskFinishedEvent implements HistoryEvent {
     if (proto.hasDiagnostics()) {
       this.diagnostics = proto.getDiagnostics();
     }
-    if (proto.hasCounters()) {
-      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
-          proto.getCounters());
-    }
     if (proto.hasSuccessfulTaskAttemptId()) {
       this.successfulAttemptID =
           TezTaskAttemptID.fromString(proto.getSuccessfulTaskAttemptId());

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index df3551a..8057714 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -25,7 +25,6 @@ import java.io.OutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -99,9 +98,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     if (diagnostics != null) {
       builder.setDiagnostics(diagnostics);
     }
-    if (tezCounters != null) {
-      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
-    }
     return builder.build();
   }
 
@@ -113,10 +109,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     if (proto.hasDiagnostics())  {
       this.diagnostics = proto.getDiagnostics();
     }
-    if (proto.hasCounters()) {
-      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
-          proto.getCounters());
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 5cbe540..821612a 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -136,7 +136,6 @@ message VertexFinishedProto {
   optional int64 finish_time = 3;
   optional int32 state = 4;
   optional string diagnostics = 5;
-  optional TezCountersProto counters = 6;
 }
 
 message TaskStartedProto {
@@ -150,8 +149,7 @@ message TaskFinishedProto {
   optional int64 finish_time = 2;
   optional int32 state = 3;
   optional string diagnostics = 4;
-  optional TezCountersProto counters = 5;
-  optional string successful_task_attempt_id = 6;
+  optional string successful_task_attempt_id = 5;
 }
 
 message TaskAttemptStartedProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 3633a85..8e134f2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -67,6 +67,7 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
@@ -423,6 +424,9 @@ public class TestTaskAttempt {
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
         "Terminated"));
@@ -571,6 +575,9 @@ public class TestTaskAttempt {
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
         "Terminated"));
@@ -658,6 +665,9 @@ public class TestTaskAttempt {
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEventsAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEventsAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     taImpl.handle(new TaskAttemptEvent(taskAttemptID,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM));
@@ -744,6 +754,9 @@ public class TestTaskAttempt {
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     // Send out a Node Failure.
     taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
@@ -838,6 +851,9 @@ public class TestTaskAttempt {
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     // Send out a Node Failure.
     taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 3b04cf6..a443a35 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -20,16 +20,17 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.counters.TezCounters;
@@ -39,6 +40,7 @@ import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -48,6 +50,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttemptRecovery {
@@ -98,6 +101,17 @@ public class TestTaskAttemptRecovery {
     assertEquals(state, recoveredState);
   }
 
+  private void verifyEvents(List<Event> events, Class<? extends Event> eventClass,
+      int expectedTimes) {
+    int actualTimes = 0;
+    for (Event event : events) {
+      if (eventClass.isInstance(event)) {
+        actualTimes ++;
+      }
+    }
+    assertEquals(expectedTimes, actualTimes);
+  }
+
   /**
    * No any event to restore -> RecoverTransition
    */
@@ -105,7 +119,14 @@ public class TestTaskAttemptRecovery {
   public void testTARecovery_NEW() {
     ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
     assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
-    verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    assertEquals(2, events.size());
+    verifyEvents(events, TaskEventTAUpdate.class, 1);
+    // one for task killed
+    verifyEvents(events, DAGEventCounterUpdate.class, 1);
   }
 
   /**
@@ -117,7 +138,14 @@ public class TestTaskAttemptRecovery {
 
     ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
     assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
-    verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(3)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    assertEquals(3, events.size());
+    verifyEvents(events, TaskEventTAUpdate.class, 1);
+    // one for task launch, one for task killed
+    verifyEvents(events, DAGEventCounterUpdate.class, 2);
   }
 
   /**
@@ -131,7 +159,13 @@ public class TestTaskAttemptRecovery {
 
     ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
     assertEquals(TaskAttemptStateInternal.SUCCEEDED, ta.getInternalState());
-    verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    assertEquals(2, events.size());
+    // one for task launch, one for task succeeded
+    verifyEvents(events, DAGEventCounterUpdate.class, 2);
   }
 
   /**
@@ -145,7 +179,13 @@ public class TestTaskAttemptRecovery {
 
     ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
     assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
-    verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    assertEquals(2, events.size());
+    // one for task launch, one for task killed
+    verifyEvents(events, DAGEventCounterUpdate.class, 2);
   }
 
   /**
@@ -159,7 +199,13 @@ public class TestTaskAttemptRecovery {
 
     ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
     assertEquals(TaskAttemptStateInternal.FAILED, ta.getInternalState());
-    verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    assertEquals(2, events.size());
+    // one for task launch, one for task killed
+    verifyEvents(events, DAGEventCounterUpdate.class, 2);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 72d0c25..5bb7d35 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -354,7 +354,6 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getFinishTime());
       Assert.assertEquals(event.getState(), deserializedEvent.getState());
       Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
-      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
       logEvents(event, deserializedEvent);
     }
     {
@@ -370,7 +369,6 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getFinishTime());
       Assert.assertEquals(event.getState(), deserializedEvent.getState());
       Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
-      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
       logEvents(event, deserializedEvent);
     }
   }
@@ -403,8 +401,6 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getFinishTime());
       Assert.assertEquals(event.getState(),
           deserializedEvent.getState());
-      Assert.assertEquals(event.getTezCounters(),
-          deserializedEvent.getTezCounters());
       Assert.assertEquals(event.getSuccessfulAttemptID(),
           deserializedEvent.getSuccessfulAttemptID());
       Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
@@ -425,8 +421,6 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getFinishTime());
       Assert.assertEquals(event.getState(),
           deserializedEvent.getState());
-      Assert.assertEquals(event.getTezCounters(),
-          deserializedEvent.getTezCounters());
       Assert.assertEquals(event.getSuccessfulAttemptID(),
           deserializedEvent.getSuccessfulAttemptID());
       Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());

http://git-wip-us.apache.org/repos/asf/tez/blob/16a0f579/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index eae38f0..bd2fe99 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -38,6 +39,8 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -193,7 +196,12 @@ public class TestAMRecovery {
     DAG dag =
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.BROADCAST, true);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+    assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+    assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+    assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
+
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
 
@@ -220,7 +228,11 @@ public class TestAMRecovery {
     DAG dag =
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.BROADCAST, false);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+    assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+    assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+    assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -244,11 +256,15 @@ public class TestAMRecovery {
    * @throws Exception
    */
   @Test(timeout = 120000)
-  public void testVertexPartialComplete_One2One() throws Exception {
+  public void testVertexPartialFinished_One2One() throws Exception {
     DAG dag =
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.ONE_TO_ONE, true);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+    assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+    assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+    assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -273,11 +289,15 @@ public class TestAMRecovery {
    * @throws Exception
    */
   @Test(timeout = 120000)
-  public void testVertexCompletelyComplete_One2One() throws Exception {
+  public void testVertexCompletelyFinished_One2One() throws Exception {
     DAG dag =
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.ONE_TO_ONE, false);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+    assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+    assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+    assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -306,7 +326,11 @@ public class TestAMRecovery {
     DAG dag =
         createDAG(ControlledShuffleVertexManager.class,
             DataMovementType.SCATTER_GATHER, true);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+    assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+    assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+    assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -335,7 +359,11 @@ public class TestAMRecovery {
     DAG dag =
         createDAG(ControlledShuffleVertexManager.class,
             DataMovementType.SCATTER_GATHER, false);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
+    assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
+    assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
+    assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
@@ -369,13 +397,14 @@ public class TestAMRecovery {
 
   }
 
-  void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+  TezCounters runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
     DAGStatus dagStatus =
         dagClient.waitForCompletionWithStatusUpdates(EnumSet
             .of(StatusGetOpts.GET_COUNTERS));
     Assert.assertEquals(finalState, dagStatus.getState());
+    return dagStatus.getDAGCounters();
   }
 
   /**
@@ -479,6 +508,19 @@ public class TestAMRecovery {
         }
       }
     }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+      // sleep for 1 seconds to delay the running of task in v2.
+      // this could keep the case that task of v1 is partial finished or completely
+      // finished, and at the same time the task of v2 is not started
+      try {
+        Thread.sleep(1*1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      super.onVertexStarted(completions);
+    }
   }
 
   public static class ControlledShuffleVertexManager extends
@@ -516,6 +558,19 @@ public class TestAMRecovery {
         }
       }
     }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+      // sleep for 1 seconds to delay the running of task in v2.
+      // this could keep the case that task of v1 is partial finished or completely
+      // finished, and at the same time the task of v2 is not started
+      try {
+        Thread.sleep(1*1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      super.onVertexStarted(completions);
+    }
   }
 
   public static class ControlledImmediateStartVertexManager extends
@@ -554,6 +609,19 @@ public class TestAMRecovery {
         }
       }
     }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+      // sleep for 1 seconds to delay the running of task in v2.
+      // this could keep the case that task of v1 is partial finished or completely
+      // finished, and at the same time the task of v2 is not started
+      try {
+        Thread.sleep(1*1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      super.onVertexStarted(completions);
+    }
   }
 
   /**
@@ -592,6 +660,10 @@ public class TestAMRecovery {
     }
   }
 
+  public static enum TestCounter {
+    Counter_1,
+  }
+
   /**
    * Do nothing if it is in task 0, sleep 3 seconds for other tasks. This enable
    * us to kill AM in VM when some tasks are still running.
@@ -605,8 +677,10 @@ public class TestAMRecovery {
 
     @Override
     public void run() throws Exception {
+      getContext().getCounters().findCounter(TestCounter.Counter_1).increment(1);
       if (getContext().getTaskIndex() == 0) {
-        return;
+        // keep task_0 running for 1 seconds to wait for task_1 start running
+        Thread.sleep(1 * 1000);;
       } else {
         Thread.sleep(3 * 1000);
       }